fixed omissioin mistakes in myems-cleaning

pull/25/head
13621160019@163.com 2021-03-26 16:58:57 +08:00
parent 9e32d75470
commit f676ec7199
4 changed files with 86 additions and 77 deletions

View File

@ -7,30 +7,32 @@ import schedule
def job(logger):
cnx = None
cursor = None
cnx_historical = None
cursor_historical = None
try:
cnx = mysql.connector.connect(**config.myems_historical_db)
cursor = cnx.cursor()
cnx_historical = mysql.connector.connect(**config.myems_historical_db)
cursor_historical = cnx_historical.cursor()
except Exception as e:
logger.error("Error in clean analog value process " + str(e))
if cursor:
cursor.close()
if cnx:
cnx.close()
if cursor_historical:
cursor_historical.close()
if cnx_historical:
cnx_historical.disconnect()
return
expired_utc = datetime.utcnow() - timedelta(days=config.live_in_days)
try:
cursor.execute(" DELETE FROM tbl_analog_value WHERE utc_date_time < %s ", (expired_utc,))
cnx.commit()
cursor_historical.execute(" DELETE "
" FROM tbl_analog_value "
" WHERE utc_date_time < %s ", (expired_utc,))
cnx_historical.commit()
except Exception as e:
logger.error("Error in delete_expired_trend process " + str(e))
finally:
if cursor:
cursor.close()
if cnx:
cnx.close()
if cursor_historical:
cursor_historical.close()
if cnx_historical:
cnx_historical.disconnect()
logger.info("Deleted trend before date time in UTC: " + expired_utc.isoformat()[0:19])

View File

@ -7,30 +7,32 @@ import schedule
def job(logger):
cnx = None
cursor = None
cnx_historical = None
cursor_historical = None
try:
cnx = mysql.connector.connect(**config.myems_historical_db)
cursor = cnx.cursor()
cnx_historical = mysql.connector.connect(**config.myems_historical_db)
cursor_historical = cnx_historical.cursor()
except Exception as e:
logger.error("Error in clean digital value process " + str(e))
if cursor:
cursor.close()
if cnx:
cnx.close()
if cursor_historical:
cursor_historical.close()
if cnx_historical:
cnx_historical.disconnect()
return
expired_utc = datetime.utcnow() - timedelta(days=config.live_in_days)
try:
cursor.execute(" DELETE FROM tbl_digital_value WHERE utc_date_time < %s ", (expired_utc,))
cnx.commit()
cursor_historical.execute(" DELETE "
" FROM tbl_digital_value "
" WHERE utc_date_time < %s ", (expired_utc,))
cnx_historical.commit()
except Exception as e:
logger.error("Error in delete_expired_trend process " + str(e))
finally:
if cursor:
cursor.close()
if cnx:
cnx.close()
if cursor_historical:
cursor_historical.close()
if cnx_historical:
cnx_historical.disconnect()
logger.info("Deleted trend before date time in UTC: " + expired_utc.isoformat()[0:19])

View File

