diff --git a/.gitignore b/.gitignore index 709037d2..9998bb43 100644 --- a/.gitignore +++ b/.gitignore @@ -1,8 +1,59 @@ -# Logs -logs -*.log -*_build -*_static -*_templates +*.py[cod] + +# C extensions +*.c +*.so + +# Jython +*$py.class + +# Packages +*.egg +*.egg-info +dist +build +eggs +parts +var +sdist +develop-eggs +.installed.cfg +lib +lib64 + +# Installer logs +pip-log.txt + +# Unit test / coverage reports +.coverage +.tox +nosetests.xml +htmlcov +*.dat + +# Docs +doc/_build + +# Translations +*.mo + +# Idea .idea .vscode + +# System +.DS_Store + +# VIM swap files +.*.swp + +# VIM temp files +*~ + +# Log +*.log +*.log.1 +logs +*_build +*_static +*_templates \ No newline at end of file diff --git a/myems-modbus-tcp/LICENSE b/myems-modbus-tcp/LICENSE new file mode 100644 index 00000000..c6cedd28 --- /dev/null +++ b/myems-modbus-tcp/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2020 MyEMS + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/myems-modbus-tcp/README.md b/myems-modbus-tcp/README.md new file mode 100644 index 00000000..74991bc3 --- /dev/null +++ b/myems-modbus-tcp/README.md @@ -0,0 +1,148 @@ +## MyEMS Modbus TCP Service + +### Introduction +This service is a component of MyEMS to acquire data from Modbus TCP devices. + +[![Codacy Badge](https://api.codacy.com/project/badge/Grade/6b4a22007133463d99493e1798266829)](https://app.codacy.com/gh/myems/myems-modbus-tcp?utm_source=github.com&utm_medium=referral&utm_content=myems/myems-modbus-tcp&utm_campaign=Badge_Grade_Settings) +[![Scrutinizer Code Quality](https://scrutinizer-ci.com/g/myems/myems-modbus-tcp/badges/quality-score.png?b=master)](https://scrutinizer-ci.com/g/myems/myems-modbus-tcp/?branch=master) +[![Maintainability](https://api.codeclimate.com/v1/badges/704c0410c700d520e15f/maintainability)](https://codeclimate.com/github/myems/myems-modbus-tcp/maintainability) +[![Total alerts](https://img.shields.io/lgtm/alerts/g/myems/myems-modbus-tcp.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/myems/myems-modbus-tcp/alerts/) + + +### Prerequisites +pyserial + +modbus-tk + +mysql.connector + +### Installation + +Download and install MySQL Connector: +``` + $ cd ~/tools + $ wget https://dev.mysql.com/get/Downloads/Connector-Python/mysql-connector-python-8.0.20.tar.gz + $ tar xzf mysql-connector-python-8.0.20.tar.gz + $ cd ~/tools/mysql-connector-python-8.0.20 + $ sudo python3 setup.py install +``` + +Download and install modbus-tk +``` + $ cd ~/tools + $ git clone https://github.com/pyserial/pyserial.git + $ cd ~/tools/pyserial + $ sudo python3 setup.py install + $ git clone https://github.com/ljean/modbus-tk.git + $ cd ~/tools/modbus-tk + $ sudo python3 setup.py install + +``` + +Install myems-modbus-tcp service +``` + $ cd ~ + $ git clone https://github.com/myems/myems.git + $ sudo git checkout master (or the latest release tag) + $ sudo cp -R ~/myems/myems-modbus-tcp /myems-modbus-tcp + $ cd /myems-modbus-tcp +``` + Eidt the config +``` + $ sudo nano /myems-modbus-tcp/config.py +``` + Setup systemd service: +``` + $ sudo cp /myems-modbus-tcp/myems-modbus-tcp.service /lib/systemd/system/ + $ sudo systemctl enable myems-modbus-tcp.service + $ sudo systemctl start myems-modbus-tcp.service +``` + + + +### Add Data Sources and Points in MyEMS Admin +refer to https://github.com/myems/myesm-admin.git + +NOTE: If you modified Modbus TCP datasources and points, please restart this service: +``` + $ sudo systemctl restart myems-modbus-tcp.service +``` + +Input Data source protocol: +``` +modbus-tcp +``` +Data source connection example: +``` +{"host":"10.9.67.99","port":502} +``` + +Point address example: +``` +{"slave_id":1, "function_code":3, "offset":0, "number_of_registers":2, "format":": big-endian, std. size & alignment + !: same as > + +The remaining chars indicate types of args and must match exactly; +these can be preceded by a decimal repeat count: + x: pad byte (no data); c:char; b:signed byte; B:unsigned byte; + ?: _Bool (requires C99; if not available, char is used instead) + h:short; H:unsigned short; i:int; I:unsigned int; + l:long; L:unsigned long; f:float; d:double. + +Special cases (preceding decimal count indicates length): + s:string (array of char); p: pascal string (with count byte). +Special cases (only available in native format): + n:ssize_t; N:size_t; + P:an integer type that is wide enough to hold a pointer. + +Special case (not in native mode unless 'long long' in platform C): + q:long long; Q:unsigned long long + +Whitespace between formats is ignored. + +#### byte_swap +A boolean indicates whether or not to swap adjacent bytes. +Swap adjacent bytes of 32bits(4bytes) or 64bits(8bytes). +This is not for little-endian and big-endian swapping, and use format for that. +The option is effective when number_of_registers is ether 2(32bits) or 4(64bits), +else it will be ignored. + + +### References + [1]. http://myems.io + + [2]. http://www.modbus.org/tech.php + + [3]. https://github.com/ljean/modbus-tk + + [4]. https://docs.python.org/3/library/struct.html#format-strings \ No newline at end of file diff --git a/myems-modbus-tcp/acquisition.py b/myems-modbus-tcp/acquisition.py new file mode 100644 index 00000000..7a053c9b --- /dev/null +++ b/myems-modbus-tcp/acquisition.py @@ -0,0 +1,421 @@ +import json +import mysql.connector +import time +import math +from datetime import datetime +import telnetlib +from modbus_tk import modbus_tcp +import config +from decimal import Decimal +from byte_swap import byte_swap_32_bit, byte_swap_64_bit + + +######################################################################################################################## +# Acquisition Procedures +# Step 1: telnet hosts +# Step 2: Get point list +# Step 3: Read point values from Modbus slaves +# Step 4: Bulk insert point values and update latest values in historical database +######################################################################################################################## + + +def process(logger, data_source_id, host, port): + + while True: + # the outermost while loop + + ################################################################################################################ + # Step 1: telnet hosts + ################################################################################################################ + try: + telnetlib.Telnet(host, port, 10) + print("Succeeded to telnet %s:%s in acquisition process ", host, port) + except Exception as e: + logger.error("Failed to telnet %s:%s in acquisition process: %s ", host, port, str(e)) + time.sleep(300) + continue + + ################################################################################################################ + # Step 2: Get point list + ################################################################################################################ + cnx_system_db = None + cursor_system_db = None + try: + cnx_system_db = mysql.connector.connect(**config.myems_system_db) + cursor_system_db = cnx_system_db.cursor() + except Exception as e: + logger.error("Error in step 2.1 of acquisition process " + str(e)) + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + # sleep and then continue the outermost loop to reload points + time.sleep(60) + continue + + try: + query = (" SELECT id, name, object_type, is_trend, ratio, address " + " FROM tbl_points " + " WHERE data_source_id = %s " + " ORDER BY id ") + cursor_system_db.execute(query, (data_source_id, )) + rows_point = cursor_system_db.fetchall() + except Exception as e: + logger.error("Error in step 2.2 of acquisition process: " + str(e)) + # sleep several minutes and continue the outer loop to reload points + time.sleep(60) + continue + finally: + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + + if rows_point is None or len(rows_point) == 0: + # there is no points for this data source + logger.error("Point Not Found in Data Source (ID = %s), acquisition process terminated ", data_source_id) + # sleep 60 seconds and go back to the begin of outermost while loop to reload points + time.sleep(60) + continue + + # There are points for this data source + point_list = list() + for row_point in rows_point: + point_list.append({"id": row_point[0], + "name": row_point[1], + "object_type": row_point[2], + "is_trend": row_point[3], + "ratio": row_point[4], + "address": row_point[5]}) + + ################################################################################################################ + # Step 3: Read point values from Modbus slaves + ################################################################################################################ + # connect to historical database + cnx_historical_db = None + cursor_historical_db = None + try: + cnx_historical_db = mysql.connector.connect(**config.myems_historical_db) + cursor_historical_db = cnx_historical_db.cursor() + except Exception as e: + logger.error("Error in step 3.1 of acquisition process " + str(e)) + if cursor_historical_db: + cursor_historical_db.close() + if cnx_historical_db: + cnx_historical_db.close() + # sleep 60 seconds and go back to the begin of outermost while loop to reload points + time.sleep(60) + continue + + # connect to the Modbus data source + master = modbus_tcp.TcpMaster(host=host, port=port, timeout_in_sec=5.0) + master.set_timeout(5.0) + print("Ready to connect to %s:%s ", host, port) + + # inner loop to read all point values within a configurable period + while True: + is_modbus_tcp_timed_out = False + energy_value_list = list() + analog_value_list = list() + digital_value_list = list() + + # foreach point loop + for point in point_list: + try: + address = json.loads(point['address']) + except Exception as e: + logger.error("Error in step 3.2 of acquisition process: \n" + "Invalid point address in JSON " + str(e)) + continue + + if 'slave_id' not in address.keys() \ + or 'function_code' not in address.keys() \ + or 'offset' not in address.keys() \ + or 'number_of_registers' not in address.keys() \ + or 'format' not in address.keys() \ + or 'byte_swap' not in address.keys() \ + or address['slave_id'] < 1 \ + or address['function_code'] not in (1, 2, 3, 4) \ + or address['offset'] < 0 \ + or address['number_of_registers'] < 0 \ + or len(address['format']) < 1 \ + or not isinstance(address['byte_swap'], bool): + + logger.error('Data Source(ID=%s), Point(ID=%s) Invalid address data.', + data_source_id, point['id']) + # invalid point is found, and go on the foreach point loop to process next point + continue + + # read register value for valid point + try: + result = master.execute(slave=address['slave_id'], + function_code=address['function_code'], + starting_address=address['offset'], + quantity_of_x=address['number_of_registers'], + data_format=address['format']) + except Exception as e: + logger.error(str(e) + + " host:" + host + " port:" + str(port) + + " slave_id:" + str(address['slave_id']) + + " function_code:" + str(address['function_code']) + + " starting_address:" + str(address['offset']) + + " quantity_of_x:" + str(address['number_of_registers']) + + " data_format:" + str(address['format']) + + " byte_swap:" + str(address['byte_swap'])) + + if 'timed out' in str(e): + is_modbus_tcp_timed_out = True + # timeout error, break the foreach point loop + break + else: + # exception occurred when read register value, go on the foreach point loop + continue + + if result is None or not isinstance(result, tuple) or len(result) == 0: + # invalid result, and go on the foreach point loop to process next point + logger.error("Error in step 3.3 of acquisition process: \n" + " invalid result: None " + " for point_id: " + str(point['id'])) + continue + + if not isinstance(result[0], float) and not isinstance(result[0], int) or math.isnan(result[0]): + # invalid result, and go on the foreach point loop to process next point + logger.error(" Error in step 3.4 of acquisition process:\n" + " invalid result: not float and not int or not a number " + " for point_id: " + str(point['id'])) + continue + + if address['byte_swap']: + if address['number_of_registers'] == 2: + value = byte_swap_32_bit(result[0]) + elif address['number_of_registers'] == 4: + value = byte_swap_64_bit(result[0]) + else: + value = result[0] + else: + value = result[0] + + if point['object_type'] == 'ANALOG_VALUE': + analog_value_list.append({'data_source_id': data_source_id, + 'point_id': point['id'], + 'is_trend': point['is_trend'], + 'value': Decimal(value) * point['ratio']}) + elif point['object_type'] == 'ENERGY_VALUE': + energy_value_list.append({'data_source_id': data_source_id, + 'point_id': point['id'], + 'is_trend': point['is_trend'], + 'value': Decimal(value) * point['ratio']}) + elif point['object_type'] == 'DIGITAL_VALUE': + digital_value_list.append({'data_source_id': data_source_id, + 'point_id': point['id'], + 'is_trend': point['is_trend'], + 'value': int(value) * int(point['ratio'])}) + + # end of foreach point loop + + if is_modbus_tcp_timed_out: + # Modbus TCP connection timeout error + + # destroy the Modbus master + del master + + # close the connection to database + if cursor_historical_db: + cursor_historical_db.close() + if cnx_historical_db: + cnx_historical_db.close() + + # break the inner while loop to reconnect the Modbus device + time.sleep(60) + break + + ############################################################################################################ + # Step 4: Bulk insert point values and update latest values in historical database + ############################################################################################################ + # check the connection to the Historical Database + if not cnx_historical_db.is_connected(): + try: + cnx_historical_db = mysql.connector.connect(**config.myems_historical_db) + cursor_historical_db = cnx_historical_db.cursor() + except Exception as e: + logger.error("Error in step 4.1 of acquisition process: " + str(e)) + if cursor_historical_db: + cursor_historical_db.close() + if cnx_historical_db: + cnx_historical_db.close() + # sleep some seconds + time.sleep(60) + continue + + current_datetime_utc = datetime.utcnow() + # bulk insert values into historical database within a period + # update latest values in the meanwhile + if len(analog_value_list) > 0: + add_values = (" INSERT INTO tbl_analog_value (point_id, utc_date_time, actual_value) " + " VALUES ") + trend_value_count = 0 + + for point_value in analog_value_list: + if point_value['is_trend']: + add_values += " (" + str(point_value['point_id']) + "," + add_values += "'" + current_datetime_utc.isoformat() + "'," + add_values += str(point_value['value']) + "), " + trend_value_count += 1 + + if trend_value_count > 0: + try: + # trim ", " at the end of string and then execute + cursor_historical_db.execute(add_values[:-2]) + cnx_historical_db.commit() + except Exception as e: + logger.error("Error in step 4.2.1 of acquisition process " + str(e)) + # ignore this exception + pass + + # update tbl_analog_value_latest + delete_values = " DELETE FROM tbl_analog_value_latest WHERE point_id IN ( " + latest_values = (" INSERT INTO tbl_analog_value_latest (point_id, utc_date_time, actual_value) " + " VALUES ") + latest_value_count = 0 + + for point_value in analog_value_list: + delete_values += str(point_value['point_id']) + "," + latest_values += " (" + str(point_value['point_id']) + "," + latest_values += "'" + current_datetime_utc.isoformat() + "'," + latest_values += str(point_value['value']) + "), " + latest_value_count += 1 + + if latest_value_count > 0: + try: + # replace "," at the end of string with ")" + cursor_historical_db.execute(delete_values[:-1] + ")") + cnx_historical_db.commit() + except Exception as e: + logger.error("Error in step 4.2.2 of acquisition process " + str(e)) + # ignore this exception + pass + + try: + # trim ", " at the end of string and then execute + cursor_historical_db.execute(latest_values[:-2]) + cnx_historical_db.commit() + except Exception as e: + logger.error("Error in step 4.2.3 of acquisition process " + str(e)) + # ignore this exception + pass + + if len(energy_value_list) > 0: + add_values = (" INSERT INTO tbl_energy_value (point_id, utc_date_time, actual_value) " + " VALUES ") + trend_value_count = 0 + + for point_value in energy_value_list: + if point_value['is_trend']: + add_values += " (" + str(point_value['point_id']) + "," + add_values += "'" + current_datetime_utc.isoformat() + "'," + add_values += str(point_value['value']) + "), " + trend_value_count += 1 + + if trend_value_count > 0: + try: + # trim ", " at the end of string and then execute + cursor_historical_db.execute(add_values[:-2]) + cnx_historical_db.commit() + except Exception as e: + logger.error("Error in step 4.3.1 of acquisition process: " + str(e)) + # ignore this exception + pass + + # update tbl_energy_value_latest + delete_values = " DELETE FROM tbl_energy_value_latest WHERE point_id IN ( " + latest_values = (" INSERT INTO tbl_energy_value_latest (point_id, utc_date_time, actual_value) " + " VALUES ") + + latest_value_count = 0 + for point_value in energy_value_list: + delete_values += str(point_value['point_id']) + "," + latest_values += " (" + str(point_value['point_id']) + "," + latest_values += "'" + current_datetime_utc.isoformat() + "'," + latest_values += str(point_value['value']) + "), " + latest_value_count += 1 + + if latest_value_count > 0: + try: + # replace "," at the end of string with ")" + cursor_historical_db.execute(delete_values[:-1] + ")") + cnx_historical_db.commit() + + except Exception as e: + logger.error("Error in step 4.3.2 of acquisition process " + str(e)) + # ignore this exception + pass + + try: + # trim ", " at the end of string and then execute + cursor_historical_db.execute(latest_values[:-2]) + cnx_historical_db.commit() + + except Exception as e: + logger.error("Error in step 4.3.3 of acquisition process " + str(e)) + # ignore this exception + pass + + if len(digital_value_list) > 0: + add_values = (" INSERT INTO tbl_digital_value (point_id, utc_date_time, actual_value) " + " VALUES ") + trend_value_count = 0 + + for point_value in digital_value_list: + if point_value['is_trend']: + add_values += " (" + str(point_value['point_id']) + "," + add_values += "'" + current_datetime_utc.isoformat() + "'," + add_values += str(point_value['value']) + "), " + trend_value_count += 1 + + if trend_value_count > 0: + try: + # trim ", " at the end of string and then execute + cursor_historical_db.execute(add_values[:-2]) + cnx_historical_db.commit() + except Exception as e: + logger.error("Error in step 4.4.1 of acquisition process: " + str(e)) + # ignore this exception + pass + + # update tbl_digital_value_latest + delete_values = " DELETE FROM tbl_digital_value_latest WHERE point_id IN ( " + latest_values = (" INSERT INTO tbl_digital_value_latest (point_id, utc_date_time, actual_value) " + " VALUES ") + latest_value_count = 0 + for point_value in digital_value_list: + delete_values += str(point_value['point_id']) + "," + latest_values += " (" + str(point_value['point_id']) + "," + latest_values += "'" + current_datetime_utc.isoformat() + "'," + latest_values += str(point_value['value']) + "), " + latest_value_count += 1 + + if latest_value_count > 0: + try: + # replace "," at the end of string with ")" + cursor_historical_db.execute(delete_values[:-1] + ")") + cnx_historical_db.commit() + except Exception as e: + logger.error("Error in step 4.4.2 of acquisition process " + str(e)) + # ignore this exception + pass + + try: + # trim ", " at the end of string and then execute + cursor_historical_db.execute(latest_values[:-2]) + cnx_historical_db.commit() + except Exception as e: + logger.error("Error in step 4.4.3 of acquisition process " + str(e)) + # ignore this exception + pass + + # sleep some seconds + time.sleep(config.interval_in_seconds) + + # end of inner while loop + + # end of outermost while loop diff --git a/myems-modbus-tcp/byte_swap.py b/myems-modbus-tcp/byte_swap.py new file mode 100644 index 00000000..06a05c5f --- /dev/null +++ b/myems-modbus-tcp/byte_swap.py @@ -0,0 +1,45 @@ +import struct +######################################################################################################################## +# Swap adjacent bytes +# This is not big-endian and little-endian swapping. +######################################################################################################################## + + +# swap adjacent bytes of 32bits (4bytes) data, +# abcd => badc +def byte_swap_32_bit(x): + x_type = type(x) + if x_type is float: + x = struct.unpack('>I', struct.pack('>f', x))[0] + + a = ((x >> 8) & 0x00FF0000) + b = ((x << 8) & 0xFF000000) + c = ((x >> 8) & 0x000000FF) + d = ((x << 8) & 0x0000FF00) + + if x_type is float: + return struct.unpack('>f', struct.pack('>I', b | a | d | c))[0] + else: + return b | a | d | c + + +# swap adjacent bytes of 64bits (8bytes) data, +# abcdefgh => badcfehg +def byte_swap_64_bit(x): + x_type = type(x) + if x_type is float: + x = struct.unpack('>Q', struct.pack('>d', x))[0] + + a = ((x >> 8) & 0x00FF000000000000) + b = ((x << 8) & 0xFF00000000000000) + c = ((x >> 8) & 0x000000FF00000000) + d = ((x << 8) & 0x0000FF0000000000) + e = ((x >> 8) & 0x0000000000FF0000) + f = ((x << 8) & 0x00000000FF000000) + g = ((x >> 8) & 0x00000000000000FF) + h = ((x << 8) & 0x000000000000FF00) + + if x_type is float: + return struct.unpack('>d', struct.pack('>Q', b | a | d | c | f | e | h | g))[0] + else: + return b | a | d | c | f | e | h | g diff --git a/myems-modbus-tcp/config.py b/myems-modbus-tcp/config.py new file mode 100644 index 00000000..f7e26376 --- /dev/null +++ b/myems-modbus-tcp/config.py @@ -0,0 +1,25 @@ +myems_system_db = { + 'user': 'root', + 'password': '!MyEMS1', + 'host': '127.0.0.1', + 'database': 'myems_system_db', + 'port': 3306, +} + +myems_historical_db = { + 'user': 'root', + 'password': '!MyEMS1', + 'host': '127.0.0.1', + 'database': 'myems_historical_db', + 'port': 3306, +} + +# Indicates how long the process waits between readings +interval_in_seconds = 180 + +# Get the gateway ID and token from MyEMS Admin +# This is used for getting data sources associated with the gateway +gateway = { + 'id': 1, + 'token': 'AAAAAAAA-AAAA-AAAA-AAAA-AAAAAAAAAAAA' +} diff --git a/myems-modbus-tcp/main.py b/myems-modbus-tcp/main.py new file mode 100644 index 00000000..0dd3c351 --- /dev/null +++ b/myems-modbus-tcp/main.py @@ -0,0 +1,104 @@ +import json +import mysql.connector +import config +from multiprocessing import Process +import time +import logging +from logging.handlers import RotatingFileHandler +import acquisition + + +def main(): + """main""" + # create logger + logger = logging.getLogger('myems-modbus-tcp') + # specifies the lowest-severity log message a logger will handle, + # where debug is the lowest built-in severity level and critical is the highest built-in severity. + # For example, if the severity level is INFO, the logger will handle only INFO, WARNING, ERROR, and CRITICAL + # messages and will ignore DEBUG messages. + logger.setLevel(logging.ERROR) + # create file handler which logs messages + fh = RotatingFileHandler('myems-modbus-tcp.log', maxBytes=1024*1024, backupCount=1) + # create formatter and add it to the handlers + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + fh.setFormatter(formatter) + # add the handlers to logger + logger.addHandler(fh) + + # Get Data Sources + while True: + # TODO: This service has to RESTART to reload latest data sources and this should be fixed + cnx_system_db = None + cursor_system_db = None + try: + cnx_system_db = mysql.connector.connect(**config.myems_system_db) + cursor_system_db = cnx_system_db.cursor() + except Exception as e: + logger.error("Error in main process " + str(e)) + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + # sleep several minutes and continue the outer loop to reload points + time.sleep(60) + continue + + # Get data sources by gateway and protocol + try: + query = (" SELECT ds.id, ds.name, ds.connection " + " FROM tbl_data_sources ds, tbl_gateways g " + " WHERE ds.protocol = 'modbus-tcp' AND ds.gateway_id = g.id AND g.id = %s AND g.token = %s " + " ORDER BY ds.id ") + cursor_system_db.execute(query, (config.gateway['id'], config.gateway['token'],)) + rows_data_source = cursor_system_db.fetchall() + except Exception as e: + logger.error("Error in main process " + str(e)) + # sleep several minutes and continue the outer loop to reload points + time.sleep(60) + continue + finally: + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + + if rows_data_source is None or len(rows_data_source) == 0: + logger.error("Data Source Not Found, Wait for minutes to retry.") + # wait for a while and retry + time.sleep(60) + continue + else: + # Stop to connect these data sources + break + + for row_data_source in rows_data_source: + print("Data Source: ID=%s, Name=%s, Connection=%s " % + (row_data_source[0], row_data_source[1], row_data_source[2])) + + if row_data_source[2] is None or len(row_data_source[2]) == 0: + logger.error("Data Source Connection Not Found.") + continue + + try: + server = json.loads(row_data_source[2]) + except Exception as e: + logger.error("Data Source Connection JSON error " + str(e)) + continue + + if 'host' not in server.keys() \ + or 'port' not in server.keys() \ + or server['host'] is None \ + or server['port'] is None \ + or len(server['host']) == 0 \ + or not isinstance(server['port'], int) \ + or server['port'] < 1: + logger.error("Data Source Connection Invalid.") + continue + + # fork worker process for each data source + # todo: how to restart the process if the process terminated unexpectedly + Process(target=acquisition.process, args=(logger, row_data_source[0], server['host'], server['port'])).start() + + +if __name__ == "__main__": + main() diff --git a/myems-modbus-tcp/myems-modbus-tcp.service b/myems-modbus-tcp/myems-modbus-tcp.service new file mode 100644 index 00000000..44426fd5 --- /dev/null +++ b/myems-modbus-tcp/myems-modbus-tcp.service @@ -0,0 +1,15 @@ +[Unit] +Description=myems-modbus-tcp daemon +After=network.target + +[Service] +User=root +Group=root +ExecStart=/usr/bin/python3 /myems-modbus-tcp/main.py +ExecReload=/bin/kill -s HUP $MAINPID +ExecStop=/bin/kill -s TERM $MAINPID +PrivateTmp=true +Restart=always + +[Install] +WantedBy=multi-user.target diff --git a/myems-modbus-tcp/test.py b/myems-modbus-tcp/test.py new file mode 100644 index 00000000..fb72fab9 --- /dev/null +++ b/myems-modbus-tcp/test.py @@ -0,0 +1,97 @@ +import sys +from modbus_tk import modbus_tcp +import telnetlib +import byte_swap + + +def main(): + if len(sys.argv) > 1: + host = sys.argv[1] + else: + print('Usage: python3 test.py HOST_IP_ADDR ') + return + + port = 502 + try: + telnetlib.Telnet(host, port, 10) + print("Succeeded to telnet %s:%s ", host, port) + except Exception as e: + print("Failed to telnet %s:%s : %s ", host, port, str(e)) + return + + """ + Functions to convert between Python values and C structs. + Python bytes objects are used to hold the data representing the C struct + and also as format strings (explained below) to describe the layout of data + in the C struct. + + The optional first format char indicates byte order, size and alignment: + @: native order, size & alignment (default) + =: native order, std. size & alignment + <: little-endian, std. size & alignment + >: big-endian, std. size & alignment + !: same as > + + The remaining chars indicate types of args and must match exactly; + these can be preceded by a decimal repeat count: + x: pad byte (no data); c:char; b:signed byte; B:unsigned byte; + ?: _Bool (requires C99; if not available, char is used instead) + h:short; H:unsigned short; i:int; I:unsigned int; + l:long; L:unsigned long; f:float; d:double. + Special cases (preceding decimal count indicates length): + s:string (array of char); p: pascal string (with count byte). + Special cases (only available in native format): + n:ssize_t; N:size_t; + P:an integer type that is wide enough to hold a pointer. + Special case (not in native mode unless 'long long' in platform C): + q:long long; Q:unsigned long long + Whitespace between formats is ignored. + + The variable struct.error is an exception raised on errors. + """ + try: + master = modbus_tcp.TcpMaster(host=host, port=port, timeout_in_sec=5.0) + master.set_timeout(5.0) + print("Connected to %s:%s ", host, port) + print("read registers...") + result = master.execute(slave=1, function_code=3, starting_address=6401, quantity_of_x=2, data_format='