merged myems-mqtt-publisher

pull/1/head
13621160019@163.com 2021-02-19 13:21:05 +08:00
parent cab097fc78
commit 43a942c19d
7 changed files with 516 additions and 0 deletions

View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2021 MyEMS
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -0,0 +1,95 @@
## MyEMS MQTT Publisher Service
### Introduction
This service is a component of MyEMS to publish data to MQTT broker.
[![Codacy Badge](https://api.codacy.com/project/badge/Grade/eb783b8f80d94fa583dd1ebe953f0e97)](https://app.codacy.com/gh/myems/myems-mqtt-publisher?utm_source=github.com&utm_medium=referral&utm_content=myems/myems-mqtt-publisher&utm_campaign=Badge_Grade)
[![Scrutinizer Code Quality](https://scrutinizer-ci.com/g/myems/myems-mqtt-publisher/badges/quality-score.png?b=master)](https://scrutinizer-ci.com/g/myems/myems-mqtt-publisher/?branch=master)
[![Maintainability](https://api.codeclimate.com/v1/badges/f2cb7c3fb4a7499e9d1d/maintainability)](https://codeclimate.com/github/myems/myems-mqtt-publisher/maintainability)
[![Total alerts](https://img.shields.io/lgtm/alerts/g/myems/myems-mqtt-publisher.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/myems/myems-mqtt-publisher/alerts/)
### Prerequisites
simplejson
paho-mqtt
mysql.connector
### Installation
Download and Install simplejson
```
$ cd ~/tools
$ git clone https://github.com/simplejson/simplejson.git
$ cd simplejson
$ sudo python3 setup.py install
```
Download and install MySQL Connector:
```
$ cd ~/tools
$ wget https://dev.mysql.com/get/Downloads/Connector-Python/mysql-connector-python-8.0.20.tar.gz
$ tar xzf mysql-connector-python-8.0.20.tar.gz
$ cd ~/tools/mysql-connector-python-8.0.20
$ sudo python3 setup.py install
```
Download and install paho-mqtt:
```
$ cd ~/tools
$ git clone https://github.com/eclipse/paho.mqtt.python.git
$ cd ~/tools/paho.mqtt.python
$ sudo python3 setup.py install
```
Install myems-mqtt-publisher service
```
$ cd ~
$ git clone https://github.com/myems/myems.git
$ cd myems
$ sudo git checkout master (or the latest release tag)
$ sudo cp -R ~/myems/myems-mqtt-publisher /myems-mqtt-publisher
```
Eidt the config
```
$ sudo nano /myems-mqtt-publisher/config.py
```
Setup systemd service:
```
$ sudo cp /myems-mqtt-publisher/myems-mqtt-publisher.service /lib/systemd/system/
$ sudo systemctl enable myems-mqtt-publisher.service
$ sudo systemctl start myems-mqtt-publisher.service
```
### Topic
topic_prefix in config and point_id
Example:
```
'myems/point/3'
```
### Payload
data_source_id, the Data Source ID.
point_id, the Point ID.
object_type, the type of data, is one of 'ANALOG_VALUE'(decimal(18, 3)) , 'ENERGY_VALUE'(decimal(18, 3)) and 'DIGITAL_VALUE'(int(11)).
utc_date_time, the date time in utc when data was acquired. The full format looks like 'YYYY-MM-DDTHH:MM:SS'.
value, the data value in Decimal or Integer.
Example:
```
{"data_source_id": 1, "point_id": 3, "object_type": 'ANALOG_VALUE', "utc_date_time": "2020-09-28T03:23:06", "value": Decimal('591960276.000')}
```
### References
[1]. http://myems.io
[2]. https://www.eclipse.org/paho/clients/python/
[3]. https://simplejson.readthedocs.io/

View File

@ -0,0 +1,240 @@
import json
import mysql.connector
import time
import simplejson as json
import paho.mqtt.client as mqtt
import config
# indicates the connectivity with the MQTT broker
mqtt_connected_flag = False
# the on_connect callback function for MQTT client
# refer to http://www.steves-internet-guide.com/client-connections-python-mqtt/
def on_mqtt_connect(client, userdata, flags, rc):
global mqtt_connected_flag
if rc == 0:
mqtt_connected_flag = True # set flag
print("MQTT connected OK")
else:
print("Bad MQTT connection Returned code=", rc)
mqtt_connected_flag = False
# the on_disconnect callback function for MQTT client
# refer to http://www.steves-internet-guide.com/client-connections-python-mqtt/
def on_mqtt_disconnect(client, userdata, rc=0):
global mqtt_connected_flag
print("DisConnected, result code "+str(rc))
mqtt_connected_flag = False
########################################################################################################################
# Acquisition Procedures
# Step 1: Get point list
# Step 2: Connect to the historical database
# Step 3: Connect to the MQTT broker
# Step 4: Read point values from latest tables in historical database
# Step 5: Publish point values
########################################################################################################################
def process(logger, object_type):
while True:
# the outermost while loop
################################################################################################################
# Step 1: Get point list
################################################################################################################
cnx_system_db = None
cursor_system_db = None
try:
cnx_system_db = mysql.connector.connect(**config.myems_system_db)
cursor_system_db = cnx_system_db.cursor()
except Exception as e:
logger.error("Error in step 1.1 of acquisition process " + str(e))
if cursor_system_db:
cursor_system_db.close()
if cnx_system_db:
cnx_system_db.close()
# sleep and then continue the outermost loop to reload points
time.sleep(60)
continue
try:
if object_type == 'ANALOG_VALUE':
query = (" SELECT id, name, data_source_id "
" FROM tbl_points"
" WHERE object_type = 'ANALOG_VALUE' "
" ORDER BY id ")
elif object_type == 'DIGITAL_VALUE':
query = (" SELECT id, name, data_source_id "
" FROM tbl_points"
" WHERE object_type = 'DIGITAL_VALUE' "
" ORDER BY id ")
elif object_type == 'ENERGY_VALUE':
query = (" SELECT id, name, data_source_id "
" FROM tbl_points"
" WHERE object_type = 'ENERGY_VALUE' "
" ORDER BY id ")
cursor_system_db.execute(query, )
rows_point = cursor_system_db.fetchall()
except Exception as e:
logger.error("Error in step 1.2 of acquisition process: " + str(e))
# sleep several minutes and continue the outer loop to reload points
time.sleep(60)
continue
finally:
if cursor_system_db:
cursor_system_db.close()
if cnx_system_db:
cnx_system_db.close()
if rows_point is None or len(rows_point) == 0:
# there is no points
logger.error("Point Not Found, acquisition process terminated ")
# sleep 60 seconds and go back to the begin of outermost while loop to reload points
time.sleep(60)
continue
point_dict = dict()
for row_point in rows_point:
point_dict[row_point[0]] = {'name': row_point[1], 'data_source_id': row_point[2]}
################################################################################################################
# Step 2: Connect to the historical database
################################################################################################################
cnx_historical_db = None
cursor_historical_db = None
try:
cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
cursor_historical_db = cnx_historical_db.cursor()
except Exception as e:
logger.error("Error in step 2.1 of acquisition process " + str(e))
if cursor_historical_db:
cursor_historical_db.close()
if cnx_historical_db:
cnx_historical_db.close()
# sleep 60 seconds and go back to the begin of outermost while loop to reload points
time.sleep(60)
continue
################################################################################################################
# Step 3: Connect to the MQTT broker
################################################################################################################
mqc = None
try:
mqc = mqtt.Client(client_id='MYEMS' + "-" + str(time.time()))
mqc.username_pw_set(config.myems_mqtt_broker['username'], config.myems_mqtt_broker['password'])
mqc.on_connect = on_mqtt_connect
mqc.on_disconnect = on_mqtt_disconnect
mqc.connect_async(config.myems_mqtt_broker['host'], config.myems_mqtt_broker['port'], 60)
# The loop_start() starts a new thread, that calls the loop method at regular intervals for you.
# It also handles re-connects automatically.
mqc.loop_start()
except Exception as e:
logger.error("Error in step 3.1 of acquisition process " + str(e))
# sleep 60 seconds and go back to the begin of outermost while loop to reload points
time.sleep(60)
continue
################################################################################################################
# Step 4: Read point values from latest tables in historical database
################################################################################################################
# inner loop to read all point latest values and publish them within a period
while True:
if object_type == 'ANALOG_VALUE':
query = " SELECT point_id, utc_date_time, actual_value" \
" FROM tbl_analog_value_latest WHERE point_id IN ( "
elif object_type == 'DIGITAL_VALUE':
query = " SELECT point_id, utc_date_time, actual_value" \
" FROM tbl_digital_value_latest WHERE point_id IN ( "
elif object_type == 'ENERGY_VALUE':
query = " SELECT point_id, utc_date_time, actual_value" \
" FROM tbl_energy_value_latest WHERE point_id IN ( "
for point_id in point_dict:
query += str(point_id) + ","
try:
# replace "," at the end of string with ")"
cursor_historical_db.execute(query[:-1] + ")")
rows_point_values = cursor_historical_db.fetchall()
except Exception as e:
logger.error("Error in step 4.1 of acquisition process " + str(e))
if cursor_historical_db:
cursor_historical_db.close()
if cnx_historical_db:
cnx_historical_db.close()
# destroy mqtt client
if mqc and mqc.is_connected():
mqc.disconnect()
del mqc
# break the inner while loop
break
if rows_point_values is None or len(rows_point_values) == 0:
# there is no points
print(" Point value Not Found")
# sleep 60 seconds and go back to the begin of inner while loop
time.sleep(60)
continue
point_value_list = list()
for row_point_value in rows_point_values:
point_id = row_point_value[0]
point = point_dict.get(point_id)
data_source_id = point['data_source_id']
utc_date_time = row_point_value[1].replace(tzinfo=None).isoformat(timespec='seconds')
value = row_point_value[2]
point_value_list.append({'data_source_id': data_source_id,
'point_id': point_id,
'object_type': object_type,
'utc_date_time': utc_date_time,
'value': value})
############################################################################################################
# Step 5: Publish point values
############################################################################################################
if len(point_value_list) > 0 and mqtt_connected_flag:
for point_value in point_value_list:
try:
# publish real time value to mqtt broker
topic = config.topic_prefix + str(point_value['point_id'])
print('topic=' + topic)
payload = json.dumps({'data_source_id': point_value['data_source_id'],
'point_id': point_value['point_id'],
'object_type': point_value['object_type'],
'utc_date_time': point_value['utc_date_time'],
'value': point_value['value']})
print('payload=' + str(payload))
info = mqc.publish(topic=topic,
payload=payload,
qos=config.qos,
retain=True)
except Exception as e:
logger.error("Error in step 5 of acquisition process: " + str(e))
if cursor_historical_db:
cursor_historical_db.close()
if cnx_historical_db:
cnx_historical_db.close()
# destroy mqtt client
if mqc and mqc.is_connected():
mqc.disconnect()
del mqc
# break the inner while loop
break
# sleep some seconds
time.sleep(config.interval_in_seconds)
# end of inner while loop
# end of outermost while loop

View File

@ -0,0 +1,31 @@
myems_system_db = {
'user': 'root',
'password': '!MyEMS1',
'host': '127.0.0.1',
'database': 'myems_system_db',
'port': 3306,
}
myems_historical_db = {
'user': 'root',
'password': '!MyEMS1',
'host': '127.0.0.1',
'database': 'myems_historical_db',
'port': 3306,
}
myems_mqtt_broker = {
'host': '127.0.0.1',
'port': 1883,
'username': 'admin',
'password': 'Password1',
}
# The quality of service level to use.
# The value is one of 0, 1 or 2,
qos = 0
# The topic prefix that the message should be published on.
topic_prefix = 'myems/point/'
interval_in_seconds = 60

View File

@ -0,0 +1,33 @@
from multiprocessing import Process
import logging
from logging.handlers import RotatingFileHandler
import acquisition
def main():
"""main"""
# create logger
logger = logging.getLogger('myems-mqtt-publisher')
# specifies the lowest-severity log message a logger will handle,
# where debug is the lowest built-in severity level and critical is the highest built-in severity.
# For example, if the severity level is INFO, the logger will handle only INFO, WARNING, ERROR, and CRITICAL
# messages and will ignore DEBUG messages.
logger.setLevel(logging.ERROR)
# create file handler which logs messages
fh = RotatingFileHandler('myems-mqtt-publisher.log', maxBytes=1024*1024, backupCount=1)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
# add the handlers to logger
logger.addHandler(fh)
# create acquisition processes
Process(target=acquisition.process, args=(logger, 'ANALOG_VALUE')).start()
Process(target=acquisition.process, args=(logger, 'DIGITAL_VALUE')).start()
Process(target=acquisition.process, args=(logger, 'ENERGY_VALUE')).start()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,15 @@
[Unit]
Description=myems-mqtt-publisher daemon
After=network.target
[Service]
User=root
Group=root
ExecStart=/usr/bin/python3 /myems-mqtt-publisher/main.py
ExecReload=/bin/kill -s HUP $MAINPID
ExecStop=/bin/kill -s TERM $MAINPID
PrivateTmp=true
Restart=always
[Install]
WantedBy=multi-user.target

View File

@ -0,0 +1,81 @@
import simplejson as json
import config
import time
from datetime import datetime
import paho.mqtt.client as mqtt
import random
import decimal
# global flag indicates the connectivity with the MQTT broker
mqtt_connected_flag = False
# the on_connect callback function for MQTT client
# refer to http://www.steves-internet-guide.com/client-connections-python-mqtt/
def on_mqtt_connect(client, userdata, flags, rc):
global mqtt_connected_flag
if rc == 0:
mqtt_connected_flag = True # set flag
print("MQTT connected OK")
else:
print("Bad MQTT connection Returned code=", rc)
mqtt_connected_flag = False
# the on_disconnect callback function for MQTT client
# refer to http://www.steves-internet-guide.com/client-connections-python-mqtt/
def on_mqtt_disconnect(client, userdata, rc=0):
global mqtt_connected_flag
print("DisConnected, result code "+str(rc))
mqtt_connected_flag = False
########################################################################################################################
# Test Procedures
# Step 1: Connect the MQTT broker
# Step 2: Publish test topic messages
# Step 3: Run 'mosquitto_sub -h 192.168.0.1 -v -t myems/point/# -u admin -P Password1' to receive test messages
########################################################################################################################
def main():
global mqtt_connected_flag
mqc = None
try:
mqc = mqtt.Client(client_id='MYEMS' + "-" + str(time.time()))
mqc.username_pw_set(config.myems_mqtt_broker['username'], config.myems_mqtt_broker['password'])
mqc.on_connect = on_mqtt_connect
mqc.on_disconnect = on_mqtt_disconnect
mqc.connect_async(config.myems_mqtt_broker['host'], config.myems_mqtt_broker['port'], 60)
# The loop_start() starts a new thread, that calls the loop method at regular intervals for you.
# It also handles re-connects automatically.
mqc.loop_start()
except Exception as e:
print("MQTT Client Connection error " + str(e))
while True:
if mqtt_connected_flag:
try:
# publish real time value to mqtt broker
payload = json.dumps({"data_source_id": 1,
"point_id": 3,
"utc_date_time": datetime.utcnow().isoformat(timespec='seconds'),
"value": decimal.Decimal(random.randrange(0, 10000))})
print('payload=' + str(payload))
info = mqc.publish('myems/point/' + str(3),
payload=payload,
qos=0,
retain=True)
except Exception as e:
print("MQTT Publish Error : " + str(e))
# ignore this exception, does not stop the procedure
pass
time.sleep(1)
else:
print('MQTT Client Connection error')
time.sleep(1)
if __name__ == "__main__":
main()