From 3adb58297246f7f69da05928efd67400b38dec57 Mon Sep 17 00:00:00 2001 From: "13621160019@163.com" <13621160019@163.com> Date: Fri, 19 Feb 2021 11:21:43 +0800 Subject: [PATCH] merged myems-bacnet --- myems-bacnet/LICENSE | 21 ++ myems-bacnet/README.md | 90 ++++++ myems-bacnet/acquisition.py | 509 ++++++++++++++++++++++++++++++ myems-bacnet/config.py | 37 +++ myems-bacnet/main.py | 30 ++ myems-bacnet/myems-bacnet.service | 15 + myems-bacnet/myems_application.py | 79 +++++ myems-bacnet/test.py | 49 +++ 8 files changed, 830 insertions(+) create mode 100644 myems-bacnet/LICENSE create mode 100644 myems-bacnet/README.md create mode 100644 myems-bacnet/acquisition.py create mode 100644 myems-bacnet/config.py create mode 100644 myems-bacnet/main.py create mode 100644 myems-bacnet/myems-bacnet.service create mode 100644 myems-bacnet/myems_application.py create mode 100644 myems-bacnet/test.py diff --git a/myems-bacnet/LICENSE b/myems-bacnet/LICENSE new file mode 100644 index 00000000..c6cedd28 --- /dev/null +++ b/myems-bacnet/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2020 MyEMS + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/myems-bacnet/README.md b/myems-bacnet/README.md new file mode 100644 index 00000000..d3fb82bb --- /dev/null +++ b/myems-bacnet/README.md @@ -0,0 +1,90 @@ +# MyEMS BACnet Service + + +## Introduction + +This service is a component of MyEMS to acquire data from BACnet devices + +[![Codacy Badge](https://api.codacy.com/project/badge/Grade/42a86789a9a0425492a8890b5ae43dd8)](https://app.codacy.com/gh/myems/myems-bacnet?utm_source=github.com&utm_medium=referral&utm_content=myems/myems-bacnet&utm_campaign=Badge_Grade) +[![Scrutinizer Code Quality](https://scrutinizer-ci.com/g/myems/myems-bacnet/badges/quality-score.png?b=master)](https://scrutinizer-ci.com/g/myems/myems-bacnet/?branch=master) +[![Maintainability](https://api.codeclimate.com/v1/badges/90a12776a218fb5ff465/maintainability)](https://codeclimate.com/github/myems/myems-bacnet/maintainability) +[![Total alerts](https://img.shields.io/lgtm/alerts/g/myems/myems-bacnet.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/myems/myems-bacnet/alerts/) + +## Prerequisites +bacpypes + +mysql.connector + + + +## Installation + +Download and install MySQL Connector: +``` + $ cd ~/tools + $ wget https://dev.mysql.com/get/Downloads/Connector-Python/mysql-connector-python-8.0.20.tar.gz + $ tar xzf mysql-connector-python-8.0.20.tar.gz + $ cd ~/tools/mysql-connector-python-8.0.20 + $ sudo python3 setup.py install +``` + +Download and install bacpypes library +``` + $ cd ~/tools + $ git clone https://github.com/pypa/setuptools_scm.git + $ git clone https://github.com/pytest-dev/pytest-runner.git + $ git clone https://github.com/JoelBender/bacpypes.git + $ cd ~/tools/setuptools_scm/ + $ sudo python3 setup.py install + $ cd ~/tools/pytest-runner/ + $ sudo python3 setup.py install + $ cd ~/tools/bacpypes + $ sudo python3 setup.py install + $ sudo ufw allow 47808 +``` + +Install myems-bacnet service +``` + $ cd ~ + $ git clone https://github.com/myems.git + $ git checkout master (or the latest release tag) + $ sudo cp -R ~/myems/myems-bacnet /myems-bacnet +``` + Eidt the config config +``` + $ sudo nano /myems-bacnet/config.py +``` + Setup systemd service: +``` + $ sudo cp /myems-bacnet/myems-bacnet.service /lib/systemd/system/ + $ sudo systemctl enable myems-bacnet.service + $ sudo systemctl start myems-bacnet.service +``` + +### Add Data Sources and Points in MyEMS Admin + +Data source protocol: +``` +bacnet-ip +``` + +Data source connection example: +``` +{"host": "192.168.0.3", "port": 47808} +``` + +Point address example: +``` +{"object_id":3002786,"object_type":"analogValue","property_array_index":null,"property_name":"presentValue"} +``` + + +## References + +[1]. http://myems.io + +[2]. http://bacnet.org + +[3]. https://github.com/JoelBender/bacpypes + + diff --git a/myems-bacnet/acquisition.py b/myems-bacnet/acquisition.py new file mode 100644 index 00000000..fa3cb2f2 --- /dev/null +++ b/myems-bacnet/acquisition.py @@ -0,0 +1,509 @@ +import json +import mysql.connector +import config +import time +from datetime import datetime +import math +from decimal import Decimal +from bacpypes.core import run, stop, deferred +from bacpypes.local.device import LocalDeviceObject +from bacpypes.pdu import Address, GlobalBroadcast + +from myems_application import MyEMSApplication + + +######################################################################################################################## +# Acquisition Procedures +# Step 1: Get data source list +# Step 2: Get point list +# Step 3: Read point values from BACnet +# Step 4: Bulk insert point values and update latest values in historical database +######################################################################################################################## + +def process(logger, ): + print("Creating a device object...") + try: + # make a device object + this_device = LocalDeviceObject(objectName=config.bacnet_device['object_name'], + objectIdentifier=config.bacnet_device['object_identifier'], + maxApduLengthAccepted=config.bacnet_device['max_apdu_length_accepted'], + segmentationSupported=config.bacnet_device['segmentation_supported'], + vendorIdentifier=config.bacnet_device['vendor_identifier']) + + except Exception as e: + logger.error("Failed to create BACnet device object: " + str(e)) + # ignore + pass + + print("Connecting to myems_system_db ...") + cnx_system_db = None + cursor_system_db = None + + while not cnx_system_db or not cnx_system_db.is_connected(): + try: + cnx_system_db = mysql.connector.connect(**config.myems_system_db) + cursor_system_db = cnx_system_db.cursor() + except Exception as e: + logger.error("Error to connect to myems_system_db in acquisition process " + str(e)) + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + print("Failed to connect to myems_system_db, sleep a minute and retry...") + time.sleep(60) + continue + + print("Connecting to myems_historical_db...") + cnx_historical_db = None + cursor_historical_db = None + + while not cnx_historical_db or not cnx_historical_db.is_connected(): + try: + cnx_historical_db = mysql.connector.connect(**config.myems_historical_db) + cursor_historical_db = cnx_historical_db.cursor() + except Exception as e: + logger.error("Error to connect to myems_historical_db in acquisition process " + str(e)) + if cursor_historical_db: + cursor_historical_db.close() + if cnx_historical_db: + cnx_historical_db.close() + print("Failed to connect to myems_historical_db, sleep a minute and retry...") + time.sleep(60) + continue + + while True: + # the outermost while loop + ################################################################################################################ + # Step 1: Get data source list + ################################################################################################################ + # check the connection to the database + if not cnx_system_db or not cnx_system_db.is_connected(): + try: + cnx_system_db = mysql.connector.connect(**config.myems_system_db) + cursor_system_db = cnx_system_db.cursor() + except Exception as e: + logger.error("Error in step 1.1 of acquisition process " + str(e)) + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + # sleep and retry + time.sleep(60) + continue + + # Get data sources by gateway and protocol + try: + query = (" SELECT ds.id, ds.name, ds.connection " + " FROM tbl_data_sources ds, tbl_gateways g " + " WHERE ds.protocol = 'bacnet-ip' AND ds.gateway_id = g.id AND g.id = %s AND g.token = %s " + " ORDER BY ds.id ") + cursor_system_db.execute(query, (config.gateway['id'], config.gateway['token'],)) + rows_data_source = cursor_system_db.fetchall() + except Exception as e: + logger.error("Error in step 1.2 of acquisition process " + str(e)) + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + # sleep and retry + time.sleep(60) + continue + + if rows_data_source is None or len(rows_data_source) == 0: + logger.error("Step 1.2: Data Source Not Found, Wait for minutes to retry.") + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + # sleep and retry + time.sleep(60) + continue + + point_dict = dict() + bacnet_point_list = list() + for row_data_source in rows_data_source: + # print("Data Source: ID=%s, Name=%s, Connection=%s ", + # row_data_source[0], row_data_source[1], row_data_source[2]) + + if row_data_source[2] is None or len(row_data_source[2]) == 0: + logger.error("Step 1.2: Data Source Connection Not Found. ID=%s, Name=%s, Connection=%s ", + row_data_source[0], row_data_source[1], row_data_source[2]) + # go to next row_data_source in for loop + continue + try: + server = json.loads(row_data_source[2]) + except Exception as e: + logger.error("Error in step 1.3 of acquisition process: \n" + "Invalid Data Source Connection in JSON " + str(e)) + # go to next row_data_source in for loop + continue + + if 'host' not in server.keys() or server['host'] is None or len(server['host']) == 0: + logger.error("Step 1.4: Data Source Connection Invalid. ID=%s, Name=%s, Connection=%s ", + row_data_source[0], row_data_source[1], row_data_source[2]) + # go to next row_data_source in for loop + continue + + data_source_id = row_data_source[0] + data_source_name = row_data_source[1] + data_source_host = server['host'] + + ############################################################################################################ + # Step 2: Get point list + ############################################################################################################ + try: + query = (" SELECT id, name, object_type, is_trend, ratio, address " + " FROM tbl_points " + " WHERE data_source_id = %s " + " ORDER BY id ") + cursor_system_db.execute(query, (data_source_id,)) + rows_points = cursor_system_db.fetchall() + except Exception as e: + logger.error("Error in step 2.1 of acquisition process " + str(e)) + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + # go to next row_data_source in for loop + continue + + if rows_points is None or len(rows_points) == 0: + # there is no points for this data source + logger.error("Error in step 2.1 of acquisition process: \n" + "Point Not Found in Data Source (ID = %s, Name = %s) ", data_source_id, data_source_name) + # go to next row_data_source in for loop + continue + + # There are points for this data source + for row_point in rows_points: + # print("Point in DataSource(ID=%s): ID=%s, Name=%s, ObjectType=%s, IsTrend = %s, Address=%s ", + # data_source_id, row_point[0], row_point[1], row_point[2], row_point[3], row_point[4]) + point_id = row_point[0] + point_name = row_point[1] + point_object_type = row_point[2] + is_trend = row_point[3] + ratio = row_point[4] + address = json.loads(row_point[5]) + # address example + # {"object_type":"analogValue", "object_id":3004860, "property_name":"presentValue", + # "property_array_index":null} + if 'object_type' not in address.keys() \ + or address['object_type'] is None \ + or len(address['object_type']) == 0 \ + or 'object_id' not in address.keys() \ + or address['object_id'] is None \ + or address['object_id'] < 0 \ + or 'property_name' not in address.keys() \ + or address['property_name'] is None \ + or len(address['property_name']) == 0 \ + or 'property_array_index' not in address.keys(): + logger.error("Point Address Invalid. ID=%s, Name=%s, Address=%s ", + row_point[0], row_point[1], row_point[5]) + # go to next row_point in for loop + continue + + bacnet_object_type = address['object_type'] + bacnet_object_id = address['object_id'] + bacnet_property_name = address['property_name'] + bacnet_property_array_index = address['property_array_index'] + + point_dict[point_id] = {'data_source_id': data_source_id, + 'point_name': point_name, + 'object_type': point_object_type, + 'is_trend': is_trend, + 'ratio': ratio, + } + bacnet_point_list.append((point_id, + data_source_host, + bacnet_object_type, + bacnet_object_id, + bacnet_property_name, + bacnet_property_array_index)) + # end of for row_point + # end of for row_data_source + + ################################################################################################################ + # Step 3: Read point values from BACnet + ################################################################################################################ + if point_dict is None or len(point_dict) == 0: + logger.error("Point Not Found in Step 2") + continue + + if bacnet_point_list is None or len(bacnet_point_list) == 0: + logger.error("BACnet Point Not Found in Step 2") + continue + + if not isinstance(this_device, LocalDeviceObject): + try: + # Create a device object + this_device = LocalDeviceObject(objectName=config.bacnet_device['object_name'], + objectIdentifier=config.bacnet_device['object_identifier'], + maxApduLengthAccepted=config.bacnet_device['max_apdu_length_accepted'], + segmentationSupported=config.bacnet_device['segmentation_supported'], + vendorIdentifier=config.bacnet_device['vendor_identifier']) + except Exception as e: + logger.error("Step 3.1 Create BACnet device " + str(e)) + continue + + try: + # make a BIPForeignApplication + this_application = MyEMSApplication(bacnet_point_list, + this_device, + config.bacnet_device['local_address'], + Address(config.bacnet_device['foreignBBMD']), + int(config.bacnet_device['foreignTTL'])) + + # fire off a request when the core has a chance + deferred(this_application.next_request) + + run() + + energy_value_list = list() + analog_value_list = list() + digital_value_list = list() + + # dump out the results + for request, response in zip(bacnet_point_list, this_application.response_values): + # print("request=%s, response=%s ", request, response,) + point_id = request[0] + data_source_id = point_dict[point_id]['data_source_id'] + object_type = point_dict[point_id]['object_type'] + is_trend = point_dict[point_id]['is_trend'] + ratio = point_dict[point_id]['ratio'] + if not isinstance(response, int) and \ + not isinstance(response, float) and \ + not isinstance(response, bool) and \ + not isinstance(response, str): + # go to next response + logger.error("response data type %s, value=%s invalid: request=%s", + type(response), response, request) + continue + else: + value = response + + if object_type == 'ANALOG_VALUE': + if math.isnan(value): + logger.error("response data type is Not A Number: request=%s", request) + continue + + analog_value_list.append({'data_source_id': data_source_id, + 'point_id': point_id, + 'is_trend': is_trend, + 'value': Decimal(value) * ratio}) + elif object_type == 'ENERGY_VALUE': + if math.isnan(value): + logger.error("response data type is Not A Number: request=%s", request) + continue + + energy_value_list.append({'data_source_id': data_source_id, + 'point_id': point_id, + 'is_trend': is_trend, + 'value': Decimal(value) * ratio}) + elif object_type == 'DIGITAL_VALUE': + if isinstance(value, str): + if value == 'active': + value = 1 + elif value == 'inactive': + value = 0 + + digital_value_list.append({'data_source_id': data_source_id, + 'point_id': point_id, + 'is_trend': is_trend, + 'value': int(value) * int(ratio)}) + + except Exception as e: + logger.error("Step 3.2 ReadPointList " + str(e)) + time.sleep(60) + continue + finally: + this_application.close_socket() + del this_application + + ################################################################################################################ + # Step 4: Bulk insert point values and update latest values in historical database + ################################################################################################################ + # check the connection to the database + if not cnx_historical_db or not cnx_historical_db.is_connected(): + try: + cnx_historical_db = mysql.connector.connect(**config.myems_historical_db) + cursor_historical_db = cnx_historical_db.cursor() + except Exception as e: + logger.error("Error in step 4.1 of acquisition process " + str(e)) + + if cnx_historical_db: + cnx_historical_db.close() + if cursor_historical_db: + cursor_historical_db.close() + # sleep and continue outer while loop to reconnect to server + time.sleep(60) + continue + + current_datetime_utc = datetime.utcnow() + # bulk insert values into historical database within a period + # update latest values in the meanwhile + if len(analog_value_list) > 0: + add_values = (" INSERT INTO tbl_analog_value (point_id, utc_date_time, actual_value) " + " VALUES ") + trend_value_count = 0 + + for point_value in analog_value_list: + if point_value['is_trend']: + add_values += " (" + str(point_value['point_id']) + "," + add_values += "'" + current_datetime_utc.isoformat() + "'," + add_values += str(point_value['value']) + "), " + trend_value_count += 1 + + if trend_value_count > 0: + try: + # trim ", " at the end of string and then execute + cursor_historical_db.execute(add_values[:-2]) + cnx_historical_db.commit() + except Exception as e: + logger.error("Error in step 4.2.1 of acquisition process " + str(e)) + # ignore this exception + pass + + # update tbl_analog_value_latest + delete_values = " DELETE FROM tbl_analog_value_latest WHERE point_id IN ( " + latest_values = (" INSERT INTO tbl_analog_value_latest (point_id, utc_date_time, actual_value) " + " VALUES ") + latest_value_count = 0 + + for point_value in analog_value_list: + delete_values += str(point_value['point_id']) + "," + latest_values += " (" + str(point_value['point_id']) + "," + latest_values += "'" + current_datetime_utc.isoformat() + "'," + latest_values += str(point_value['value']) + "), " + latest_value_count += 1 + + if latest_value_count > 0: + try: + # replace "," at the end of string with ")" + cursor_historical_db.execute(delete_values[:-1] + ")") + cnx_historical_db.commit() + except Exception as e: + logger.error("Error in step 4.2.2 of acquisition process " + str(e)) + # ignore this exception + pass + + try: + # trim ", " at the end of string and then execute + cursor_historical_db.execute(latest_values[:-2]) + cnx_historical_db.commit() + except Exception as e: + logger.error("Error in step 4.2.3 of acquisition process " + str(e)) + # ignore this exception + pass + + if len(energy_value_list) > 0: + add_values = (" INSERT INTO tbl_energy_value (point_id, utc_date_time, actual_value) " + " VALUES ") + trend_value_count = 0 + + for point_value in energy_value_list: + if point_value['is_trend']: + add_values += " (" + str(point_value['point_id']) + "," + add_values += "'" + current_datetime_utc.isoformat() + "'," + add_values += str(point_value['value']) + "), " + trend_value_count += 1 + + if trend_value_count > 0: + try: + # trim ", " at the end of string and then execute + cursor_historical_db.execute(add_values[:-2]) + cnx_historical_db.commit() + except Exception as e: + logger.error("Error in step 4.3.1 of acquisition process: " + str(e)) + # ignore this exception + pass + + # update tbl_energy_value_latest + delete_values = " DELETE FROM tbl_energy_value_latest WHERE point_id IN ( " + latest_values = (" INSERT INTO tbl_energy_value_latest (point_id, utc_date_time, actual_value) " + " VALUES ") + + latest_value_count = 0 + for point_value in energy_value_list: + delete_values += str(point_value['point_id']) + "," + latest_values += " (" + str(point_value['point_id']) + "," + latest_values += "'" + current_datetime_utc.isoformat() + "'," + latest_values += str(point_value['value']) + "), " + latest_value_count += 1 + + if latest_value_count > 0: + try: + # replace "," at the end of string with ")" + cursor_historical_db.execute(delete_values[:-1] + ")") + cnx_historical_db.commit() + + except Exception as e: + logger.error("Error in step 4.3.2 of acquisition process " + str(e)) + # ignore this exception + pass + + try: + # trim ", " at the end of string and then execute + cursor_historical_db.execute(latest_values[:-2]) + cnx_historical_db.commit() + + except Exception as e: + logger.error("Error in step 4.3.3 of acquisition process " + str(e)) + # ignore this exception + pass + + if len(digital_value_list) > 0: + add_values = (" INSERT INTO tbl_digital_value (point_id, utc_date_time, actual_value) " + " VALUES ") + trend_value_count = 0 + + for point_value in digital_value_list: + if point_value['is_trend']: + add_values += " (" + str(point_value['point_id']) + "," + add_values += "'" + current_datetime_utc.isoformat() + "'," + add_values += str(point_value['value']) + "), " + trend_value_count += 1 + + if trend_value_count > 0: + try: + # trim ", " at the end of string and then execute + cursor_historical_db.execute(add_values[:-2]) + cnx_historical_db.commit() + except Exception as e: + logger.error("Error in step 4.4.1 of acquisition process: " + str(e)) + # ignore this exception + pass + + # update tbl_digital_value_latest + delete_values = " DELETE FROM tbl_digital_value_latest WHERE point_id IN ( " + latest_values = (" INSERT INTO tbl_digital_value_latest (point_id, utc_date_time, actual_value) " + " VALUES ") + latest_value_count = 0 + for point_value in digital_value_list: + delete_values += str(point_value['point_id']) + "," + latest_values += " (" + str(point_value['point_id']) + "," + latest_values += "'" + current_datetime_utc.isoformat() + "'," + latest_values += str(point_value['value']) + "), " + latest_value_count += 1 + + if latest_value_count > 0: + try: + # replace "," at the end of string with ")" + cursor_historical_db.execute(delete_values[:-1] + ")") + cnx_historical_db.commit() + except Exception as e: + logger.error("Error in step 4.4.2 of acquisition process " + str(e)) + # ignore this exception + pass + + try: + # trim ", " at the end of string and then execute + cursor_historical_db.execute(latest_values[:-2]) + cnx_historical_db.commit() + except Exception as e: + logger.error("Error in step 4.4.3 of acquisition process " + str(e)) + # ignore this exception + pass + + # sleep some seconds + time.sleep(config.interval_in_seconds) + # end of the outermost while loop diff --git a/myems-bacnet/config.py b/myems-bacnet/config.py new file mode 100644 index 00000000..89357c9e --- /dev/null +++ b/myems-bacnet/config.py @@ -0,0 +1,37 @@ +myems_system_db = { + 'user': 'root', + 'password': '!MyEMS1', + 'host': '127.0.0.1', + 'database': 'myems_system_db', + 'port': 3306, +} + +myems_historical_db = { + 'user': 'root', + 'password': '!MyEMS1', + 'host': '127.0.0.1', + 'database': 'myems_historical_db', + 'port': 3306, +} + +# Indicates how long the process waits between readings +interval_in_seconds = 180 + +bacnet_device = { + 'local_address': '192.168.1.10', + 'object_name': 'MYEMS', + 'object_identifier': 0xABCD, + 'max_apdu_length_accepted': 1024, + 'segmentation_supported': 'segmentedBoth', + 'vendor_identifier': 0xABCD, + 'foreignPort': 47808, + 'foreignBBMD': '192.168.0.1', + 'foreignTTL': 30, +} + +# Get the gateway ID and token from MyEMS Admin +# This is used for getting data sources associated with the gateway +gateway = { + 'id': 1, + 'token': 'AAAAAAAA-AAAA-AAAA-AAAA-AAAAAAAAAAAA' +} diff --git a/myems-bacnet/main.py b/myems-bacnet/main.py new file mode 100644 index 00000000..15dc75cf --- /dev/null +++ b/myems-bacnet/main.py @@ -0,0 +1,30 @@ +import logging +from logging.handlers import RotatingFileHandler +import acquisition + + +def main(): + """main""" + # create logger + logger = logging.getLogger('myems-bacnet') + # specifies the lowest-severity log message a logger will handle, + # where debug is the lowest built-in severity level and critical is the highest built-in severity. + # For example, if the severity level is INFO, the logger will handle only INFO, WARNING, ERROR, and CRITICAL + # messages and will ignore DEBUG messages. + logger.setLevel(logging.ERROR) + # create file handler which logs messages + fh = RotatingFileHandler('myems-bacnet.log', maxBytes=1024*1024, backupCount=1) + # create formatter and add it to the handlers + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + fh.setFormatter(formatter) + # add the handlers to logger + logger.addHandler(fh) + + #################################################################################################################### + # Create Acquisition Process + #################################################################################################################### + acquisition.process(logger,) + + +if __name__ == "__main__": + main() diff --git a/myems-bacnet/myems-bacnet.service b/myems-bacnet/myems-bacnet.service new file mode 100644 index 00000000..837921ea --- /dev/null +++ b/myems-bacnet/myems-bacnet.service @@ -0,0 +1,15 @@ +[Unit] +Description=myems-bacnet daemon +After=network.target + +[Service] +User=root +Group=root +ExecStart=/usr/bin/python3 /myems-bacnet/main.py +ExecReload=/bin/kill -s HUP $MAINPID +ExecStop=/bin/kill -s TERM $MAINPID +PrivateTmp=true +Restart=always + +[Install] +WantedBy=multi-user.target diff --git a/myems-bacnet/myems_application.py b/myems-bacnet/myems_application.py new file mode 100644 index 00000000..4e0dd17c --- /dev/null +++ b/myems-bacnet/myems_application.py @@ -0,0 +1,79 @@ +from collections import deque + +from bacpypes.app import BIPForeignApplication +from bacpypes.core import stop, deferred +from bacpypes.iocb import IOCB + +from bacpypes.pdu import Address +from bacpypes.object import get_datatype + +from bacpypes.apdu import ReadPropertyRequest, Error, AbortPDU, ReadPropertyACK +from bacpypes.primitivedata import Unsigned +from bacpypes.constructeddata import Array + + +class MyEMSApplication(BIPForeignApplication): + + def __init__(self, point_list, *args): + + BIPForeignApplication.__init__(self, *args) + + # turn the point list into a queue + self.point_queue = deque(point_list) + + # make a list of the response values + self.response_values = [] + + def next_request(self): + + # check to see if we're done + if not self.point_queue: + stop() + return + + # get the next request + point_id, addr, obj_type, obj_inst, prop_id, idx = self.point_queue.popleft() + + # build a request + request = ReadPropertyRequest( + objectIdentifier=(obj_type, obj_inst), + propertyIdentifier=prop_id, + propertyArrayIndex=idx + ) + request.pduDestination = Address(addr) + + # make an IOCB + iocb = IOCB(request) + + # set a callback for the response + iocb.add_callback(self.complete_request) + + # send the request + self.request_io(iocb) + + def complete_request(self, iocb): + if iocb.ioResponse: + apdu = iocb.ioResponse + + # find the datatype + datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier) + if not datatype: + raise TypeError("unknown datatype") + + # special case for array parts, others are managed by cast_out + if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None): + if apdu.propertyArrayIndex == 0: + value = apdu.propertyValue.cast_out(Unsigned) + else: + value = apdu.propertyValue.cast_out(datatype.subtype) + else: + value = apdu.propertyValue.cast_out(datatype) + + # save the value + self.response_values.append(value) + + if iocb.ioError: + self.response_values.append(iocb.ioError) + + # fire off another request + deferred(self.next_request) diff --git a/myems-bacnet/test.py b/myems-bacnet/test.py new file mode 100644 index 00000000..161b52e3 --- /dev/null +++ b/myems-bacnet/test.py @@ -0,0 +1,49 @@ +from bacpypes.core import run, stop, deferred +from bacpypes.local.device import LocalDeviceObject +from bacpypes.pdu import Address, GlobalBroadcast +from myems_application import MyEMSApplication +import config + + +######################################################################################################################## +# this procedure tests BACnet/IP environment +######################################################################################################################## +def main(): + + # make a device object + this_device = LocalDeviceObject(objectName=config.bacnet_device['object_name'], + objectIdentifier=config.bacnet_device['object_identifier'], + maxApduLengthAccepted=config.bacnet_device['max_apdu_length_accepted'], + segmentationSupported=config.bacnet_device['segmentation_supported'], + vendorIdentifier=config.bacnet_device['vendor_identifier'], ) + + # point list, set according to your device + point_list = [ + # point_id, addr, obj_type, obj_inst, prop_id, idx + (1, '10.117.73.53', 'analogInput', 1, 'presentValue', None), + (2, '10.117.73.53', 'analogInput', 2, 'presentValue', None), + (3, '10.117.73.53', 'analogInput', 3, 'presentValue', None), + (4, '10.117.73.53', 'analogInput', 4, 'presentValue', None), + (5, '10.117.73.53', 'analogInput', 5, 'presentValue', None), + (6, '10.117.73.53', 'analogInput', 6, 'presentValue', None), + ] + + # make a simple application + this_application = MyEMSApplication(point_list, + this_device, + config.bacnet_device['local_address'], + Address(config.bacnet_device['foreignBBMD']), + int(config.bacnet_device['foreignTTL'])) + + # fire off a request when the core has a chance + deferred(this_application.next_request) + + run() + + # dump out the results + for request, response in zip(point_list, this_application.response_values): + print(request, response) + + +if __name__ == "__main__": + main()