From 43a942c19d8b330c1dc665c71c9edefad43947c5 Mon Sep 17 00:00:00 2001 From: "13621160019@163.com" <13621160019@163.com> Date: Fri, 19 Feb 2021 13:21:05 +0800 Subject: [PATCH] merged myems-mqtt-publisher --- myems-mqtt-publisher/LICENSE | 21 ++ myems-mqtt-publisher/README.md | 95 +++++++ myems-mqtt-publisher/acquisition.py | 240 ++++++++++++++++++ myems-mqtt-publisher/config.py | 31 +++ myems-mqtt-publisher/main.py | 33 +++ .../myems-mqtt-publisher.service | 15 ++ myems-mqtt-publisher/test.py | 81 ++++++ 7 files changed, 516 insertions(+) create mode 100644 myems-mqtt-publisher/LICENSE create mode 100644 myems-mqtt-publisher/README.md create mode 100644 myems-mqtt-publisher/acquisition.py create mode 100644 myems-mqtt-publisher/config.py create mode 100644 myems-mqtt-publisher/main.py create mode 100644 myems-mqtt-publisher/myems-mqtt-publisher.service create mode 100644 myems-mqtt-publisher/test.py diff --git a/myems-mqtt-publisher/LICENSE b/myems-mqtt-publisher/LICENSE new file mode 100644 index 00000000..b91c1ac4 --- /dev/null +++ b/myems-mqtt-publisher/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 MyEMS + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/myems-mqtt-publisher/README.md b/myems-mqtt-publisher/README.md new file mode 100644 index 00000000..4dd1a786 --- /dev/null +++ b/myems-mqtt-publisher/README.md @@ -0,0 +1,95 @@ +## MyEMS MQTT Publisher Service + +### Introduction +This service is a component of MyEMS to publish data to MQTT broker. + +[![Codacy Badge](https://api.codacy.com/project/badge/Grade/eb783b8f80d94fa583dd1ebe953f0e97)](https://app.codacy.com/gh/myems/myems-mqtt-publisher?utm_source=github.com&utm_medium=referral&utm_content=myems/myems-mqtt-publisher&utm_campaign=Badge_Grade) +[![Scrutinizer Code Quality](https://scrutinizer-ci.com/g/myems/myems-mqtt-publisher/badges/quality-score.png?b=master)](https://scrutinizer-ci.com/g/myems/myems-mqtt-publisher/?branch=master) +[![Maintainability](https://api.codeclimate.com/v1/badges/f2cb7c3fb4a7499e9d1d/maintainability)](https://codeclimate.com/github/myems/myems-mqtt-publisher/maintainability) +[![Total alerts](https://img.shields.io/lgtm/alerts/g/myems/myems-mqtt-publisher.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/myems/myems-mqtt-publisher/alerts/) + + +### Prerequisites +simplejson + +paho-mqtt + +mysql.connector + +### Installation + +Download and Install simplejson +``` + $ cd ~/tools + $ git clone https://github.com/simplejson/simplejson.git + $ cd simplejson + $ sudo python3 setup.py install +``` + +Download and install MySQL Connector: +``` + $ cd ~/tools + $ wget https://dev.mysql.com/get/Downloads/Connector-Python/mysql-connector-python-8.0.20.tar.gz + $ tar xzf mysql-connector-python-8.0.20.tar.gz + $ cd ~/tools/mysql-connector-python-8.0.20 + $ sudo python3 setup.py install +``` + +Download and install paho-mqtt: +``` + $ cd ~/tools + $ git clone https://github.com/eclipse/paho.mqtt.python.git + $ cd ~/tools/paho.mqtt.python + $ sudo python3 setup.py install +``` + +Install myems-mqtt-publisher service +``` + $ cd ~ + $ git clone https://github.com/myems/myems.git + $ cd myems + $ sudo git checkout master (or the latest release tag) + $ sudo cp -R ~/myems/myems-mqtt-publisher /myems-mqtt-publisher +``` + Eidt the config +``` + $ sudo nano /myems-mqtt-publisher/config.py +``` + Setup systemd service: +``` + $ sudo cp /myems-mqtt-publisher/myems-mqtt-publisher.service /lib/systemd/system/ + $ sudo systemctl enable myems-mqtt-publisher.service + $ sudo systemctl start myems-mqtt-publisher.service +``` + +### Topic +topic_prefix in config and point_id + +Example: +``` +'myems/point/3' +``` + +### Payload +data_source_id, the Data Source ID. + +point_id, the Point ID. + +object_type, the type of data, is one of 'ANALOG_VALUE'(decimal(18, 3)) , 'ENERGY_VALUE'(decimal(18, 3)) and 'DIGITAL_VALUE'(int(11)). + +utc_date_time, the date time in utc when data was acquired. The full format looks like 'YYYY-MM-DDTHH:MM:SS'. + +value, the data value in Decimal or Integer. + +Example: +``` +{"data_source_id": 1, "point_id": 3, "object_type": 'ANALOG_VALUE', "utc_date_time": "2020-09-28T03:23:06", "value": Decimal('591960276.000')} +``` + +### References + [1]. http://myems.io + + [2]. https://www.eclipse.org/paho/clients/python/ + + [3]. https://simplejson.readthedocs.io/ + diff --git a/myems-mqtt-publisher/acquisition.py b/myems-mqtt-publisher/acquisition.py new file mode 100644 index 00000000..adcc257b --- /dev/null +++ b/myems-mqtt-publisher/acquisition.py @@ -0,0 +1,240 @@ +import json +import mysql.connector +import time +import simplejson as json +import paho.mqtt.client as mqtt +import config + + +# indicates the connectivity with the MQTT broker +mqtt_connected_flag = False + + +# the on_connect callback function for MQTT client +# refer to http://www.steves-internet-guide.com/client-connections-python-mqtt/ +def on_mqtt_connect(client, userdata, flags, rc): + global mqtt_connected_flag + if rc == 0: + mqtt_connected_flag = True # set flag + print("MQTT connected OK") + else: + print("Bad MQTT connection Returned code=", rc) + mqtt_connected_flag = False + + +# the on_disconnect callback function for MQTT client +# refer to http://www.steves-internet-guide.com/client-connections-python-mqtt/ +def on_mqtt_disconnect(client, userdata, rc=0): + global mqtt_connected_flag + + print("DisConnected, result code "+str(rc)) + mqtt_connected_flag = False + + +######################################################################################################################## +# Acquisition Procedures +# Step 1: Get point list +# Step 2: Connect to the historical database +# Step 3: Connect to the MQTT broker +# Step 4: Read point values from latest tables in historical database +# Step 5: Publish point values +######################################################################################################################## +def process(logger, object_type): + + while True: + # the outermost while loop + + ################################################################################################################ + # Step 1: Get point list + ################################################################################################################ + cnx_system_db = None + cursor_system_db = None + try: + cnx_system_db = mysql.connector.connect(**config.myems_system_db) + cursor_system_db = cnx_system_db.cursor() + except Exception as e: + logger.error("Error in step 1.1 of acquisition process " + str(e)) + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + # sleep and then continue the outermost loop to reload points + time.sleep(60) + continue + + try: + if object_type == 'ANALOG_VALUE': + query = (" SELECT id, name, data_source_id " + " FROM tbl_points" + " WHERE object_type = 'ANALOG_VALUE' " + " ORDER BY id ") + elif object_type == 'DIGITAL_VALUE': + query = (" SELECT id, name, data_source_id " + " FROM tbl_points" + " WHERE object_type = 'DIGITAL_VALUE' " + " ORDER BY id ") + elif object_type == 'ENERGY_VALUE': + query = (" SELECT id, name, data_source_id " + " FROM tbl_points" + " WHERE object_type = 'ENERGY_VALUE' " + " ORDER BY id ") + + cursor_system_db.execute(query, ) + rows_point = cursor_system_db.fetchall() + except Exception as e: + logger.error("Error in step 1.2 of acquisition process: " + str(e)) + # sleep several minutes and continue the outer loop to reload points + time.sleep(60) + continue + finally: + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + + if rows_point is None or len(rows_point) == 0: + # there is no points + logger.error("Point Not Found, acquisition process terminated ") + # sleep 60 seconds and go back to the begin of outermost while loop to reload points + time.sleep(60) + continue + + point_dict = dict() + for row_point in rows_point: + point_dict[row_point[0]] = {'name': row_point[1], 'data_source_id': row_point[2]} + + ################################################################################################################ + # Step 2: Connect to the historical database + ################################################################################################################ + cnx_historical_db = None + cursor_historical_db = None + try: + cnx_historical_db = mysql.connector.connect(**config.myems_historical_db) + cursor_historical_db = cnx_historical_db.cursor() + except Exception as e: + logger.error("Error in step 2.1 of acquisition process " + str(e)) + if cursor_historical_db: + cursor_historical_db.close() + if cnx_historical_db: + cnx_historical_db.close() + # sleep 60 seconds and go back to the begin of outermost while loop to reload points + time.sleep(60) + continue + + ################################################################################################################ + # Step 3: Connect to the MQTT broker + ################################################################################################################ + mqc = None + try: + mqc = mqtt.Client(client_id='MYEMS' + "-" + str(time.time())) + mqc.username_pw_set(config.myems_mqtt_broker['username'], config.myems_mqtt_broker['password']) + mqc.on_connect = on_mqtt_connect + mqc.on_disconnect = on_mqtt_disconnect + mqc.connect_async(config.myems_mqtt_broker['host'], config.myems_mqtt_broker['port'], 60) + # The loop_start() starts a new thread, that calls the loop method at regular intervals for you. + # It also handles re-connects automatically. + mqc.loop_start() + + except Exception as e: + logger.error("Error in step 3.1 of acquisition process " + str(e)) + # sleep 60 seconds and go back to the begin of outermost while loop to reload points + time.sleep(60) + continue + + ################################################################################################################ + # Step 4: Read point values from latest tables in historical database + ################################################################################################################ + # inner loop to read all point latest values and publish them within a period + while True: + if object_type == 'ANALOG_VALUE': + query = " SELECT point_id, utc_date_time, actual_value" \ + " FROM tbl_analog_value_latest WHERE point_id IN ( " + elif object_type == 'DIGITAL_VALUE': + query = " SELECT point_id, utc_date_time, actual_value" \ + " FROM tbl_digital_value_latest WHERE point_id IN ( " + elif object_type == 'ENERGY_VALUE': + query = " SELECT point_id, utc_date_time, actual_value" \ + " FROM tbl_energy_value_latest WHERE point_id IN ( " + + for point_id in point_dict: + query += str(point_id) + "," + + try: + # replace "," at the end of string with ")" + cursor_historical_db.execute(query[:-1] + ")") + rows_point_values = cursor_historical_db.fetchall() + except Exception as e: + logger.error("Error in step 4.1 of acquisition process " + str(e)) + if cursor_historical_db: + cursor_historical_db.close() + if cnx_historical_db: + cnx_historical_db.close() + + # destroy mqtt client + if mqc and mqc.is_connected(): + mqc.disconnect() + del mqc + # break the inner while loop + break + + if rows_point_values is None or len(rows_point_values) == 0: + # there is no points + print(" Point value Not Found") + + # sleep 60 seconds and go back to the begin of inner while loop + time.sleep(60) + continue + + point_value_list = list() + for row_point_value in rows_point_values: + point_id = row_point_value[0] + point = point_dict.get(point_id) + data_source_id = point['data_source_id'] + utc_date_time = row_point_value[1].replace(tzinfo=None).isoformat(timespec='seconds') + value = row_point_value[2] + point_value_list.append({'data_source_id': data_source_id, + 'point_id': point_id, + 'object_type': object_type, + 'utc_date_time': utc_date_time, + 'value': value}) + + ############################################################################################################ + # Step 5: Publish point values + ############################################################################################################ + + if len(point_value_list) > 0 and mqtt_connected_flag: + for point_value in point_value_list: + try: + # publish real time value to mqtt broker + topic = config.topic_prefix + str(point_value['point_id']) + print('topic=' + topic) + payload = json.dumps({'data_source_id': point_value['data_source_id'], + 'point_id': point_value['point_id'], + 'object_type': point_value['object_type'], + 'utc_date_time': point_value['utc_date_time'], + 'value': point_value['value']}) + print('payload=' + str(payload)) + info = mqc.publish(topic=topic, + payload=payload, + qos=config.qos, + retain=True) + except Exception as e: + logger.error("Error in step 5 of acquisition process: " + str(e)) + if cursor_historical_db: + cursor_historical_db.close() + if cnx_historical_db: + cnx_historical_db.close() + + # destroy mqtt client + if mqc and mqc.is_connected(): + mqc.disconnect() + del mqc + + # break the inner while loop + break + + # sleep some seconds + time.sleep(config.interval_in_seconds) + # end of inner while loop + + # end of outermost while loop diff --git a/myems-mqtt-publisher/config.py b/myems-mqtt-publisher/config.py new file mode 100644 index 00000000..260a918e --- /dev/null +++ b/myems-mqtt-publisher/config.py @@ -0,0 +1,31 @@ +myems_system_db = { + 'user': 'root', + 'password': '!MyEMS1', + 'host': '127.0.0.1', + 'database': 'myems_system_db', + 'port': 3306, +} + +myems_historical_db = { + 'user': 'root', + 'password': '!MyEMS1', + 'host': '127.0.0.1', + 'database': 'myems_historical_db', + 'port': 3306, +} + +myems_mqtt_broker = { + 'host': '127.0.0.1', + 'port': 1883, + 'username': 'admin', + 'password': 'Password1', +} + +# The quality of service level to use. +# The value is one of 0, 1 or 2, +qos = 0 + +# The topic prefix that the message should be published on. +topic_prefix = 'myems/point/' + +interval_in_seconds = 60 diff --git a/myems-mqtt-publisher/main.py b/myems-mqtt-publisher/main.py new file mode 100644 index 00000000..a31d6767 --- /dev/null +++ b/myems-mqtt-publisher/main.py @@ -0,0 +1,33 @@ +from multiprocessing import Process +import logging +from logging.handlers import RotatingFileHandler +import acquisition + + +def main(): + """main""" + # create logger + logger = logging.getLogger('myems-mqtt-publisher') + # specifies the lowest-severity log message a logger will handle, + # where debug is the lowest built-in severity level and critical is the highest built-in severity. + # For example, if the severity level is INFO, the logger will handle only INFO, WARNING, ERROR, and CRITICAL + # messages and will ignore DEBUG messages. + logger.setLevel(logging.ERROR) + # create file handler which logs messages + fh = RotatingFileHandler('myems-mqtt-publisher.log', maxBytes=1024*1024, backupCount=1) + # create formatter and add it to the handlers + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + fh.setFormatter(formatter) + # add the handlers to logger + logger.addHandler(fh) + + # create acquisition processes + Process(target=acquisition.process, args=(logger, 'ANALOG_VALUE')).start() + + Process(target=acquisition.process, args=(logger, 'DIGITAL_VALUE')).start() + + Process(target=acquisition.process, args=(logger, 'ENERGY_VALUE')).start() + + +if __name__ == "__main__": + main() diff --git a/myems-mqtt-publisher/myems-mqtt-publisher.service b/myems-mqtt-publisher/myems-mqtt-publisher.service new file mode 100644 index 00000000..b62f5be2 --- /dev/null +++ b/myems-mqtt-publisher/myems-mqtt-publisher.service @@ -0,0 +1,15 @@ +[Unit] +Description=myems-mqtt-publisher daemon +After=network.target + +[Service] +User=root +Group=root +ExecStart=/usr/bin/python3 /myems-mqtt-publisher/main.py +ExecReload=/bin/kill -s HUP $MAINPID +ExecStop=/bin/kill -s TERM $MAINPID +PrivateTmp=true +Restart=always + +[Install] +WantedBy=multi-user.target diff --git a/myems-mqtt-publisher/test.py b/myems-mqtt-publisher/test.py new file mode 100644 index 00000000..9dd128de --- /dev/null +++ b/myems-mqtt-publisher/test.py @@ -0,0 +1,81 @@ +import simplejson as json +import config +import time +from datetime import datetime +import paho.mqtt.client as mqtt +import random +import decimal + + +# global flag indicates the connectivity with the MQTT broker +mqtt_connected_flag = False + + +# the on_connect callback function for MQTT client +# refer to http://www.steves-internet-guide.com/client-connections-python-mqtt/ +def on_mqtt_connect(client, userdata, flags, rc): + global mqtt_connected_flag + if rc == 0: + mqtt_connected_flag = True # set flag + print("MQTT connected OK") + else: + print("Bad MQTT connection Returned code=", rc) + mqtt_connected_flag = False + + +# the on_disconnect callback function for MQTT client +# refer to http://www.steves-internet-guide.com/client-connections-python-mqtt/ +def on_mqtt_disconnect(client, userdata, rc=0): + global mqtt_connected_flag + + print("DisConnected, result code "+str(rc)) + mqtt_connected_flag = False + + +######################################################################################################################## +# Test Procedures +# Step 1: Connect the MQTT broker +# Step 2: Publish test topic messages +# Step 3: Run 'mosquitto_sub -h 192.168.0.1 -v -t myems/point/# -u admin -P Password1' to receive test messages +######################################################################################################################## + +def main(): + global mqtt_connected_flag + mqc = None + try: + mqc = mqtt.Client(client_id='MYEMS' + "-" + str(time.time())) + mqc.username_pw_set(config.myems_mqtt_broker['username'], config.myems_mqtt_broker['password']) + mqc.on_connect = on_mqtt_connect + mqc.on_disconnect = on_mqtt_disconnect + mqc.connect_async(config.myems_mqtt_broker['host'], config.myems_mqtt_broker['port'], 60) + # The loop_start() starts a new thread, that calls the loop method at regular intervals for you. + # It also handles re-connects automatically. + mqc.loop_start() + + except Exception as e: + print("MQTT Client Connection error " + str(e)) + while True: + if mqtt_connected_flag: + try: + # publish real time value to mqtt broker + payload = json.dumps({"data_source_id": 1, + "point_id": 3, + "utc_date_time": datetime.utcnow().isoformat(timespec='seconds'), + "value": decimal.Decimal(random.randrange(0, 10000))}) + print('payload=' + str(payload)) + info = mqc.publish('myems/point/' + str(3), + payload=payload, + qos=0, + retain=True) + except Exception as e: + print("MQTT Publish Error : " + str(e)) + # ignore this exception, does not stop the procedure + pass + time.sleep(1) + else: + print('MQTT Client Connection error') + time.sleep(1) + + +if __name__ == "__main__": + main()