@ -1,7 +1,7 @@
import mysql.connector
import config
import time
from datetime import datetime
from datetime import datetime, timedelta
########################################################################################################################
@ -17,17 +17,17 @@ def process(logger):
while True:
# the outermost loop to reconnect server if there is a connection error
cnx = None
cursor = None
cnx_historical = None
cursor_historical = None
try:
cnx = mysql.connector.connect(**config.myems_historical_db)
cursor = cnx.cursor()
cnx_historical = mysql.connector.connect(**config.myems_historical_db)
cursor_historical = cnx_historical.cursor()
except Exception as e:
logger.error("Error at the begin of clean_energy_value.process " + str(e))
if cursor:
cursor.close()
if cnx:
cnx.close()
if cursor_historical:
cursor_historical.close()
if cnx_historical:
cnx_historical.disconnect()
time.sleep(60)
continue
@ -46,19 +46,20 @@ def process(logger):
query = (" SELECT MIN(utc_date_time), MAX(utc_date_time) "
" FROM tbl_energy_value "
" WHERE is_bad IS NULL ")
cursor.execute(query, ())
row_datetime = cursor.fetchone()
cursor_historical.execute(query, ())
row_datetime = cursor_historical.fetchone()
if row_datetime is not None and len(row_datetime) == 2 and \
isinstance(row_datetime[0], datetime) and isinstance(row_datetime[1], datetime):
min_datetime = row_datetime[0]
# NOTE: To avoid omission mistakes, we start one hour early
min_datetime = row_datetime[0] - timedelta(hours=1)
max_datetime = row_datetime[1]
except Exception as e:
logger.error("Error in Step 1 of clean_energy_value.process " + str(e))
if cursor:
cursor.close()
if cnx:
cnx.close()
if cursor_historical:
cursor_historical.close()
if cnx_historical:
cnx_historical.disconnect()
time.sleep(60)
continue
print("min_datetime: " + min_datetime.isoformat()[0:19])
@ -170,6 +171,8 @@ def process(logger):
# 3333 2018-02-08 00:54:16 165599.015625 good
################################################################################################################
print("Step 2: Processing bad case 1.x")
cnx_system = None
cursor_system = None
try:
cnx_system = mysql.connector.connect(**config.myems_system_db)
cursor_system = cnx_system.cursor(dictionary=True)
@ -193,23 +196,24 @@ def process(logger):
if cursor_system:
cursor_system.close()
if cnx_system:
cnx_system.close()
cnx_system.disconnect()
try:
query = (" SELECT id, point_id, actual_value "
" FROM tbl_energy_value "
" WHERE utc_date_time >= %s AND utc_date_time <= %s AND is_bad IS NOT TRUE ")
cursor.execute(query, (min_datetime, max_datetime,))
rows_energy_values = cursor.fetchall()
cursor_historical.execute(query, (min_datetime, max_datetime,))
rows_energy_values = cursor_historical.fetchall()
except Exception as e:
logger.error("Error in step 2.2 of clean_energy_value.process " + str(e))
if cursor:
cursor.close()
if cnx:
cnx.close()
if cursor_historical:
cursor_historical.close()
if cnx_historical:
cnx_historical.disconnect()
time.sleep(60)
continue
# initialize bad list
bad_list = list()
if rows_energy_values is not None and len(rows_energy_values) > 0:
@ -226,14 +230,14 @@ def process(logger):
update = (" UPDATE tbl_energy_value "
" SET is_bad = TRUE "
" WHERE id IN (" + ', '.join(map(str, bad_list)) + ")")
cursor.execute(update, )
cnx.commit()
cursor_historical.execute(update, )
cnx_historical.commit()
except Exception as e:
logger.error("Error in step 2.3 of clean_energy_value.process " + str(e))
if cursor:
cursor.close()
if cnx:
cnx.close()
if cursor_historical:
cursor_historical.close()
if cnx_historical:
cnx_historical.disconnect()
time.sleep(60)
continue
@ -380,14 +384,14 @@ def process(logger):
" FROM tbl_energy_value "
" WHERE utc_date_time >= %s AND utc_date_time <= %s AND is_bad IS NOT TRUE "
" ORDER BY point_id, utc_date_time ")
cursor.execute(query, (min_datetime, max_datetime,))
rows_energy_values = cursor.fetchall()
cursor_historical.execute(query, (min_datetime, max_datetime,))
rows_energy_values = cursor_historical.fetchall()
except Exception as e:
logger.error("Error in step 3.1 of clean_energy_value.process " + str(e))
if cursor:
cursor.close()
if cnx:
cnx.close()
if cursor_historical:
cursor_historical.close()
if cnx_historical:
cnx_historical.disconnect()
time.sleep(60)
continue
@ -416,6 +420,7 @@ def process(logger):
if len(current_point_value_list) > 0:
point_value_dict[current_point_id] = current_point_value_list
# reinitialize bad list
bad_list = list()
for point_id, point_value_list in point_value_dict.items():
@ -449,14 +454,14 @@ def process(logger):
update = (" UPDATE tbl_energy_value "
" SET is_bad = TRUE "
" WHERE id IN (" + ', '.join(map(str, bad_list)) + ")")
cursor.execute(update, )
cnx.commit()
cursor_historical.execute(update, )
cnx_historical.commit()
except Exception as e:
logger.error("Error in step 3.2 of clean_energy_value.process " + str(e))
if cursor:
cursor.close()
if cnx:
cnx.close()
if cursor_historical:
cursor_historical.close()
if cnx_historical:
cnx_historical.disconnect()
time.sleep(60)
continue
@ -544,16 +549,16 @@ def process(logger):
" SET is_bad = FALSE "
" WHERE utc_date_time >= %s AND utc_date_time < %s AND is_bad IS NULL ")
# NOTE: use '<' instead of '<=' in WHERE statement because there may be some new inserted values
cursor.execute(update, (min_datetime, max_datetime,))
cnx.commit()
cursor_historical.execute(update, (min_datetime, max_datetime,))
cnx_historical.commit()
except Exception as e:
logger.error("Error in step 4 of clean_energy_value.process " + str(e))
time.sleep(60)
continue
finally:
if cursor:
cursor.close()
if cnx:
cnx.close()
if cursor_historical:
cursor_historical.close()
if cnx_historical:
cnx_historical.disconnect()
time.sleep(900)

View File

@ -1,4 +1,3 @@
# definition of the database
myems_system_db = {
'user': 'root',
'password': '!MyEMS1',
@ -18,7 +17,8 @@ myems_historical_db = {
# indicates how long analog values and digital values will be kept in database
# the longer days the more memory and disc space needed.
live_in_days = 365
# note: By default, energy values in historical db will never be deleted automatically.
# NOTE: By default, energy values in historical db will never be deleted automatically.
# indicates if the program is in debug mode
is_debug = False