From b87d41af87d8af28c20e90fafa7b0040ce5ff003 Mon Sep 17 00:00:00 2001 From: "13621160019@163.com" <13621160019@163.com> Date: Fri, 30 Apr 2021 12:40:50 +0800 Subject: [PATCH] added multiprocessing to main of myems-bacnet service --- myems-bacnet/README.md | 10 +++ myems-bacnet/gateway.py | 132 ++++++++++++++++++++-------------------- myems-bacnet/main.py | 5 +- 3 files changed, 80 insertions(+), 67 deletions(-) diff --git a/myems-bacnet/README.md b/myems-bacnet/README.md index 689187d8..e2d29c9f 100644 --- a/myems-bacnet/README.md +++ b/myems-bacnet/README.md @@ -10,6 +10,8 @@ bacpypes mysql.connector +Schedule + ## Installation Download and install MySQL Connector: @@ -21,6 +23,14 @@ Download and install MySQL Connector: $ sudo python3 setup.py install ``` +Download and install Schedule +``` + $ cd ~/tools + $ git clone https://github.com/dbader/schedule.git + $ cd ~/tools/schedule + $ sudo python3 setup.py install +``` + Download and install bacpypes library ``` $ cd ~/tools diff --git a/myems-bacnet/gateway.py b/myems-bacnet/gateway.py index 039b6fba..1a26d9d2 100644 --- a/myems-bacnet/gateway.py +++ b/myems-bacnet/gateway.py @@ -12,72 +12,74 @@ import schedule ######################################################################################################################## -def process(logger, ): +def job(logger, ): + ################################################################################################################ + # Step 1: Verify Gateway Token + ################################################################################################################ cnx_system_db = None cursor_system_db = None + try: + cnx_system_db = mysql.connector.connect(**config.myems_system_db) + cursor_system_db = cnx_system_db.cursor() + except Exception as e: + logger.error("Error in step 1.1 of Gateway process " + str(e)) + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + return + + # TODO: choose a more secure method to verify gateway token + try: + query = (" SELECT name " + " FROM tbl_gateways " + " WHERE id = %s AND token = %s ") + cursor_system_db.execute(query, (config.gateway['id'], config.gateway['token'])) + row = cursor_system_db.fetchone() + except Exception as e: + logger.error("Error in step 1.2 of gateway process: " + str(e)) + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + return + + if row is None: + logger.error("Error in step 1.3 of gateway process: Not Found ") + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + return + + ############################################################################################################ + # Step 2: Collect Gateway Information + ############################################################################################################ + # todo: get more information, such as CPU/MEMORY/DISK + current_datetime_utc = datetime.utcnow() + + ############################################################################################################ + # Step 3: Update Gateway Information + ############################################################################################################ + update_row = (" UPDATE tbl_gateways " + " SET last_seen_datetime_utc = '" + current_datetime_utc.isoformat() + "' " + " WHERE id = %s ") + try: + cursor_system_db.execute(update_row, (config.gateway['id'], )) + cnx_system_db.commit() + except Exception as e: + logger.error("Error in step 3.1 of gateway process " + str(e)) + return + finally: + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + + +def process(logger, ): + schedule.every(3).minutes.do(job, logger,) + while True: - # the outermost while loop - ################################################################################################################ - # Step 1: Verify Gateway Token - ################################################################################################################ - while not cnx_system_db or not cnx_system_db.is_connected(): - try: - cnx_system_db = mysql.connector.connect(**config.myems_system_db) - cursor_system_db = cnx_system_db.cursor() - except Exception as e: - logger.error("Error to connect to myems_system_db in step 1.1 of gateway process " + str(e)) - if cursor_system_db: - cursor_system_db.close() - if cnx_system_db: - cnx_system_db.close() - print("Failed to connect to myems_system_db, sleep a minute and retry...") - time.sleep(60) - continue - - # TODO: choose a more secure method to verify gateway token - try: - query = (" SELECT name " - " FROM tbl_gateways " - " WHERE id = %s AND token = %s ") - cursor_system_db.execute(query, (config.gateway['id'], config.gateway['token'])) - row = cursor_system_db.fetchone() - except Exception as e: - logger.error("Error in step 1.2 of gateway process: " + str(e)) - if cursor_system_db: - cursor_system_db.close() - if cnx_system_db: - cnx_system_db.close() - time.sleep(60) - continue - - if row is None: - logger.error("Error in step 1.3 of gateway process: Gateway Not Found ") - time.sleep(60) - continue - ############################################################################################################ - # Step 2: Collect Gateway Information - ############################################################################################################ - # todo: get more information, such as CPU/MEMORY/DISK - current_datetime_utc = datetime.utcnow() - - ############################################################################################################ - # Step 3: Update Gateway Information - ############################################################################################################ - update_row = (" UPDATE tbl_gateways " - " SET last_seen_datetime_utc = '" + current_datetime_utc.isoformat() + "' " - " WHERE id = %s ") - try: - cursor_system_db.execute(update_row, (config.gateway['id'], )) - cnx_system_db.commit() - except Exception as e: - logger.error("Error in step 3.2 of gateway process " + str(e)) - if cursor_system_db: - cursor_system_db.close() - if cnx_system_db: - cnx_system_db.close() - time.sleep(60) - continue - - # sleep some seconds + schedule.run_pending() time.sleep(60) - # end of the outermost while loop diff --git a/myems-bacnet/main.py b/myems-bacnet/main.py index 7b5fb08d..555b87b5 100644 --- a/myems-bacnet/main.py +++ b/myems-bacnet/main.py @@ -1,5 +1,6 @@ import logging from logging.handlers import RotatingFileHandler +from multiprocessing import Process import acquisition import gateway @@ -24,11 +25,11 @@ def main(): #################################################################################################################### # Create Acquisition Process #################################################################################################################### - acquisition.process(logger,) + Process(target=acquisition.process, args=(logger,)).start() #################################################################################################################### # Create Gateway Process #################################################################################################################### - gateway.process(logger,) + Process(target=gateway.process, args=(logger,)).start() if __name__ == "__main__":