diff --git a/myems-cleaning/clean_analog_value.py b/myems-cleaning/clean_analog_value.py index 2cba4254..73fe6fda 100644 --- a/myems-cleaning/clean_analog_value.py +++ b/myems-cleaning/clean_analog_value.py @@ -7,30 +7,32 @@ import schedule def job(logger): - cnx = None - cursor = None + cnx_historical = None + cursor_historical = None try: - cnx = mysql.connector.connect(**config.myems_historical_db) - cursor = cnx.cursor() + cnx_historical = mysql.connector.connect(**config.myems_historical_db) + cursor_historical = cnx_historical.cursor() except Exception as e: logger.error("Error in clean analog value process " + str(e)) - if cursor: - cursor.close() - if cnx: - cnx.close() + if cursor_historical: + cursor_historical.close() + if cnx_historical: + cnx_historical.disconnect() return expired_utc = datetime.utcnow() - timedelta(days=config.live_in_days) try: - cursor.execute(" DELETE FROM tbl_analog_value WHERE utc_date_time < %s ", (expired_utc,)) - cnx.commit() + cursor_historical.execute(" DELETE " + " FROM tbl_analog_value " + " WHERE utc_date_time < %s ", (expired_utc,)) + cnx_historical.commit() except Exception as e: logger.error("Error in delete_expired_trend process " + str(e)) finally: - if cursor: - cursor.close() - if cnx: - cnx.close() + if cursor_historical: + cursor_historical.close() + if cnx_historical: + cnx_historical.disconnect() logger.info("Deleted trend before date time in UTC: " + expired_utc.isoformat()[0:19]) diff --git a/myems-cleaning/clean_digital_value.py b/myems-cleaning/clean_digital_value.py index a3ebf631..47830009 100644 --- a/myems-cleaning/clean_digital_value.py +++ b/myems-cleaning/clean_digital_value.py @@ -7,30 +7,32 @@ import schedule def job(logger): - cnx = None - cursor = None + cnx_historical = None + cursor_historical = None try: - cnx = mysql.connector.connect(**config.myems_historical_db) - cursor = cnx.cursor() + cnx_historical = mysql.connector.connect(**config.myems_historical_db) + cursor_historical = cnx_historical.cursor() except Exception as e: logger.error("Error in clean digital value process " + str(e)) - if cursor: - cursor.close() - if cnx: - cnx.close() + if cursor_historical: + cursor_historical.close() + if cnx_historical: + cnx_historical.disconnect() return expired_utc = datetime.utcnow() - timedelta(days=config.live_in_days) try: - cursor.execute(" DELETE FROM tbl_digital_value WHERE utc_date_time < %s ", (expired_utc,)) - cnx.commit() + cursor_historical.execute(" DELETE " + " FROM tbl_digital_value " + " WHERE utc_date_time < %s ", (expired_utc,)) + cnx_historical.commit() except Exception as e: logger.error("Error in delete_expired_trend process " + str(e)) finally: - if cursor: - cursor.close() - if cnx: - cnx.close() + if cursor_historical: + cursor_historical.close() + if cnx_historical: + cnx_historical.disconnect() logger.info("Deleted trend before date time in UTC: " + expired_utc.isoformat()[0:19]) diff --git a/myems-cleaning/clean_energy_value.py b/myems-cleaning/clean_energy_value.py index 8a358da9..91792e59 100644 --- a/myems-cleaning/clean_energy_value.py +++ b/myems-cleaning/clean_energy_value.py @@ -1,7 +1,7 @@ import mysql.connector import config import time -from datetime import datetime +from datetime import datetime, timedelta ######################################################################################################################## @@ -17,17 +17,17 @@ def process(logger): while True: # the outermost loop to reconnect server if there is a connection error - cnx = None - cursor = None + cnx_historical = None + cursor_historical = None try: - cnx = mysql.connector.connect(**config.myems_historical_db) - cursor = cnx.cursor() + cnx_historical = mysql.connector.connect(**config.myems_historical_db) + cursor_historical = cnx_historical.cursor() except Exception as e: logger.error("Error at the begin of clean_energy_value.process " + str(e)) - if cursor: - cursor.close() - if cnx: - cnx.close() + if cursor_historical: + cursor_historical.close() + if cnx_historical: + cnx_historical.disconnect() time.sleep(60) continue @@ -46,19 +46,20 @@ def process(logger): query = (" SELECT MIN(utc_date_time), MAX(utc_date_time) " " FROM tbl_energy_value " " WHERE is_bad IS NULL ") - cursor.execute(query, ()) - row_datetime = cursor.fetchone() + cursor_historical.execute(query, ()) + row_datetime = cursor_historical.fetchone() if row_datetime is not None and len(row_datetime) == 2 and \ isinstance(row_datetime[0], datetime) and isinstance(row_datetime[1], datetime): - min_datetime = row_datetime[0] + # NOTE: To avoid omission mistakes, we start one hour early + min_datetime = row_datetime[0] - timedelta(hours=1) max_datetime = row_datetime[1] except Exception as e: logger.error("Error in Step 1 of clean_energy_value.process " + str(e)) - if cursor: - cursor.close() - if cnx: - cnx.close() + if cursor_historical: + cursor_historical.close() + if cnx_historical: + cnx_historical.disconnect() time.sleep(60) continue print("min_datetime: " + min_datetime.isoformat()[0:19]) @@ -170,6 +171,8 @@ def process(logger): # 3333 2018-02-08 00:54:16 165599.015625 good ################################################################################################################ print("Step 2: Processing bad case 1.x") + cnx_system = None + cursor_system = None try: cnx_system = mysql.connector.connect(**config.myems_system_db) cursor_system = cnx_system.cursor(dictionary=True) @@ -193,23 +196,24 @@ def process(logger): if cursor_system: cursor_system.close() if cnx_system: - cnx_system.close() + cnx_system.disconnect() try: query = (" SELECT id, point_id, actual_value " " FROM tbl_energy_value " " WHERE utc_date_time >= %s AND utc_date_time <= %s AND is_bad IS NOT TRUE ") - cursor.execute(query, (min_datetime, max_datetime,)) - rows_energy_values = cursor.fetchall() + cursor_historical.execute(query, (min_datetime, max_datetime,)) + rows_energy_values = cursor_historical.fetchall() except Exception as e: logger.error("Error in step 2.2 of clean_energy_value.process " + str(e)) - if cursor: - cursor.close() - if cnx: - cnx.close() + if cursor_historical: + cursor_historical.close() + if cnx_historical: + cnx_historical.disconnect() time.sleep(60) continue + # initialize bad list bad_list = list() if rows_energy_values is not None and len(rows_energy_values) > 0: @@ -226,14 +230,14 @@ def process(logger): update = (" UPDATE tbl_energy_value " " SET is_bad = TRUE " " WHERE id IN (" + ', '.join(map(str, bad_list)) + ")") - cursor.execute(update, ) - cnx.commit() + cursor_historical.execute(update, ) + cnx_historical.commit() except Exception as e: logger.error("Error in step 2.3 of clean_energy_value.process " + str(e)) - if cursor: - cursor.close() - if cnx: - cnx.close() + if cursor_historical: + cursor_historical.close() + if cnx_historical: + cnx_historical.disconnect() time.sleep(60) continue @@ -380,14 +384,14 @@ def process(logger): " FROM tbl_energy_value " " WHERE utc_date_time >= %s AND utc_date_time <= %s AND is_bad IS NOT TRUE " " ORDER BY point_id, utc_date_time ") - cursor.execute(query, (min_datetime, max_datetime,)) - rows_energy_values = cursor.fetchall() + cursor_historical.execute(query, (min_datetime, max_datetime,)) + rows_energy_values = cursor_historical.fetchall() except Exception as e: logger.error("Error in step 3.1 of clean_energy_value.process " + str(e)) - if cursor: - cursor.close() - if cnx: - cnx.close() + if cursor_historical: + cursor_historical.close() + if cnx_historical: + cnx_historical.disconnect() time.sleep(60) continue @@ -416,6 +420,7 @@ def process(logger): if len(current_point_value_list) > 0: point_value_dict[current_point_id] = current_point_value_list + # reinitialize bad list bad_list = list() for point_id, point_value_list in point_value_dict.items(): @@ -449,14 +454,14 @@ def process(logger): update = (" UPDATE tbl_energy_value " " SET is_bad = TRUE " " WHERE id IN (" + ', '.join(map(str, bad_list)) + ")") - cursor.execute(update, ) - cnx.commit() + cursor_historical.execute(update, ) + cnx_historical.commit() except Exception as e: logger.error("Error in step 3.2 of clean_energy_value.process " + str(e)) - if cursor: - cursor.close() - if cnx: - cnx.close() + if cursor_historical: + cursor_historical.close() + if cnx_historical: + cnx_historical.disconnect() time.sleep(60) continue @@ -544,16 +549,16 @@ def process(logger): " SET is_bad = FALSE " " WHERE utc_date_time >= %s AND utc_date_time < %s AND is_bad IS NULL ") # NOTE: use '<' instead of '<=' in WHERE statement because there may be some new inserted values - cursor.execute(update, (min_datetime, max_datetime,)) - cnx.commit() + cursor_historical.execute(update, (min_datetime, max_datetime,)) + cnx_historical.commit() except Exception as e: logger.error("Error in step 4 of clean_energy_value.process " + str(e)) time.sleep(60) continue finally: - if cursor: - cursor.close() - if cnx: - cnx.close() + if cursor_historical: + cursor_historical.close() + if cnx_historical: + cnx_historical.disconnect() time.sleep(900) diff --git a/myems-cleaning/config.py b/myems-cleaning/config.py index 8cbaf1d9..b2eb0384 100644 --- a/myems-cleaning/config.py +++ b/myems-cleaning/config.py @@ -1,4 +1,3 @@ -# definition of the database myems_system_db = { 'user': 'root', 'password': '!MyEMS1', @@ -18,7 +17,8 @@ myems_historical_db = { # indicates how long analog values and digital values will be kept in database # the longer days the more memory and disc space needed. live_in_days = 365 -# note: By default, energy values in historical db will never be deleted automatically. + +# NOTE: By default, energy values in historical db will never be deleted automatically. # indicates if the program is in debug mode is_debug = False