merged myems-modbus-tcp
parent
ff1818d239
commit
fb90e99eb7
|
@ -1,8 +1,59 @@
|
|||
# Logs
|
||||
logs
|
||||
*.log
|
||||
*_build
|
||||
*_static
|
||||
*_templates
|
||||
*.py[cod]
|
||||
|
||||
# C extensions
|
||||
*.c
|
||||
*.so
|
||||
|
||||
# Jython
|
||||
*$py.class
|
||||
|
||||
# Packages
|
||||
*.egg
|
||||
*.egg-info
|
||||
dist
|
||||
build
|
||||
eggs
|
||||
parts
|
||||
var
|
||||
sdist
|
||||
develop-eggs
|
||||
.installed.cfg
|
||||
lib
|
||||
lib64
|
||||
|
||||
# Installer logs
|
||||
pip-log.txt
|
||||
|
||||
# Unit test / coverage reports
|
||||
.coverage
|
||||
.tox
|
||||
nosetests.xml
|
||||
htmlcov
|
||||
*.dat
|
||||
|
||||
# Docs
|
||||
doc/_build
|
||||
|
||||
# Translations
|
||||
*.mo
|
||||
|
||||
# Idea
|
||||
.idea
|
||||
.vscode
|
||||
|
||||
# System
|
||||
.DS_Store
|
||||
|
||||
# VIM swap files
|
||||
.*.swp
|
||||
|
||||
# VIM temp files
|
||||
*~
|
||||
|
||||
# Log
|
||||
*.log
|
||||
*.log.1
|
||||
logs
|
||||
*_build
|
||||
*_static
|
||||
*_templates
|
|
@ -0,0 +1,21 @@
|
|||
MIT License
|
||||
|
||||
Copyright (c) 2020 MyEMS
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
|
@ -0,0 +1,148 @@
|
|||
## MyEMS Modbus TCP Service
|
||||
|
||||
### Introduction
|
||||
This service is a component of MyEMS to acquire data from Modbus TCP devices.
|
||||
|
||||
[](https://app.codacy.com/gh/myems/myems-modbus-tcp?utm_source=github.com&utm_medium=referral&utm_content=myems/myems-modbus-tcp&utm_campaign=Badge_Grade_Settings)
|
||||
[](https://scrutinizer-ci.com/g/myems/myems-modbus-tcp/?branch=master)
|
||||
[](https://codeclimate.com/github/myems/myems-modbus-tcp/maintainability)
|
||||
[](https://lgtm.com/projects/g/myems/myems-modbus-tcp/alerts/)
|
||||
|
||||
|
||||
### Prerequisites
|
||||
pyserial
|
||||
|
||||
modbus-tk
|
||||
|
||||
mysql.connector
|
||||
|
||||
### Installation
|
||||
|
||||
Download and install MySQL Connector:
|
||||
```
|
||||
$ cd ~/tools
|
||||
$ wget https://dev.mysql.com/get/Downloads/Connector-Python/mysql-connector-python-8.0.20.tar.gz
|
||||
$ tar xzf mysql-connector-python-8.0.20.tar.gz
|
||||
$ cd ~/tools/mysql-connector-python-8.0.20
|
||||
$ sudo python3 setup.py install
|
||||
```
|
||||
|
||||
Download and install modbus-tk
|
||||
```
|
||||
$ cd ~/tools
|
||||
$ git clone https://github.com/pyserial/pyserial.git
|
||||
$ cd ~/tools/pyserial
|
||||
$ sudo python3 setup.py install
|
||||
$ git clone https://github.com/ljean/modbus-tk.git
|
||||
$ cd ~/tools/modbus-tk
|
||||
$ sudo python3 setup.py install
|
||||
|
||||
```
|
||||
|
||||
Install myems-modbus-tcp service
|
||||
```
|
||||
$ cd ~
|
||||
$ git clone https://github.com/myems/myems.git
|
||||
$ sudo git checkout master (or the latest release tag)
|
||||
$ sudo cp -R ~/myems/myems-modbus-tcp /myems-modbus-tcp
|
||||
$ cd /myems-modbus-tcp
|
||||
```
|
||||
Eidt the config
|
||||
```
|
||||
$ sudo nano /myems-modbus-tcp/config.py
|
||||
```
|
||||
Setup systemd service:
|
||||
```
|
||||
$ sudo cp /myems-modbus-tcp/myems-modbus-tcp.service /lib/systemd/system/
|
||||
$ sudo systemctl enable myems-modbus-tcp.service
|
||||
$ sudo systemctl start myems-modbus-tcp.service
|
||||
```
|
||||
|
||||
|
||||
|
||||
### Add Data Sources and Points in MyEMS Admin
|
||||
refer to https://github.com/myems/myesm-admin.git
|
||||
|
||||
NOTE: If you modified Modbus TCP datasources and points, please restart this service:
|
||||
```
|
||||
$ sudo systemctl restart myems-modbus-tcp.service
|
||||
```
|
||||
|
||||
Input Data source protocol:
|
||||
```
|
||||
modbus-tcp
|
||||
```
|
||||
Data source connection example:
|
||||
```
|
||||
{"host":"10.9.67.99","port":502}
|
||||
```
|
||||
|
||||
Point address example:
|
||||
```
|
||||
{"slave_id":1, "function_code":3, "offset":0, "number_of_registers":2, "format":"<f", "byte_swap":true}
|
||||
```
|
||||
|
||||
### Address
|
||||
|
||||
#### slave_id
|
||||
The slave ID
|
||||
|
||||
#### function_code
|
||||
01 (0x01) Read Coils
|
||||
02 (0x02) Read Discrete Inputs
|
||||
03 (0x03) Read Holding Registers
|
||||
04 (0x04) Read Input Registers
|
||||
23 (0x17) Read/Write Multiple registers
|
||||
|
||||
#### offset
|
||||
The starting register address specified in the Request PDU
|
||||
|
||||
#### number_of_registers
|
||||
The number of registers specified in the Request PDU
|
||||
|
||||
#### format
|
||||
Use python3 library struct to format bytes.
|
||||
Python bytes objects are used to hold the data representing the C struct
|
||||
and also as format strings (explained below) to describe the layout of data in the C struct.
|
||||
|
||||
The optional first format char indicates byte order, size and alignment:
|
||||
@: native order, size & alignment (default)
|
||||
=: native order, std. size & alignment
|
||||
<: little-endian, std. size & alignment
|
||||
>: big-endian, std. size & alignment
|
||||
!: same as >
|
||||
|
||||
The remaining chars indicate types of args and must match exactly;
|
||||
these can be preceded by a decimal repeat count:
|
||||
x: pad byte (no data); c:char; b:signed byte; B:unsigned byte;
|
||||
?: _Bool (requires C99; if not available, char is used instead)
|
||||
h:short; H:unsigned short; i:int; I:unsigned int;
|
||||
l:long; L:unsigned long; f:float; d:double.
|
||||
|
||||
Special cases (preceding decimal count indicates length):
|
||||
s:string (array of char); p: pascal string (with count byte).
|
||||
Special cases (only available in native format):
|
||||
n:ssize_t; N:size_t;
|
||||
P:an integer type that is wide enough to hold a pointer.
|
||||
|
||||
Special case (not in native mode unless 'long long' in platform C):
|
||||
q:long long; Q:unsigned long long
|
||||
|
||||
Whitespace between formats is ignored.
|
||||
|
||||
#### byte_swap
|
||||
A boolean indicates whether or not to swap adjacent bytes.
|
||||
Swap adjacent bytes of 32bits(4bytes) or 64bits(8bytes).
|
||||
This is not for little-endian and big-endian swapping, and use format for that.
|
||||
The option is effective when number_of_registers is ether 2(32bits) or 4(64bits),
|
||||
else it will be ignored.
|
||||
|
||||
|
||||
### References
|
||||
[1]. http://myems.io
|
||||
|
||||
[2]. http://www.modbus.org/tech.php
|
||||
|
||||
[3]. https://github.com/ljean/modbus-tk
|
||||
|
||||
[4]. https://docs.python.org/3/library/struct.html#format-strings
|
|
@ -0,0 +1,421 @@
|
|||
import json
|
||||
import mysql.connector
|
||||
import time
|
||||
import math
|
||||
from datetime import datetime
|
||||
import telnetlib
|
||||
from modbus_tk import modbus_tcp
|
||||
import config
|
||||
from decimal import Decimal
|
||||
from byte_swap import byte_swap_32_bit, byte_swap_64_bit
|
||||
|
||||
|
||||
########################################################################################################################
|
||||
# Acquisition Procedures
|
||||
# Step 1: telnet hosts
|
||||
# Step 2: Get point list
|
||||
# Step 3: Read point values from Modbus slaves
|
||||
# Step 4: Bulk insert point values and update latest values in historical database
|
||||
########################################################################################################################
|
||||
|
||||
|
||||
def process(logger, data_source_id, host, port):
|
||||
|
||||
while True:
|
||||
# the outermost while loop
|
||||
|
||||
################################################################################################################
|
||||
# Step 1: telnet hosts
|
||||
################################################################################################################
|
||||
try:
|
||||
telnetlib.Telnet(host, port, 10)
|
||||
print("Succeeded to telnet %s:%s in acquisition process ", host, port)
|
||||
except Exception as e:
|
||||
logger.error("Failed to telnet %s:%s in acquisition process: %s ", host, port, str(e))
|
||||
time.sleep(300)
|
||||
continue
|
||||
|
||||
################################################################################################################
|
||||
# Step 2: Get point list
|
||||
################################################################################################################
|
||||
cnx_system_db = None
|
||||
cursor_system_db = None
|
||||
try:
|
||||
cnx_system_db = mysql.connector.connect(**config.myems_system_db)
|
||||
cursor_system_db = cnx_system_db.cursor()
|
||||
except Exception as e:
|
||||
logger.error("Error in step 2.1 of acquisition process " + str(e))
|
||||
if cursor_system_db:
|
||||
cursor_system_db.close()
|
||||
if cnx_system_db:
|
||||
cnx_system_db.close()
|
||||
# sleep and then continue the outermost loop to reload points
|
||||
time.sleep(60)
|
||||
continue
|
||||
|
||||
try:
|
||||
query = (" SELECT id, name, object_type, is_trend, ratio, address "
|
||||
" FROM tbl_points "
|
||||
" WHERE data_source_id = %s "
|
||||
" ORDER BY id ")
|
||||
cursor_system_db.execute(query, (data_source_id, ))
|
||||
rows_point = cursor_system_db.fetchall()
|
||||
except Exception as e:
|
||||
logger.error("Error in step 2.2 of acquisition process: " + str(e))
|
||||
# sleep several minutes and continue the outer loop to reload points
|
||||
time.sleep(60)
|
||||
continue
|
||||
finally:
|
||||
if cursor_system_db:
|
||||
cursor_system_db.close()
|
||||
if cnx_system_db:
|
||||
cnx_system_db.close()
|
||||
|
||||
if rows_point is None or len(rows_point) == 0:
|
||||
# there is no points for this data source
|
||||
logger.error("Point Not Found in Data Source (ID = %s), acquisition process terminated ", data_source_id)
|
||||
# sleep 60 seconds and go back to the begin of outermost while loop to reload points
|
||||
time.sleep(60)
|
||||
continue
|
||||
|
||||
# There are points for this data source
|
||||
point_list = list()
|
||||
for row_point in rows_point:
|
||||
point_list.append({"id": row_point[0],
|
||||
"name": row_point[1],
|
||||
"object_type": row_point[2],
|
||||
"is_trend": row_point[3],
|
||||
"ratio": row_point[4],
|
||||
"address": row_point[5]})
|
||||
|
||||
################################################################################################################
|
||||
# Step 3: Read point values from Modbus slaves
|
||||
################################################################################################################
|
||||
# connect to historical database
|
||||
cnx_historical_db = None
|
||||
cursor_historical_db = None
|
||||
try:
|
||||
cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
|
||||
cursor_historical_db = cnx_historical_db.cursor()
|
||||
except Exception as e:
|
||||
logger.error("Error in step 3.1 of acquisition process " + str(e))
|
||||
if cursor_historical_db:
|
||||
cursor_historical_db.close()
|
||||
if cnx_historical_db:
|
||||
cnx_historical_db.close()
|
||||
# sleep 60 seconds and go back to the begin of outermost while loop to reload points
|
||||
time.sleep(60)
|
||||
continue
|
||||
|
||||
# connect to the Modbus data source
|
||||
master = modbus_tcp.TcpMaster(host=host, port=port, timeout_in_sec=5.0)
|
||||
master.set_timeout(5.0)
|
||||
print("Ready to connect to %s:%s ", host, port)
|
||||
|
||||
# inner loop to read all point values within a configurable period
|
||||
while True:
|
||||
is_modbus_tcp_timed_out = False
|
||||
energy_value_list = list()
|
||||
analog_value_list = list()
|
||||
digital_value_list = list()
|
||||
|
||||
# foreach point loop
|
||||
for point in point_list:
|
||||
try:
|
||||
address = json.loads(point['address'])
|
||||
except Exception as e:
|
||||
logger.error("Error in step 3.2 of acquisition process: \n"
|
||||
"Invalid point address in JSON " + str(e))
|
||||
continue
|
||||
|
||||
if 'slave_id' not in address.keys() \
|
||||
or 'function_code' not in address.keys() \
|
||||
or 'offset' not in address.keys() \
|
||||
or 'number_of_registers' not in address.keys() \
|
||||
or 'format' not in address.keys() \
|
||||
or 'byte_swap' not in address.keys() \
|
||||
or address['slave_id'] < 1 \
|
||||
or address['function_code'] not in (1, 2, 3, 4) \
|
||||
or address['offset'] < 0 \
|
||||
or address['number_of_registers'] < 0 \
|
||||
or len(address['format']) < 1 \
|
||||
or not isinstance(address['byte_swap'], bool):
|
||||
|
||||
logger.error('Data Source(ID=%s), Point(ID=%s) Invalid address data.',
|
||||
data_source_id, point['id'])
|
||||
# invalid point is found, and go on the foreach point loop to process next point
|
||||
continue
|
||||
|
||||
# read register value for valid point
|
||||
try:
|
||||
result = master.execute(slave=address['slave_id'],
|
||||
function_code=address['function_code'],
|
||||
starting_address=address['offset'],
|
||||
quantity_of_x=address['number_of_registers'],
|
||||
data_format=address['format'])
|
||||
except Exception as e:
|
||||
logger.error(str(e) +
|
||||
" host:" + host + " port:" + str(port) +
|
||||
" slave_id:" + str(address['slave_id']) +
|
||||
" function_code:" + str(address['function_code']) +
|
||||
" starting_address:" + str(address['offset']) +
|
||||
" quantity_of_x:" + str(address['number_of_registers']) +
|
||||
" data_format:" + str(address['format']) +
|
||||
" byte_swap:" + str(address['byte_swap']))
|
||||
|
||||
if 'timed out' in str(e):
|
||||
is_modbus_tcp_timed_out = True
|
||||
# timeout error, break the foreach point loop
|
||||
break
|
||||
else:
|
||||
# exception occurred when read register value, go on the foreach point loop
|
||||
continue
|
||||
|
||||
if result is None or not isinstance(result, tuple) or len(result) == 0:
|
||||
# invalid result, and go on the foreach point loop to process next point
|
||||
logger.error("Error in step 3.3 of acquisition process: \n"
|
||||
" invalid result: None "
|
||||
" for point_id: " + str(point['id']))
|
||||
continue
|
||||
|
||||
if not isinstance(result[0], float) and not isinstance(result[0], int) or math.isnan(result[0]):
|
||||
# invalid result, and go on the foreach point loop to process next point
|
||||
logger.error(" Error in step 3.4 of acquisition process:\n"
|
||||
" invalid result: not float and not int or not a number "
|
||||
" for point_id: " + str(point['id']))
|
||||
continue
|
||||
|
||||
if address['byte_swap']:
|
||||
if address['number_of_registers'] == 2:
|
||||
value = byte_swap_32_bit(result[0])
|
||||
elif address['number_of_registers'] == 4:
|
||||
value = byte_swap_64_bit(result[0])
|
||||
else:
|
||||
value = result[0]
|
||||
else:
|
||||
value = result[0]
|
||||
|
||||
if point['object_type'] == 'ANALOG_VALUE':
|
||||
analog_value_list.append({'data_source_id': data_source_id,
|
||||
'point_id': point['id'],
|
||||
'is_trend': point['is_trend'],
|
||||
'value': Decimal(value) * point['ratio']})
|
||||
elif point['object_type'] == 'ENERGY_VALUE':
|
||||
energy_value_list.append({'data_source_id': data_source_id,
|
||||
'point_id': point['id'],
|
||||
'is_trend': point['is_trend'],
|
||||
'value': Decimal(value) * point['ratio']})
|
||||
elif point['object_type'] == 'DIGITAL_VALUE':
|
||||
digital_value_list.append({'data_source_id': data_source_id,
|
||||
'point_id': point['id'],
|
||||
'is_trend': point['is_trend'],
|
||||
'value': int(value) * int(point['ratio'])})
|
||||
|
||||
# end of foreach point loop
|
||||
|
||||
if is_modbus_tcp_timed_out:
|
||||
# Modbus TCP connection timeout error
|
||||
|
||||
# destroy the Modbus master
|
||||
del master
|
||||
|
||||
# close the connection to database
|
||||
if cursor_historical_db:
|
||||
cursor_historical_db.close()
|
||||
if cnx_historical_db:
|
||||
cnx_historical_db.close()
|
||||
|
||||
# break the inner while loop to reconnect the Modbus device
|
||||
time.sleep(60)
|
||||
break
|
||||
|
||||
############################################################################################################
|
||||
# Step 4: Bulk insert point values and update latest values in historical database
|
||||
############################################################################################################
|
||||
# check the connection to the Historical Database
|
||||
if not cnx_historical_db.is_connected():
|
||||
try:
|
||||
cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
|
||||
cursor_historical_db = cnx_historical_db.cursor()
|
||||
except Exception as e:
|
||||
logger.error("Error in step 4.1 of acquisition process: " + str(e))
|
||||
if cursor_historical_db:
|
||||
cursor_historical_db.close()
|
||||
if cnx_historical_db:
|
||||
cnx_historical_db.close()
|
||||
# sleep some seconds
|
||||
time.sleep(60)
|
||||
continue
|
||||
|
||||
current_datetime_utc = datetime.utcnow()
|
||||
# bulk insert values into historical database within a period
|
||||
# update latest values in the meanwhile
|
||||
if len(analog_value_list) > 0:
|
||||
add_values = (" INSERT INTO tbl_analog_value (point_id, utc_date_time, actual_value) "
|
||||
" VALUES ")
|
||||
trend_value_count = 0
|
||||
|
||||
for point_value in analog_value_list:
|
||||
if point_value['is_trend']:
|
||||
add_values += " (" + str(point_value['point_id']) + ","
|
||||
add_values += "'" + current_datetime_utc.isoformat() + "',"
|
||||
add_values += str(point_value['value']) + "), "
|
||||
trend_value_count += 1
|
||||
|
||||
if trend_value_count > 0:
|
||||
try:
|
||||
# trim ", " at the end of string and then execute
|
||||
cursor_historical_db.execute(add_values[:-2])
|
||||
cnx_historical_db.commit()
|
||||
except Exception as e:
|
||||
logger.error("Error in step 4.2.1 of acquisition process " + str(e))
|
||||
# ignore this exception
|
||||
pass
|
||||
|
||||
# update tbl_analog_value_latest
|
||||
delete_values = " DELETE FROM tbl_analog_value_latest WHERE point_id IN ( "
|
||||
latest_values = (" INSERT INTO tbl_analog_value_latest (point_id, utc_date_time, actual_value) "
|
||||
" VALUES ")
|
||||
latest_value_count = 0
|
||||
|
||||
for point_value in analog_value_list:
|
||||
delete_values += str(point_value['point_id']) + ","
|
||||
latest_values += " (" + str(point_value['point_id']) + ","
|
||||
latest_values += "'" + current_datetime_utc.isoformat() + "',"
|
||||
latest_values += str(point_value['value']) + "), "
|
||||
latest_value_count += 1
|
||||
|
||||
if latest_value_count > 0:
|
||||
try:
|
||||
# replace "," at the end of string with ")"
|
||||
cursor_historical_db.execute(delete_values[:-1] + ")")
|
||||
cnx_historical_db.commit()
|
||||
except Exception as e:
|
||||
logger.error("Error in step 4.2.2 of acquisition process " + str(e))
|
||||
# ignore this exception
|
||||
pass
|
||||
|
||||
try:
|
||||
# trim ", " at the end of string and then execute
|
||||
cursor_historical_db.execute(latest_values[:-2])
|
||||
cnx_historical_db.commit()
|
||||
except Exception as e:
|
||||
logger.error("Error in step 4.2.3 of acquisition process " + str(e))
|
||||
# ignore this exception
|
||||
pass
|
||||
|
||||
if len(energy_value_list) > 0:
|
||||
add_values = (" INSERT INTO tbl_energy_value (point_id, utc_date_time, actual_value) "
|
||||
" VALUES ")
|
||||
trend_value_count = 0
|
||||
|
||||
for point_value in energy_value_list:
|
||||
if point_value['is_trend']:
|
||||
add_values += " (" + str(point_value['point_id']) + ","
|
||||
add_values += "'" + current_datetime_utc.isoformat() + "',"
|
||||
add_values += str(point_value['value']) + "), "
|
||||
trend_value_count += 1
|
||||
|
||||
if trend_value_count > 0:
|
||||
try:
|
||||
# trim ", " at the end of string and then execute
|
||||
cursor_historical_db.execute(add_values[:-2])
|
||||
cnx_historical_db.commit()
|
||||
except Exception as e:
|
||||
logger.error("Error in step 4.3.1 of acquisition process: " + str(e))
|
||||
# ignore this exception
|
||||
pass
|
||||
|
||||
# update tbl_energy_value_latest
|
||||
delete_values = " DELETE FROM tbl_energy_value_latest WHERE point_id IN ( "
|
||||
latest_values = (" INSERT INTO tbl_energy_value_latest (point_id, utc_date_time, actual_value) "
|
||||
" VALUES ")
|
||||
|
||||
latest_value_count = 0
|
||||
for point_value in energy_value_list:
|
||||
delete_values += str(point_value['point_id']) + ","
|
||||
latest_values += " (" + str(point_value['point_id']) + ","
|
||||
latest_values += "'" + current_datetime_utc.isoformat() + "',"
|
||||
latest_values += str(point_value['value']) + "), "
|
||||
latest_value_count += 1
|
||||
|
||||
if latest_value_count > 0:
|
||||
try:
|
||||
# replace "," at the end of string with ")"
|
||||
cursor_historical_db.execute(delete_values[:-1] + ")")
|
||||
cnx_historical_db.commit()
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Error in step 4.3.2 of acquisition process " + str(e))
|
||||
# ignore this exception
|
||||
pass
|
||||
|
||||
try:
|
||||
# trim ", " at the end of string and then execute
|
||||
cursor_historical_db.execute(latest_values[:-2])
|
||||
cnx_historical_db.commit()
|
||||
|
||||
except Exception as e:
|
||||
logger.error("Error in step 4.3.3 of acquisition process " + str(e))
|
||||
# ignore this exception
|
||||
pass
|
||||
|
||||
if len(digital_value_list) > 0:
|
||||
add_values = (" INSERT INTO tbl_digital_value (point_id, utc_date_time, actual_value) "
|
||||
" VALUES ")
|
||||
trend_value_count = 0
|
||||
|
||||
for point_value in digital_value_list:
|
||||
if point_value['is_trend']:
|
||||
add_values += " (" + str(point_value['point_id']) + ","
|
||||
add_values += "'" + current_datetime_utc.isoformat() + "',"
|
||||
add_values += str(point_value['value']) + "), "
|
||||
trend_value_count += 1
|
||||
|
||||
if trend_value_count > 0:
|
||||
try:
|
||||
# trim ", " at the end of string and then execute
|
||||
cursor_historical_db.execute(add_values[:-2])
|
||||
cnx_historical_db.commit()
|
||||
except Exception as e:
|
||||
logger.error("Error in step 4.4.1 of acquisition process: " + str(e))
|
||||
# ignore this exception
|
||||
pass
|
||||
|
||||
# update tbl_digital_value_latest
|
||||
delete_values = " DELETE FROM tbl_digital_value_latest WHERE point_id IN ( "
|
||||
latest_values = (" INSERT INTO tbl_digital_value_latest (point_id, utc_date_time, actual_value) "
|
||||
" VALUES ")
|
||||
latest_value_count = 0
|
||||
for point_value in digital_value_list:
|
||||
delete_values += str(point_value['point_id']) + ","
|
||||
latest_values += " (" + str(point_value['point_id']) + ","
|
||||
latest_values += "'" + current_datetime_utc.isoformat() + "',"
|
||||
latest_values += str(point_value['value']) + "), "
|
||||
latest_value_count += 1
|
||||
|
||||
if latest_value_count > 0:
|
||||
try:
|
||||
# replace "," at the end of string with ")"
|
||||
cursor_historical_db.execute(delete_values[:-1] + ")")
|
||||
cnx_historical_db.commit()
|
||||
except Exception as e:
|
||||
logger.error("Error in step 4.4.2 of acquisition process " + str(e))
|
||||
# ignore this exception
|
||||
pass
|
||||
|
||||
try:
|
||||
# trim ", " at the end of string and then execute
|
||||
cursor_historical_db.execute(latest_values[:-2])
|
||||
cnx_historical_db.commit()
|
||||
except Exception as e:
|
||||
logger.error("Error in step 4.4.3 of acquisition process " + str(e))
|
||||
# ignore this exception
|
||||
pass
|
||||
|
||||
# sleep some seconds
|
||||
time.sleep(config.interval_in_seconds)
|
||||
|
||||
# end of inner while loop
|
||||
|
||||
# end of outermost while loop
|
|
@ -0,0 +1,45 @@
|
|||
import struct
|
||||
########################################################################################################################
|
||||
# Swap adjacent bytes
|
||||
# This is not big-endian and little-endian swapping.
|
||||
########################################################################################################################
|
||||
|
||||
|
||||
# swap adjacent bytes of 32bits (4bytes) data,
|
||||
# abcd => badc
|
||||
def byte_swap_32_bit(x):
|
||||
x_type = type(x)
|
||||
if x_type is float:
|
||||
x = struct.unpack('>I', struct.pack('>f', x))[0]
|
||||
|
||||
a = ((x >> 8) & 0x00FF0000)
|
||||
b = ((x << 8) & 0xFF000000)
|
||||
c = ((x >> 8) & 0x000000FF)
|
||||
d = ((x << 8) & 0x0000FF00)
|
||||
|
||||
if x_type is float:
|
||||
return struct.unpack('>f', struct.pack('>I', b | a | d | c))[0]
|
||||
else:
|
||||
return b | a | d | c
|
||||
|
||||
|
||||
# swap adjacent bytes of 64bits (8bytes) data,
|
||||
# abcdefgh => badcfehg
|
||||
def byte_swap_64_bit(x):
|
||||
x_type = type(x)
|
||||
if x_type is float:
|
||||
x = struct.unpack('>Q', struct.pack('>d', x))[0]
|
||||
|
||||
a = ((x >> 8) & 0x00FF000000000000)
|
||||
b = ((x << 8) & 0xFF00000000000000)
|
||||
c = ((x >> 8) & 0x000000FF00000000)
|
||||
d = ((x << 8) & 0x0000FF0000000000)
|
||||
e = ((x >> 8) & 0x0000000000FF0000)
|
||||
f = ((x << 8) & 0x00000000FF000000)
|
||||
g = ((x >> 8) & 0x00000000000000FF)
|
||||
h = ((x << 8) & 0x000000000000FF00)
|
||||
|
||||
if x_type is float:
|
||||
return struct.unpack('>d', struct.pack('>Q', b | a | d | c | f | e | h | g))[0]
|
||||
else:
|
||||
return b | a | d | c | f | e | h | g
|
|
@ -0,0 +1,25 @@
|
|||
myems_system_db = {
|
||||
'user': 'root',
|
||||
'password': '!MyEMS1',
|
||||
'host': '127.0.0.1',
|
||||
'database': 'myems_system_db',
|
||||
'port': 3306,
|
||||
}
|
||||
|
||||
myems_historical_db = {
|
||||
'user': 'root',
|
||||
'password': '!MyEMS1',
|
||||
'host': '127.0.0.1',
|
||||
'database': 'myems_historical_db',
|
||||
'port': 3306,
|
||||
}
|
||||
|
||||
# Indicates how long the process waits between readings
|
||||
interval_in_seconds = 180
|
||||
|
||||
# Get the gateway ID and token from MyEMS Admin
|
||||
# This is used for getting data sources associated with the gateway
|
||||
gateway = {
|
||||
'id': 1,
|
||||
'token': 'AAAAAAAA-AAAA-AAAA-AAAA-AAAAAAAAAAAA'
|
||||
}
|
|
@ -0,0 +1,104 @@
|
|||
import json
|
||||
import mysql.connector
|
||||
import config
|
||||
from multiprocessing import Process
|
||||
import time
|
||||
import logging
|
||||
from logging.handlers import RotatingFileHandler
|
||||
import acquisition
|
||||
|
||||
|
||||
def main():
|
||||
"""main"""
|
||||
# create logger
|
||||
logger = logging.getLogger('myems-modbus-tcp')
|
||||
# specifies the lowest-severity log message a logger will handle,
|
||||
# where debug is the lowest built-in severity level and critical is the highest built-in severity.
|
||||
# For example, if the severity level is INFO, the logger will handle only INFO, WARNING, ERROR, and CRITICAL
|
||||
# messages and will ignore DEBUG messages.
|
||||
logger.setLevel(logging.ERROR)
|
||||
# create file handler which logs messages
|
||||
fh = RotatingFileHandler('myems-modbus-tcp.log', maxBytes=1024*1024, backupCount=1)
|
||||
# create formatter and add it to the handlers
|
||||
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||
fh.setFormatter(formatter)
|
||||
# add the handlers to logger
|
||||
logger.addHandler(fh)
|
||||
|
||||
# Get Data Sources
|
||||
while True:
|
||||
# TODO: This service has to RESTART to reload latest data sources and this should be fixed
|
||||
cnx_system_db = None
|
||||
cursor_system_db = None
|
||||
try:
|
||||
cnx_system_db = mysql.connector.connect(**config.myems_system_db)
|
||||
cursor_system_db = cnx_system_db.cursor()
|
||||
except Exception as e:
|
||||
logger.error("Error in main process " + str(e))
|
||||
if cursor_system_db:
|
||||
cursor_system_db.close()
|
||||
if cnx_system_db:
|
||||
cnx_system_db.close()
|
||||
# sleep several minutes and continue the outer loop to reload points
|
||||
time.sleep(60)
|
||||
continue
|
||||
|
||||
# Get data sources by gateway and protocol
|
||||
try:
|
||||
query = (" SELECT ds.id, ds.name, ds.connection "
|
||||
" FROM tbl_data_sources ds, tbl_gateways g "
|
||||
" WHERE ds.protocol = 'modbus-tcp' AND ds.gateway_id = g.id AND g.id = %s AND g.token = %s "
|
||||
" ORDER BY ds.id ")
|
||||
cursor_system_db.execute(query, (config.gateway['id'], config.gateway['token'],))
|
||||
rows_data_source = cursor_system_db.fetchall()
|
||||
except Exception as e:
|
||||
logger.error("Error in main process " + str(e))
|
||||
# sleep several minutes and continue the outer loop to reload points
|
||||
time.sleep(60)
|
||||
continue
|
||||
finally:
|
||||
if cursor_system_db:
|
||||
cursor_system_db.close()
|
||||
if cnx_system_db:
|
||||
cnx_system_db.close()
|
||||
|
||||
if rows_data_source is None or len(rows_data_source) == 0:
|
||||
logger.error("Data Source Not Found, Wait for minutes to retry.")
|
||||
# wait for a while and retry
|
||||
time.sleep(60)
|
||||
continue
|
||||
else:
|
||||
# Stop to connect these data sources
|
||||
break
|
||||
|
||||
for row_data_source in rows_data_source:
|
||||
print("Data Source: ID=%s, Name=%s, Connection=%s " %
|
||||
(row_data_source[0], row_data_source[1], row_data_source[2]))
|
||||
|
||||
if row_data_source[2] is None or len(row_data_source[2]) == 0:
|
||||
logger.error("Data Source Connection Not Found.")
|
||||
continue
|
||||
|
||||
try:
|
||||
server = json.loads(row_data_source[2])
|
||||
except Exception as e:
|
||||
logger.error("Data Source Connection JSON error " + str(e))
|
||||
continue
|
||||
|
||||
if 'host' not in server.keys() \
|
||||
or 'port' not in server.keys() \
|
||||
or server['host'] is None \
|
||||
or server['port'] is None \
|
||||
or len(server['host']) == 0 \
|
||||
or not isinstance(server['port'], int) \
|
||||
or server['port'] < 1:
|
||||
logger.error("Data Source Connection Invalid.")
|
||||
continue
|
||||
|
||||
# fork worker process for each data source
|
||||
# todo: how to restart the process if the process terminated unexpectedly
|
||||
Process(target=acquisition.process, args=(logger, row_data_source[0], server['host'], server['port'])).start()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
|
@ -0,0 +1,15 @@
|
|||
[Unit]
|
||||
Description=myems-modbus-tcp daemon
|
||||
After=network.target
|
||||
|
||||
[Service]
|
||||
User=root
|
||||
Group=root
|
||||
ExecStart=/usr/bin/python3 /myems-modbus-tcp/main.py
|
||||
ExecReload=/bin/kill -s HUP $MAINPID
|
||||
ExecStop=/bin/kill -s TERM $MAINPID
|
||||
PrivateTmp=true
|
||||
Restart=always
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
|
@ -0,0 +1,97 @@
|
|||
import sys
|
||||
from modbus_tk import modbus_tcp
|
||||
import telnetlib
|
||||
import byte_swap
|
||||
|
||||
|
||||
def main():
|
||||
if len(sys.argv) > 1:
|
||||
host = sys.argv[1]
|
||||
else:
|
||||
print('Usage: python3 test.py HOST_IP_ADDR ')
|
||||
return
|
||||
|
||||
port = 502
|
||||
try:
|
||||
telnetlib.Telnet(host, port, 10)
|
||||
print("Succeeded to telnet %s:%s ", host, port)
|
||||
except Exception as e:
|
||||
print("Failed to telnet %s:%s : %s ", host, port, str(e))
|
||||
return
|
||||
|
||||
"""
|
||||
Functions to convert between Python values and C structs.
|
||||
Python bytes objects are used to hold the data representing the C struct
|
||||
and also as format strings (explained below) to describe the layout of data
|
||||
in the C struct.
|
||||
|
||||
The optional first format char indicates byte order, size and alignment:
|
||||
@: native order, size & alignment (default)
|
||||
=: native order, std. size & alignment
|
||||
<: little-endian, std. size & alignment
|
||||
>: big-endian, std. size & alignment
|
||||
!: same as >
|
||||
|
||||
The remaining chars indicate types of args and must match exactly;
|
||||
these can be preceded by a decimal repeat count:
|
||||
x: pad byte (no data); c:char; b:signed byte; B:unsigned byte;
|
||||
?: _Bool (requires C99; if not available, char is used instead)
|
||||
h:short; H:unsigned short; i:int; I:unsigned int;
|
||||
l:long; L:unsigned long; f:float; d:double.
|
||||
Special cases (preceding decimal count indicates length):
|
||||
s:string (array of char); p: pascal string (with count byte).
|
||||
Special cases (only available in native format):
|
||||
n:ssize_t; N:size_t;
|
||||
P:an integer type that is wide enough to hold a pointer.
|
||||
Special case (not in native mode unless 'long long' in platform C):
|
||||
q:long long; Q:unsigned long long
|
||||
Whitespace between formats is ignored.
|
||||
|
||||
The variable struct.error is an exception raised on errors.
|
||||
"""
|
||||
try:
|
||||
master = modbus_tcp.TcpMaster(host=host, port=port, timeout_in_sec=5.0)
|
||||
master.set_timeout(5.0)
|
||||
print("Connected to %s:%s ", host, port)
|
||||
print("read registers...")
|
||||
result = master.execute(slave=1, function_code=3, starting_address=6401, quantity_of_x=2, data_format='<l')
|
||||
print("51AL1-1-KWHimp = " + str(byte_swap.byte_swap_32_bit(result[0])))
|
||||
result = master.execute(slave=1, function_code=3, starting_address=6403, quantity_of_x=2, data_format='<l')
|
||||
print("51AL2-1-KWHimp = " + str(byte_swap.byte_swap_32_bit(result[0])))
|
||||
result = master.execute(slave=1, function_code=3, starting_address=6405, quantity_of_x=2, data_format='<l')
|
||||
print("51AL3-1-KWHimp = " + str(byte_swap.byte_swap_32_bit(result[0])))
|
||||
result = master.execute(slave=1, function_code=3, starting_address=6407, quantity_of_x=2, data_format='<l')
|
||||
print("51AL4-1-KWHimp = " + str(byte_swap.byte_swap_32_bit(result[0])))
|
||||
result = master.execute(slave=1, function_code=3, starting_address=6409, quantity_of_x=2, data_format='<l')
|
||||
print("51AL5-1-KWHimp = " + str(byte_swap.byte_swap_32_bit(result[0])))
|
||||
# result = master.execute(slave=1, function_code=3, starting_address=11, quantity_of_x=2, data_format='>f')
|
||||
# print("Volatage Vc-a = " + str(result))
|
||||
# result = master.execute(slave=1, function_code=3, starting_address=13, quantity_of_x=2, data_format='>f')
|
||||
# print("Current a = " + str(result))
|
||||
# result = master.execute(slave=1, function_code=3, starting_address=15, quantity_of_x=2, data_format='>f')
|
||||
# print("Current b = " + str(result))
|
||||
# result = master.execute(slave=1, function_code=3, starting_address=17, quantity_of_x=2, data_format='>f')
|
||||
# print("Current c = " + str(result))
|
||||
# result = master.execute(slave=1, function_code=3, starting_address=19, quantity_of_x=2, data_format='>f')
|
||||
# print("Active Power a = " + str(result))
|
||||
# result = master.execute(slave=1, function_code=3, starting_address=25, quantity_of_x=2, data_format='>f')
|
||||
# print("Active Power b = " + str(result))
|
||||
# result = master.execute(slave=1, function_code=3, starting_address=27, quantity_of_x=2, data_format='>f')
|
||||
# print("Active Power c = " + str(result))
|
||||
# result = master.execute(slave=1, function_code=3, starting_address=29, quantity_of_x=2, data_format='>f')
|
||||
# print("Total Active Power = " + str(result))
|
||||
# result = master.execute(slave=1, function_code=3, starting_address=65, quantity_of_x=2, data_format='>f')
|
||||
# print("Total Power Factor = " + str(result))
|
||||
# result = master.execute(slave=1, function_code=3, starting_address=71, quantity_of_x=2, data_format='>f')
|
||||
# print("Amplitude Unbalance - Volatage = " + str(result))
|
||||
# result = master.execute(slave=1, function_code=3, starting_address=73, quantity_of_x=2, data_format='>f')
|
||||
# print("Amplitude Unbalance - Current = " + str(result))
|
||||
# result = master.execute(slave=1, function_code=3, starting_address=801, quantity_of_x=4, data_format='>d')
|
||||
# print("Active Energy Import Tariff 1 = " + str(result))
|
||||
master.close()
|
||||
except Exception as e:
|
||||
print(str(e))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
Loading…
Reference in New Issue