From afe05816b5c12d24d70fac89841b90c06524ad6e Mon Sep 17 00:00:00 2001 From: "13621160019@163.com" <13621160019@163.com> Date: Fri, 30 Apr 2021 14:07:46 +0800 Subject: [PATCH] added gateway process to myems-modbus-tcp service --- myems-bacnet/gateway.py | 4 +- myems-modbus-tcp/README.md | 10 +++++ myems-modbus-tcp/gateway.py | 85 +++++++++++++++++++++++++++++++++++++ myems-modbus-tcp/main.py | 6 +++ 4 files changed, 103 insertions(+), 2 deletions(-) create mode 100644 myems-modbus-tcp/gateway.py diff --git a/myems-bacnet/gateway.py b/myems-bacnet/gateway.py index 1a26d9d2..5c99447c 100644 --- a/myems-bacnet/gateway.py +++ b/myems-bacnet/gateway.py @@ -63,7 +63,7 @@ def job(logger, ): ############################################################################################################ update_row = (" UPDATE tbl_gateways " " SET last_seen_datetime_utc = '" + current_datetime_utc.isoformat() + "' " - " WHERE id = %s ") + " WHERE id = %s ") try: cursor_system_db.execute(update_row, (config.gateway['id'], )) cnx_system_db.commit() @@ -78,7 +78,7 @@ def job(logger, ): def process(logger, ): - schedule.every(3).minutes.do(job, logger,) + schedule.every(int(config.interval_in_seconds/60)).minutes.do(job, logger,) while True: schedule.run_pending() diff --git a/myems-modbus-tcp/README.md b/myems-modbus-tcp/README.md index 984e6305..2d111d93 100644 --- a/myems-modbus-tcp/README.md +++ b/myems-modbus-tcp/README.md @@ -11,6 +11,8 @@ modbus-tk mysql.connector +Schedule + ### Installation Download and install MySQL Connector: @@ -22,6 +24,14 @@ Download and install MySQL Connector: $ sudo python3 setup.py install ``` +Download and install Schedule +``` + $ cd ~/tools + $ git clone https://github.com/dbader/schedule.git + $ cd ~/tools/schedule + $ sudo python3 setup.py install +``` + Download and install modbus-tk ``` $ cd ~/tools diff --git a/myems-modbus-tcp/gateway.py b/myems-modbus-tcp/gateway.py new file mode 100644 index 00000000..5c99447c --- /dev/null +++ b/myems-modbus-tcp/gateway.py @@ -0,0 +1,85 @@ +import mysql.connector +import config +import time +from datetime import datetime +import schedule + +######################################################################################################################## +# Gateway Job Procedures +# Step 1: Verify Gateway Token +# Step 2: Collect Gateway Information +# Step 3: Update Gateway Information +######################################################################################################################## + + +def job(logger, ): + ################################################################################################################ + # Step 1: Verify Gateway Token + ################################################################################################################ + cnx_system_db = None + cursor_system_db = None + try: + cnx_system_db = mysql.connector.connect(**config.myems_system_db) + cursor_system_db = cnx_system_db.cursor() + except Exception as e: + logger.error("Error in step 1.1 of Gateway process " + str(e)) + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + return + + # TODO: choose a more secure method to verify gateway token + try: + query = (" SELECT name " + " FROM tbl_gateways " + " WHERE id = %s AND token = %s ") + cursor_system_db.execute(query, (config.gateway['id'], config.gateway['token'])) + row = cursor_system_db.fetchone() + except Exception as e: + logger.error("Error in step 1.2 of gateway process: " + str(e)) + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + return + + if row is None: + logger.error("Error in step 1.3 of gateway process: Not Found ") + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + return + + ############################################################################################################ + # Step 2: Collect Gateway Information + ############################################################################################################ + # todo: get more information, such as CPU/MEMORY/DISK + current_datetime_utc = datetime.utcnow() + + ############################################################################################################ + # Step 3: Update Gateway Information + ############################################################################################################ + update_row = (" UPDATE tbl_gateways " + " SET last_seen_datetime_utc = '" + current_datetime_utc.isoformat() + "' " + " WHERE id = %s ") + try: + cursor_system_db.execute(update_row, (config.gateway['id'], )) + cnx_system_db.commit() + except Exception as e: + logger.error("Error in step 3.1 of gateway process " + str(e)) + return + finally: + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + + +def process(logger, ): + schedule.every(int(config.interval_in_seconds/60)).minutes.do(job, logger,) + + while True: + schedule.run_pending() + time.sleep(60) diff --git a/myems-modbus-tcp/main.py b/myems-modbus-tcp/main.py index 0dd3c351..60929449 100644 --- a/myems-modbus-tcp/main.py +++ b/myems-modbus-tcp/main.py @@ -5,6 +5,7 @@ from multiprocessing import Process import time import logging from logging.handlers import RotatingFileHandler +import gateway import acquisition @@ -25,6 +26,11 @@ def main(): # add the handlers to logger logger.addHandler(fh) + #################################################################################################################### + # Create Gateway Process + #################################################################################################################### + Process(target=gateway.process, args=(logger,)).start() + # Get Data Sources while True: # TODO: This service has to RESTART to reload latest data sources and this should be fixed