added multiprocessing to main of myems-bacnet service

pull/43/MERGE
13621160019@163.com 2021-04-30 12:41:48 +08:00
commit bbc44bba05
3 changed files with 80 additions and 67 deletions

View File

@ -10,6 +10,8 @@ bacpypes
mysql.connector
Schedule
## Installation
Download and install MySQL Connector:
@ -21,6 +23,14 @@ Download and install MySQL Connector:
$ sudo python3 setup.py install
```
Download and install Schedule
```
$ cd ~/tools
$ git clone https://github.com/dbader/schedule.git
$ cd ~/tools/schedule
$ sudo python3 setup.py install
```
Download and install bacpypes library
```
$ cd ~/tools

View File

@ -12,72 +12,74 @@ import schedule
########################################################################################################################
def process(logger, ):
def job(logger, ):
################################################################################################################
# Step 1: Verify Gateway Token
################################################################################################################
cnx_system_db = None
cursor_system_db = None
try:
cnx_system_db = mysql.connector.connect(**config.myems_system_db)
cursor_system_db = cnx_system_db.cursor()
except Exception as e:
logger.error("Error in step 1.1 of Gateway process " + str(e))
if cursor_system_db:
cursor_system_db.close()
if cnx_system_db:
cnx_system_db.close()
return
# TODO: choose a more secure method to verify gateway token
try:
query = (" SELECT name "
" FROM tbl_gateways "
" WHERE id = %s AND token = %s ")
cursor_system_db.execute(query, (config.gateway['id'], config.gateway['token']))
row = cursor_system_db.fetchone()
except Exception as e:
logger.error("Error in step 1.2 of gateway process: " + str(e))
if cursor_system_db:
cursor_system_db.close()
if cnx_system_db:
cnx_system_db.close()
return
if row is None:
logger.error("Error in step 1.3 of gateway process: Not Found ")
if cursor_system_db:
cursor_system_db.close()
if cnx_system_db:
cnx_system_db.close()
return
############################################################################################################
# Step 2: Collect Gateway Information
############################################################################################################
# todo: get more information, such as CPU/MEMORY/DISK
current_datetime_utc = datetime.utcnow()
############################################################################################################
# Step 3: Update Gateway Information
############################################################################################################
update_row = (" UPDATE tbl_gateways "
" SET last_seen_datetime_utc = '" + current_datetime_utc.isoformat() + "' "
" WHERE id = %s ")
try:
cursor_system_db.execute(update_row, (config.gateway['id'], ))
cnx_system_db.commit()
except Exception as e:
logger.error("Error in step 3.1 of gateway process " + str(e))
return
finally:
if cursor_system_db:
cursor_system_db.close()
if cnx_system_db:
cnx_system_db.close()
def process(logger, ):
schedule.every(3).minutes.do(job, logger,)
while True:
# the outermost while loop
################################################################################################################
# Step 1: Verify Gateway Token
################################################################################################################
while not cnx_system_db or not cnx_system_db.is_connected():
try:
cnx_system_db = mysql.connector.connect(**config.myems_system_db)
cursor_system_db = cnx_system_db.cursor()
except Exception as e:
logger.error("Error to connect to myems_system_db in step 1.1 of gateway process " + str(e))
if cursor_system_db:
cursor_system_db.close()
if cnx_system_db:
cnx_system_db.close()
print("Failed to connect to myems_system_db, sleep a minute and retry...")
time.sleep(60)
continue
# TODO: choose a more secure method to verify gateway token
try:
query = (" SELECT name "
" FROM tbl_gateways "
" WHERE id = %s AND token = %s ")
cursor_system_db.execute(query, (config.gateway['id'], config.gateway['token']))
row = cursor_system_db.fetchone()
except Exception as e:
logger.error("Error in step 1.2 of gateway process: " + str(e))
if cursor_system_db:
cursor_system_db.close()
if cnx_system_db:
cnx_system_db.close()
time.sleep(60)
continue
if row is None:
logger.error("Error in step 1.3 of gateway process: Gateway Not Found ")
time.sleep(60)
continue
############################################################################################################
# Step 2: Collect Gateway Information
############################################################################################################
# todo: get more information, such as CPU/MEMORY/DISK
current_datetime_utc = datetime.utcnow()
############################################################################################################
# Step 3: Update Gateway Information
############################################################################################################
update_row = (" UPDATE tbl_gateways "
" SET last_seen_datetime_utc = '" + current_datetime_utc.isoformat() + "' "
" WHERE id = %s ")
try:
cursor_system_db.execute(update_row, (config.gateway['id'], ))
cnx_system_db.commit()
except Exception as e:
logger.error("Error in step 3.2 of gateway process " + str(e))
if cursor_system_db:
cursor_system_db.close()
if cnx_system_db:
cnx_system_db.close()
time.sleep(60)
continue
# sleep some seconds
schedule.run_pending()
time.sleep(60)
# end of the outermost while loop

View File

@ -1,5 +1,6 @@
import logging
from logging.handlers import RotatingFileHandler
from multiprocessing import Process
import acquisition
import gateway
@ -24,11 +25,11 @@ def main():
####################################################################################################################
# Create Acquisition Process
####################################################################################################################
acquisition.process(logger,)
Process(target=acquisition.process, args=(logger,)).start()
####################################################################################################################
# Create Gateway Process
####################################################################################################################
gateway.process(logger,)
Process(target=gateway.process, args=(logger,)).start()
if __name__ == "__main__":