added multiprocessing to main of myems-bacnet service
parent
3eebbbc533
commit
b87d41af87
|
@ -10,6 +10,8 @@ bacpypes
|
|||
|
||||
mysql.connector
|
||||
|
||||
Schedule
|
||||
|
||||
## Installation
|
||||
|
||||
Download and install MySQL Connector:
|
||||
|
@ -21,6 +23,14 @@ Download and install MySQL Connector:
|
|||
$ sudo python3 setup.py install
|
||||
```
|
||||
|
||||
Download and install Schedule
|
||||
```
|
||||
$ cd ~/tools
|
||||
$ git clone https://github.com/dbader/schedule.git
|
||||
$ cd ~/tools/schedule
|
||||
$ sudo python3 setup.py install
|
||||
```
|
||||
|
||||
Download and install bacpypes library
|
||||
```
|
||||
$ cd ~/tools
|
||||
|
|
|
@ -12,72 +12,74 @@ import schedule
|
|||
########################################################################################################################
|
||||
|
||||
|
||||
def process(logger, ):
|
||||
def job(logger, ):
|
||||
################################################################################################################
|
||||
# Step 1: Verify Gateway Token
|
||||
################################################################################################################
|
||||
cnx_system_db = None
|
||||
cursor_system_db = None
|
||||
try:
|
||||
cnx_system_db = mysql.connector.connect(**config.myems_system_db)
|
||||
cursor_system_db = cnx_system_db.cursor()
|
||||
except Exception as e:
|
||||
logger.error("Error in step 1.1 of Gateway process " + str(e))
|
||||
if cursor_system_db:
|
||||
cursor_system_db.close()
|
||||
if cnx_system_db:
|
||||
cnx_system_db.close()
|
||||
return
|
||||
|
||||
# TODO: choose a more secure method to verify gateway token
|
||||
try:
|
||||
query = (" SELECT name "
|
||||
" FROM tbl_gateways "
|
||||
" WHERE id = %s AND token = %s ")
|
||||
cursor_system_db.execute(query, (config.gateway['id'], config.gateway['token']))
|
||||
row = cursor_system_db.fetchone()
|
||||
except Exception as e:
|
||||
logger.error("Error in step 1.2 of gateway process: " + str(e))
|
||||
if cursor_system_db:
|
||||
cursor_system_db.close()
|
||||
if cnx_system_db:
|
||||
cnx_system_db.close()
|
||||
return
|
||||
|
||||
if row is None:
|
||||
logger.error("Error in step 1.3 of gateway process: Not Found ")
|
||||
if cursor_system_db:
|
||||
cursor_system_db.close()
|
||||
if cnx_system_db:
|
||||
cnx_system_db.close()
|
||||
return
|
||||
|
||||
############################################################################################################
|
||||
# Step 2: Collect Gateway Information
|
||||
############################################################################################################
|
||||
# todo: get more information, such as CPU/MEMORY/DISK
|
||||
current_datetime_utc = datetime.utcnow()
|
||||
|
||||
############################################################################################################
|
||||
# Step 3: Update Gateway Information
|
||||
############################################################################################################
|
||||
update_row = (" UPDATE tbl_gateways "
|
||||
" SET last_seen_datetime_utc = '" + current_datetime_utc.isoformat() + "' "
|
||||
" WHERE id = %s ")
|
||||
try:
|
||||
cursor_system_db.execute(update_row, (config.gateway['id'], ))
|
||||
cnx_system_db.commit()
|
||||
except Exception as e:
|
||||
logger.error("Error in step 3.1 of gateway process " + str(e))
|
||||
return
|
||||
finally:
|
||||
if cursor_system_db:
|
||||
cursor_system_db.close()
|
||||
if cnx_system_db:
|
||||
cnx_system_db.close()
|
||||
|
||||
|
||||
def process(logger, ):
|
||||
schedule.every(3).minutes.do(job, logger,)
|
||||
|
||||
while True:
|
||||
# the outermost while loop
|
||||
################################################################################################################
|
||||
# Step 1: Verify Gateway Token
|
||||
################################################################################################################
|
||||
while not cnx_system_db or not cnx_system_db.is_connected():
|
||||
try:
|
||||
cnx_system_db = mysql.connector.connect(**config.myems_system_db)
|
||||
cursor_system_db = cnx_system_db.cursor()
|
||||
except Exception as e:
|
||||
logger.error("Error to connect to myems_system_db in step 1.1 of gateway process " + str(e))
|
||||
if cursor_system_db:
|
||||
cursor_system_db.close()
|
||||
if cnx_system_db:
|
||||
cnx_system_db.close()
|
||||
print("Failed to connect to myems_system_db, sleep a minute and retry...")
|
||||
time.sleep(60)
|
||||
continue
|
||||
|
||||
# TODO: choose a more secure method to verify gateway token
|
||||
try:
|
||||
query = (" SELECT name "
|
||||
" FROM tbl_gateways "
|
||||
" WHERE id = %s AND token = %s ")
|
||||
cursor_system_db.execute(query, (config.gateway['id'], config.gateway['token']))
|
||||
row = cursor_system_db.fetchone()
|
||||
except Exception as e:
|
||||
logger.error("Error in step 1.2 of gateway process: " + str(e))
|
||||
if cursor_system_db:
|
||||
cursor_system_db.close()
|
||||
if cnx_system_db:
|
||||
cnx_system_db.close()
|
||||
time.sleep(60)
|
||||
continue
|
||||
|
||||
if row is None:
|
||||
logger.error("Error in step 1.3 of gateway process: Gateway Not Found ")
|
||||
time.sleep(60)
|
||||
continue
|
||||
############################################################################################################
|
||||
# Step 2: Collect Gateway Information
|
||||
############################################################################################################
|
||||
# todo: get more information, such as CPU/MEMORY/DISK
|
||||
current_datetime_utc = datetime.utcnow()
|
||||
|
||||
############################################################################################################
|
||||
# Step 3: Update Gateway Information
|
||||
############################################################################################################
|
||||
update_row = (" UPDATE tbl_gateways "
|
||||
" SET last_seen_datetime_utc = '" + current_datetime_utc.isoformat() + "' "
|
||||
" WHERE id = %s ")
|
||||
try:
|
||||
cursor_system_db.execute(update_row, (config.gateway['id'], ))
|
||||
cnx_system_db.commit()
|
||||
except Exception as e:
|
||||
logger.error("Error in step 3.2 of gateway process " + str(e))
|
||||
if cursor_system_db:
|
||||
cursor_system_db.close()
|
||||
if cnx_system_db:
|
||||
cnx_system_db.close()
|
||||
time.sleep(60)
|
||||
continue
|
||||
|
||||
# sleep some seconds
|
||||
schedule.run_pending()
|
||||
time.sleep(60)
|
||||
# end of the outermost while loop
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
import logging
|
||||
from logging.handlers import RotatingFileHandler
|
||||
from multiprocessing import Process
|
||||
import acquisition
|
||||
import gateway
|
||||
|
||||
|
@ -24,11 +25,11 @@ def main():
|
|||
####################################################################################################################
|
||||
# Create Acquisition Process
|
||||
####################################################################################################################
|
||||
acquisition.process(logger,)
|
||||
Process(target=acquisition.process, args=(logger,)).start()
|
||||
####################################################################################################################
|
||||
# Create Gateway Process
|
||||
####################################################################################################################
|
||||
gateway.process(logger,)
|
||||
Process(target=gateway.process, args=(logger,)).start()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
Loading…
Reference in New Issue