Merge branch 'develop'

pull/12/head
13621160019@163.com 2021-02-19 11:16:06 +08:00
commit f1101fe181
9 changed files with 933 additions and 6 deletions

63
.gitignore vendored
View File

@ -1,8 +1,59 @@
# Logs *.py[cod]
logs
*.log # C extensions
*_build *.c
*_static *.so
*_templates
# Jython
*$py.class
# Packages
*.egg
*.egg-info
dist
build
eggs
parts
var
sdist
develop-eggs
.installed.cfg
lib
lib64
# Installer logs
pip-log.txt
# Unit test / coverage reports
.coverage
.tox
nosetests.xml
htmlcov
*.dat
# Docs
doc/_build
# Translations
*.mo
# Idea
.idea .idea
.vscode .vscode
# System
.DS_Store
# VIM swap files
.*.swp
# VIM temp files
*~
# Log
*.log
*.log.1
logs
*_build
*_static
*_templates

21
myems-modbus-tcp/LICENSE Normal file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2020 MyEMS
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

148
myems-modbus-tcp/README.md Normal file
View File

@ -0,0 +1,148 @@
## MyEMS Modbus TCP Service
### Introduction
This service is a component of MyEMS to acquire data from Modbus TCP devices.
[![Codacy Badge](https://api.codacy.com/project/badge/Grade/6b4a22007133463d99493e1798266829)](https://app.codacy.com/gh/myems/myems-modbus-tcp?utm_source=github.com&utm_medium=referral&utm_content=myems/myems-modbus-tcp&utm_campaign=Badge_Grade_Settings)
[![Scrutinizer Code Quality](https://scrutinizer-ci.com/g/myems/myems-modbus-tcp/badges/quality-score.png?b=master)](https://scrutinizer-ci.com/g/myems/myems-modbus-tcp/?branch=master)
[![Maintainability](https://api.codeclimate.com/v1/badges/704c0410c700d520e15f/maintainability)](https://codeclimate.com/github/myems/myems-modbus-tcp/maintainability)
[![Total alerts](https://img.shields.io/lgtm/alerts/g/myems/myems-modbus-tcp.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/myems/myems-modbus-tcp/alerts/)
### Prerequisites
pyserial
modbus-tk
mysql.connector
### Installation
Download and install MySQL Connector:
```
$ cd ~/tools
$ wget https://dev.mysql.com/get/Downloads/Connector-Python/mysql-connector-python-8.0.20.tar.gz
$ tar xzf mysql-connector-python-8.0.20.tar.gz
$ cd ~/tools/mysql-connector-python-8.0.20
$ sudo python3 setup.py install
```
Download and install modbus-tk
```
$ cd ~/tools
$ git clone https://github.com/pyserial/pyserial.git
$ cd ~/tools/pyserial
$ sudo python3 setup.py install
$ git clone https://github.com/ljean/modbus-tk.git
$ cd ~/tools/modbus-tk
$ sudo python3 setup.py install
```
Install myems-modbus-tcp service
```
$ cd ~
$ git clone https://github.com/myems/myems.git
$ sudo git checkout master (or the latest release tag)
$ sudo cp -R ~/myems/myems-modbus-tcp /myems-modbus-tcp
$ cd /myems-modbus-tcp
```
Eidt the config
```
$ sudo nano /myems-modbus-tcp/config.py
```
Setup systemd service:
```
$ sudo cp /myems-modbus-tcp/myems-modbus-tcp.service /lib/systemd/system/
$ sudo systemctl enable myems-modbus-tcp.service
$ sudo systemctl start myems-modbus-tcp.service
```
### Add Data Sources and Points in MyEMS Admin
refer to https://github.com/myems/myesm-admin.git
NOTE: If you modified Modbus TCP datasources and points, please restart this service:
```
$ sudo systemctl restart myems-modbus-tcp.service
```
Input Data source protocol:
```
modbus-tcp
```
Data source connection example:
```
{"host":"10.9.67.99","port":502}
```
Point address example:
```
{"slave_id":1, "function_code":3, "offset":0, "number_of_registers":2, "format":"<f", "byte_swap":true}
```
### Address
#### slave_id
The slave ID
#### function_code
01 (0x01) Read Coils
02 (0x02) Read Discrete Inputs
03 (0x03) Read Holding Registers
04 (0x04) Read Input Registers
23 (0x17) Read/Write Multiple registers
#### offset
The starting register address specified in the Request PDU
#### number_of_registers
The number of registers specified in the Request PDU
#### format
Use python3 library struct to format bytes.
Python bytes objects are used to hold the data representing the C struct
and also as format strings (explained below) to describe the layout of data in the C struct.
The optional first format char indicates byte order, size and alignment:
@: native order, size & alignment (default)
=: native order, std. size & alignment
<: little-endian, std. size & alignment
>: big-endian, std. size & alignment
!: same as >
The remaining chars indicate types of args and must match exactly;
these can be preceded by a decimal repeat count:
x: pad byte (no data); c:char; b:signed byte; B:unsigned byte;
?: _Bool (requires C99; if not available, char is used instead)
h:short; H:unsigned short; i:int; I:unsigned int;
l:long; L:unsigned long; f:float; d:double.
Special cases (preceding decimal count indicates length):
s:string (array of char); p: pascal string (with count byte).
Special cases (only available in native format):
n:ssize_t; N:size_t;
P:an integer type that is wide enough to hold a pointer.
Special case (not in native mode unless 'long long' in platform C):
q:long long; Q:unsigned long long
Whitespace between formats is ignored.
#### byte_swap
A boolean indicates whether or not to swap adjacent bytes.
Swap adjacent bytes of 32bits(4bytes) or 64bits(8bytes).
This is not for little-endian and big-endian swapping, and use format for that.
The option is effective when number_of_registers is ether 2(32bits) or 4(64bits),
else it will be ignored.
### References
[1]. http://myems.io
[2]. http://www.modbus.org/tech.php
[3]. https://github.com/ljean/modbus-tk
[4]. https://docs.python.org/3/library/struct.html#format-strings

View File

@ -0,0 +1,421 @@
import json
import mysql.connector
import time
import math
from datetime import datetime
import telnetlib
from modbus_tk import modbus_tcp
import config
from decimal import Decimal
from byte_swap import byte_swap_32_bit, byte_swap_64_bit
########################################################################################################################
# Acquisition Procedures
# Step 1: telnet hosts
# Step 2: Get point list
# Step 3: Read point values from Modbus slaves
# Step 4: Bulk insert point values and update latest values in historical database
########################################################################################################################
def process(logger, data_source_id, host, port):
while True:
# the outermost while loop
################################################################################################################
# Step 1: telnet hosts
################################################################################################################
try:
telnetlib.Telnet(host, port, 10)
print("Succeeded to telnet %s:%s in acquisition process ", host, port)
except Exception as e:
logger.error("Failed to telnet %s:%s in acquisition process: %s ", host, port, str(e))
time.sleep(300)
continue
################################################################################################################
# Step 2: Get point list
################################################################################################################
cnx_system_db = None
cursor_system_db = None
try:
cnx_system_db = mysql.connector.connect(**config.myems_system_db)
cursor_system_db = cnx_system_db.cursor()
except Exception as e:
logger.error("Error in step 2.1 of acquisition process " + str(e))
if cursor_system_db:
cursor_system_db.close()
if cnx_system_db:
cnx_system_db.close()
# sleep and then continue the outermost loop to reload points
time.sleep(60)
continue
try:
query = (" SELECT id, name, object_type, is_trend, ratio, address "
" FROM tbl_points "
" WHERE data_source_id = %s "
" ORDER BY id ")
cursor_system_db.execute(query, (data_source_id, ))
rows_point = cursor_system_db.fetchall()
except Exception as e:
logger.error("Error in step 2.2 of acquisition process: " + str(e))
# sleep several minutes and continue the outer loop to reload points
time.sleep(60)
continue
finally:
if cursor_system_db:
cursor_system_db.close()
if cnx_system_db:
cnx_system_db.close()
if rows_point is None or len(rows_point) == 0:
# there is no points for this data source
logger.error("Point Not Found in Data Source (ID = %s), acquisition process terminated ", data_source_id)
# sleep 60 seconds and go back to the begin of outermost while loop to reload points
time.sleep(60)
continue
# There are points for this data source
point_list = list()
for row_point in rows_point:
point_list.append({"id": row_point[0],
"name": row_point[1],
"object_type": row_point[2],
"is_trend": row_point[3],
"ratio": row_point[4],
"address": row_point[5]})
################################################################################################################
# Step 3: Read point values from Modbus slaves
################################################################################################################
# connect to historical database
cnx_historical_db = None
cursor_historical_db = None
try:
cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
cursor_historical_db = cnx_historical_db.cursor()
except Exception as e:
logger.error("Error in step 3.1 of acquisition process " + str(e))
if cursor_historical_db:
cursor_historical_db.close()
if cnx_historical_db:
cnx_historical_db.close()
# sleep 60 seconds and go back to the begin of outermost while loop to reload points
time.sleep(60)
continue
# connect to the Modbus data source
master = modbus_tcp.TcpMaster(host=host, port=port, timeout_in_sec=5.0)
master.set_timeout(5.0)
print("Ready to connect to %s:%s ", host, port)
# inner loop to read all point values within a configurable period
while True:
is_modbus_tcp_timed_out = False
energy_value_list = list()
analog_value_list = list()
digital_value_list = list()
# foreach point loop
for point in point_list:
try:
address = json.loads(point['address'])
except Exception as e:
logger.error("Error in step 3.2 of acquisition process: \n"
"Invalid point address in JSON " + str(e))
continue
if 'slave_id' not in address.keys() \
or 'function_code' not in address.keys() \
or 'offset' not in address.keys() \
or 'number_of_registers' not in address.keys() \
or 'format' not in address.keys() \
or 'byte_swap' not in address.keys() \
or address['slave_id'] < 1 \
or address['function_code'] not in (1, 2, 3, 4) \
or address['offset'] < 0 \
or address['number_of_registers'] < 0 \
or len(address['format']) < 1 \
or not isinstance(address['byte_swap'], bool):
logger.error('Data Source(ID=%s), Point(ID=%s) Invalid address data.',
data_source_id, point['id'])
# invalid point is found, and go on the foreach point loop to process next point
continue
# read register value for valid point
try:
result = master.execute(slave=address['slave_id'],
function_code=address['function_code'],
starting_address=address['offset'],
quantity_of_x=address['number_of_registers'],
data_format=address['format'])
except Exception as e:
logger.error(str(e) +
" host:" + host + " port:" + str(port) +
" slave_id:" + str(address['slave_id']) +
" function_code:" + str(address['function_code']) +
" starting_address:" + str(address['offset']) +
" quantity_of_x:" + str(address['number_of_registers']) +
" data_format:" + str(address['format']) +
" byte_swap:" + str(address['byte_swap']))
if 'timed out' in str(e):
is_modbus_tcp_timed_out = True
# timeout error, break the foreach point loop
break
else:
# exception occurred when read register value, go on the foreach point loop
continue
if result is None or not isinstance(result, tuple) or len(result) == 0:
# invalid result, and go on the foreach point loop to process next point
logger.error("Error in step 3.3 of acquisition process: \n"
" invalid result: None "
" for point_id: " + str(point['id']))
continue
if not isinstance(result[0], float) and not isinstance(result[0], int) or math.isnan(result[0]):
# invalid result, and go on the foreach point loop to process next point
logger.error(" Error in step 3.4 of acquisition process:\n"
" invalid result: not float and not int or not a number "
" for point_id: " + str(point['id']))
continue
if address['byte_swap']:
if address['number_of_registers'] == 2:
value = byte_swap_32_bit(result[0])
elif address['number_of_registers'] == 4:
value = byte_swap_64_bit(result[0])
else:
value = result[0]
else:
value = result[0]
if point['object_type'] == 'ANALOG_VALUE':
analog_value_list.append({'data_source_id': data_source_id,
'point_id': point['id'],
'is_trend': point['is_trend'],
'value': Decimal(value) * point['ratio']})
elif point['object_type'] == 'ENERGY_VALUE':
energy_value_list.append({'data_source_id': data_source_id,
'point_id': point['id'],
'is_trend': point['is_trend'],
'value': Decimal(value) * point['ratio']})
elif point['object_type'] == 'DIGITAL_VALUE':
digital_value_list.append({'data_source_id': data_source_id,
'point_id': point['id'],
'is_trend': point['is_trend'],
'value': int(value) * int(point['ratio'])})
# end of foreach point loop
if is_modbus_tcp_timed_out:
# Modbus TCP connection timeout error
# destroy the Modbus master
del master
# close the connection to database
if cursor_historical_db:
cursor_historical_db.close()
if cnx_historical_db:
cnx_historical_db.close()
# break the inner while loop to reconnect the Modbus device
time.sleep(60)
break
############################################################################################################
# Step 4: Bulk insert point values and update latest values in historical database
############################################################################################################
# check the connection to the Historical Database
if not cnx_historical_db.is_connected():
try:
cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
cursor_historical_db = cnx_historical_db.cursor()
except Exception as e:
logger.error("Error in step 4.1 of acquisition process: " + str(e))
if cursor_historical_db:
cursor_historical_db.close()
if cnx_historical_db:
cnx_historical_db.close()
# sleep some seconds
time.sleep(60)
continue
current_datetime_utc = datetime.utcnow()
# bulk insert values into historical database within a period
# update latest values in the meanwhile
if len(analog_value_list) > 0:
add_values = (" INSERT INTO tbl_analog_value (point_id, utc_date_time, actual_value) "
" VALUES ")
trend_value_count = 0
for point_value in analog_value_list:
if point_value['is_trend']:
add_values += " (" + str(point_value['point_id']) + ","
add_values += "'" + current_datetime_utc.isoformat() + "',"
add_values += str(point_value['value']) + "), "
trend_value_count += 1
if trend_value_count > 0:
try:
# trim ", " at the end of string and then execute
cursor_historical_db.execute(add_values[:-2])
cnx_historical_db.commit()
except Exception as e:
logger.error("Error in step 4.2.1 of acquisition process " + str(e))
# ignore this exception
pass
# update tbl_analog_value_latest
delete_values = " DELETE FROM tbl_analog_value_latest WHERE point_id IN ( "
latest_values = (" INSERT INTO tbl_analog_value_latest (point_id, utc_date_time, actual_value) "
" VALUES ")
latest_value_count = 0
for point_value in analog_value_list:
delete_values += str(point_value['point_id']) + ","
latest_values += " (" + str(point_value['point_id']) + ","
latest_values += "'" + current_datetime_utc.isoformat() + "',"
latest_values += str(point_value['value']) + "), "
latest_value_count += 1
if latest_value_count > 0:
try:
# replace "," at the end of string with ")"
cursor_historical_db.execute(delete_values[:-1] + ")")
cnx_historical_db.commit()
except Exception as e:
logger.error("Error in step 4.2.2 of acquisition process " + str(e))
# ignore this exception
pass
try:
# trim ", " at the end of string and then execute
cursor_historical_db.execute(latest_values[:-2])
cnx_historical_db.commit()
except Exception as e:
logger.error("Error in step 4.2.3 of acquisition process " + str(e))
# ignore this exception
pass
if len(energy_value_list) > 0:
add_values = (" INSERT INTO tbl_energy_value (point_id, utc_date_time, actual_value) "
" VALUES ")
trend_value_count = 0
for point_value in energy_value_list:
if point_value['is_trend']:
add_values += " (" + str(point_value['point_id']) + ","
add_values += "'" + current_datetime_utc.isoformat() + "',"
add_values += str(point_value['value']) + "), "
trend_value_count += 1
if trend_value_count > 0:
try:
# trim ", " at the end of string and then execute
cursor_historical_db.execute(add_values[:-2])
cnx_historical_db.commit()
except Exception as e:
logger.error("Error in step 4.3.1 of acquisition process: " + str(e))
# ignore this exception
pass
# update tbl_energy_value_latest
delete_values = " DELETE FROM tbl_energy_value_latest WHERE point_id IN ( "
latest_values = (" INSERT INTO tbl_energy_value_latest (point_id, utc_date_time, actual_value) "
" VALUES ")
latest_value_count = 0
for point_value in energy_value_list:
delete_values += str(point_value['point_id']) + ","
latest_values += " (" + str(point_value['point_id']) + ","
latest_values += "'" + current_datetime_utc.isoformat() + "',"
latest_values += str(point_value['value']) + "), "
latest_value_count += 1
if latest_value_count > 0:
try:
# replace "," at the end of string with ")"
cursor_historical_db.execute(delete_values[:-1] + ")")
cnx_historical_db.commit()
except Exception as e:
logger.error("Error in step 4.3.2 of acquisition process " + str(e))
# ignore this exception
pass
try:
# trim ", " at the end of string and then execute
cursor_historical_db.execute(latest_values[:-2])
cnx_historical_db.commit()
except Exception as e:
logger.error("Error in step 4.3.3 of acquisition process " + str(e))
# ignore this exception
pass
if len(digital_value_list) > 0:
add_values = (" INSERT INTO tbl_digital_value (point_id, utc_date_time, actual_value) "
" VALUES ")
trend_value_count = 0
for point_value in digital_value_list:
if point_value['is_trend']:
add_values += " (" + str(point_value['point_id']) + ","
add_values += "'" + current_datetime_utc.isoformat() + "',"
add_values += str(point_value['value']) + "), "
trend_value_count += 1
if trend_value_count > 0:
try:
# trim ", " at the end of string and then execute
cursor_historical_db.execute(add_values[:-2])
cnx_historical_db.commit()
except Exception as e:
logger.error("Error in step 4.4.1 of acquisition process: " + str(e))
# ignore this exception
pass
# update tbl_digital_value_latest
delete_values = " DELETE FROM tbl_digital_value_latest WHERE point_id IN ( "
latest_values = (" INSERT INTO tbl_digital_value_latest (point_id, utc_date_time, actual_value) "
" VALUES ")
latest_value_count = 0
for point_value in digital_value_list:
delete_values += str(point_value['point_id']) + ","
latest_values += " (" + str(point_value['point_id']) + ","
latest_values += "'" + current_datetime_utc.isoformat() + "',"
latest_values += str(point_value['value']) + "), "
latest_value_count += 1
if latest_value_count > 0:
try:
# replace "," at the end of string with ")"
cursor_historical_db.execute(delete_values[:-1] + ")")
cnx_historical_db.commit()
except Exception as e:
logger.error("Error in step 4.4.2 of acquisition process " + str(e))
# ignore this exception
pass
try:
# trim ", " at the end of string and then execute
cursor_historical_db.execute(latest_values[:-2])
cnx_historical_db.commit()
except Exception as e:
logger.error("Error in step 4.4.3 of acquisition process " + str(e))
# ignore this exception
pass
# sleep some seconds
time.sleep(config.interval_in_seconds)
# end of inner while loop
# end of outermost while loop

View File

@ -0,0 +1,45 @@
import struct
########################################################################################################################
# Swap adjacent bytes
# This is not big-endian and little-endian swapping.
########################################################################################################################
# swap adjacent bytes of 32bits (4bytes) data,
# abcd => badc
def byte_swap_32_bit(x):
x_type = type(x)
if x_type is float:
x = struct.unpack('>I', struct.pack('>f', x))[0]
a = ((x >> 8) & 0x00FF0000)
b = ((x << 8) & 0xFF000000)
c = ((x >> 8) & 0x000000FF)
d = ((x << 8) & 0x0000FF00)
if x_type is float:
return struct.unpack('>f', struct.pack('>I', b | a | d | c))[0]
else:
return b | a | d | c
# swap adjacent bytes of 64bits (8bytes) data,
# abcdefgh => badcfehg
def byte_swap_64_bit(x):
x_type = type(x)
if x_type is float:
x = struct.unpack('>Q', struct.pack('>d', x))[0]
a = ((x >> 8) & 0x00FF000000000000)
b = ((x << 8) & 0xFF00000000000000)
c = ((x >> 8) & 0x000000FF00000000)
d = ((x << 8) & 0x0000FF0000000000)
e = ((x >> 8) & 0x0000000000FF0000)
f = ((x << 8) & 0x00000000FF000000)
g = ((x >> 8) & 0x00000000000000FF)
h = ((x << 8) & 0x000000000000FF00)
if x_type is float:
return struct.unpack('>d', struct.pack('>Q', b | a | d | c | f | e | h | g))[0]
else:
return b | a | d | c | f | e | h | g

View File

@ -0,0 +1,25 @@
myems_system_db = {
'user': 'root',
'password': '!MyEMS1',
'host': '127.0.0.1',
'database': 'myems_system_db',
'port': 3306,
}
myems_historical_db = {
'user': 'root',
'password': '!MyEMS1',
'host': '127.0.0.1',
'database': 'myems_historical_db',
'port': 3306,
}
# Indicates how long the process waits between readings
interval_in_seconds = 180
# Get the gateway ID and token from MyEMS Admin
# This is used for getting data sources associated with the gateway
gateway = {
'id': 1,
'token': 'AAAAAAAA-AAAA-AAAA-AAAA-AAAAAAAAAAAA'
}

104
myems-modbus-tcp/main.py Normal file
View File

@ -0,0 +1,104 @@
import json
import mysql.connector
import config
from multiprocessing import Process
import time
import logging
from logging.handlers import RotatingFileHandler
import acquisition
def main():
"""main"""
# create logger
logger = logging.getLogger('myems-modbus-tcp')
# specifies the lowest-severity log message a logger will handle,
# where debug is the lowest built-in severity level and critical is the highest built-in severity.
# For example, if the severity level is INFO, the logger will handle only INFO, WARNING, ERROR, and CRITICAL
# messages and will ignore DEBUG messages.
logger.setLevel(logging.ERROR)
# create file handler which logs messages
fh = RotatingFileHandler('myems-modbus-tcp.log', maxBytes=1024*1024, backupCount=1)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
# add the handlers to logger
logger.addHandler(fh)
# Get Data Sources
while True:
# TODO: This service has to RESTART to reload latest data sources and this should be fixed
cnx_system_db = None
cursor_system_db = None
try:
cnx_system_db = mysql.connector.connect(**config.myems_system_db)
cursor_system_db = cnx_system_db.cursor()
except Exception as e:
logger.error("Error in main process " + str(e))
if cursor_system_db:
cursor_system_db.close()
if cnx_system_db:
cnx_system_db.close()
# sleep several minutes and continue the outer loop to reload points
time.sleep(60)
continue
# Get data sources by gateway and protocol
try:
query = (" SELECT ds.id, ds.name, ds.connection "
" FROM tbl_data_sources ds, tbl_gateways g "
" WHERE ds.protocol = 'modbus-tcp' AND ds.gateway_id = g.id AND g.id = %s AND g.token = %s "
" ORDER BY ds.id ")
cursor_system_db.execute(query, (config.gateway['id'], config.gateway['token'],))
rows_data_source = cursor_system_db.fetchall()
except Exception as e:
logger.error("Error in main process " + str(e))
# sleep several minutes and continue the outer loop to reload points
time.sleep(60)
continue
finally:
if cursor_system_db:
cursor_system_db.close()
if cnx_system_db:
cnx_system_db.close()
if rows_data_source is None or len(rows_data_source) == 0:
logger.error("Data Source Not Found, Wait for minutes to retry.")
# wait for a while and retry
time.sleep(60)
continue
else:
# Stop to connect these data sources
break
for row_data_source in rows_data_source:
print("Data Source: ID=%s, Name=%s, Connection=%s " %
(row_data_source[0], row_data_source[1], row_data_source[2]))
if row_data_source[2] is None or len(row_data_source[2]) == 0:
logger.error("Data Source Connection Not Found.")
continue
try:
server = json.loads(row_data_source[2])
except Exception as e:
logger.error("Data Source Connection JSON error " + str(e))
continue
if 'host' not in server.keys() \
or 'port' not in server.keys() \
or server['host'] is None \
or server['port'] is None \
or len(server['host']) == 0 \
or not isinstance(server['port'], int) \
or server['port'] < 1:
logger.error("Data Source Connection Invalid.")
continue
# fork worker process for each data source
# todo: how to restart the process if the process terminated unexpectedly
Process(target=acquisition.process, args=(logger, row_data_source[0], server['host'], server['port'])).start()
if __name__ == "__main__":
main()

View File

@ -0,0 +1,15 @@
[Unit]
Description=myems-modbus-tcp daemon
After=network.target
[Service]
User=root
Group=root
ExecStart=/usr/bin/python3 /myems-modbus-tcp/main.py
ExecReload=/bin/kill -s HUP $MAINPID
ExecStop=/bin/kill -s TERM $MAINPID
PrivateTmp=true
Restart=always
[Install]
WantedBy=multi-user.target

97
myems-modbus-tcp/test.py Normal file
View File

@ -0,0 +1,97 @@
import sys
from modbus_tk import modbus_tcp
import telnetlib
import byte_swap
def main():
if len(sys.argv) > 1:
host = sys.argv[1]
else:
print('Usage: python3 test.py HOST_IP_ADDR ')
return
port = 502
try:
telnetlib.Telnet(host, port, 10)
print("Succeeded to telnet %s:%s ", host, port)
except Exception as e:
print("Failed to telnet %s:%s : %s ", host, port, str(e))
return
"""
Functions to convert between Python values and C structs.
Python bytes objects are used to hold the data representing the C struct
and also as format strings (explained below) to describe the layout of data
in the C struct.
The optional first format char indicates byte order, size and alignment:
@: native order, size & alignment (default)
=: native order, std. size & alignment
<: little-endian, std. size & alignment
>: big-endian, std. size & alignment
!: same as >
The remaining chars indicate types of args and must match exactly;
these can be preceded by a decimal repeat count:
x: pad byte (no data); c:char; b:signed byte; B:unsigned byte;
?: _Bool (requires C99; if not available, char is used instead)
h:short; H:unsigned short; i:int; I:unsigned int;
l:long; L:unsigned long; f:float; d:double.
Special cases (preceding decimal count indicates length):
s:string (array of char); p: pascal string (with count byte).
Special cases (only available in native format):
n:ssize_t; N:size_t;
P:an integer type that is wide enough to hold a pointer.
Special case (not in native mode unless 'long long' in platform C):
q:long long; Q:unsigned long long
Whitespace between formats is ignored.
The variable struct.error is an exception raised on errors.
"""
try:
master = modbus_tcp.TcpMaster(host=host, port=port, timeout_in_sec=5.0)
master.set_timeout(5.0)
print("Connected to %s:%s ", host, port)
print("read registers...")
result = master.execute(slave=1, function_code=3, starting_address=6401, quantity_of_x=2, data_format='<l')
print("51AL1-1-KWHimp = " + str(byte_swap.byte_swap_32_bit(result[0])))
result = master.execute(slave=1, function_code=3, starting_address=6403, quantity_of_x=2, data_format='<l')
print("51AL2-1-KWHimp = " + str(byte_swap.byte_swap_32_bit(result[0])))
result = master.execute(slave=1, function_code=3, starting_address=6405, quantity_of_x=2, data_format='<l')
print("51AL3-1-KWHimp = " + str(byte_swap.byte_swap_32_bit(result[0])))
result = master.execute(slave=1, function_code=3, starting_address=6407, quantity_of_x=2, data_format='<l')
print("51AL4-1-KWHimp = " + str(byte_swap.byte_swap_32_bit(result[0])))
result = master.execute(slave=1, function_code=3, starting_address=6409, quantity_of_x=2, data_format='<l')
print("51AL5-1-KWHimp = " + str(byte_swap.byte_swap_32_bit(result[0])))
# result = master.execute(slave=1, function_code=3, starting_address=11, quantity_of_x=2, data_format='>f')
# print("Volatage Vc-a = " + str(result))
# result = master.execute(slave=1, function_code=3, starting_address=13, quantity_of_x=2, data_format='>f')
# print("Current a = " + str(result))
# result = master.execute(slave=1, function_code=3, starting_address=15, quantity_of_x=2, data_format='>f')
# print("Current b = " + str(result))
# result = master.execute(slave=1, function_code=3, starting_address=17, quantity_of_x=2, data_format='>f')
# print("Current c = " + str(result))
# result = master.execute(slave=1, function_code=3, starting_address=19, quantity_of_x=2, data_format='>f')
# print("Active Power a = " + str(result))
# result = master.execute(slave=1, function_code=3, starting_address=25, quantity_of_x=2, data_format='>f')
# print("Active Power b = " + str(result))
# result = master.execute(slave=1, function_code=3, starting_address=27, quantity_of_x=2, data_format='>f')
# print("Active Power c = " + str(result))
# result = master.execute(slave=1, function_code=3, starting_address=29, quantity_of_x=2, data_format='>f')
# print("Total Active Power = " + str(result))
# result = master.execute(slave=1, function_code=3, starting_address=65, quantity_of_x=2, data_format='>f')
# print("Total Power Factor = " + str(result))
# result = master.execute(slave=1, function_code=3, starting_address=71, quantity_of_x=2, data_format='>f')
# print("Amplitude Unbalance - Volatage = " + str(result))
# result = master.execute(slave=1, function_code=3, starting_address=73, quantity_of_x=2, data_format='>f')
# print("Amplitude Unbalance - Current = " + str(result))
# result = master.execute(slave=1, function_code=3, starting_address=801, quantity_of_x=4, data_format='>d')
# print("Active Energy Import Tariff 1 = " + str(result))
master.close()
except Exception as e:
print(str(e))
if __name__ == "__main__":
main()