merged myems-bacnet
parent fb90e99eb7
commit 3adb582972
LICENSE
@@ -0,0 +1,21 @@
MIT License

Copyright (c) 2020 MyEMS

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
README.md
@@ -0,0 +1,90 @@
# MyEMS BACnet Service

## Introduction

This service is a component of MyEMS used to acquire data from BACnet devices.

[](https://app.codacy.com/gh/myems/myems-bacnet?utm_source=github.com&utm_medium=referral&utm_content=myems/myems-bacnet&utm_campaign=Badge_Grade)
[](https://scrutinizer-ci.com/g/myems/myems-bacnet/?branch=master)
[](https://codeclimate.com/github/myems/myems-bacnet/maintainability)
[](https://lgtm.com/projects/g/myems/myems-bacnet/alerts/)

## Prerequisites

bacpypes

mysql.connector

## Installation

Download and install MySQL Connector:
```
$ cd ~/tools
$ wget https://dev.mysql.com/get/Downloads/Connector-Python/mysql-connector-python-8.0.20.tar.gz
$ tar xzf mysql-connector-python-8.0.20.tar.gz
$ cd ~/tools/mysql-connector-python-8.0.20
$ sudo python3 setup.py install
```

Download and install the bacpypes library:
```
$ cd ~/tools
$ git clone https://github.com/pypa/setuptools_scm.git
$ git clone https://github.com/pytest-dev/pytest-runner.git
$ git clone https://github.com/JoelBender/bacpypes.git
$ cd ~/tools/setuptools_scm/
$ sudo python3 setup.py install
$ cd ~/tools/pytest-runner/
$ sudo python3 setup.py install
$ cd ~/tools/bacpypes
$ sudo python3 setup.py install
$ sudo ufw allow 47808
```
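
Alternatively, both prerequisites are published on PyPI; assuming the host has pip3 and internet access (this shortcut is not part of the original instructions), they can be installed with:
```
$ sudo pip3 install mysql-connector-python bacpypes
```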

Install the myems-bacnet service:
```
$ cd ~
$ git clone https://github.com/MyEMS/myems.git
$ cd myems
$ git checkout master (or the latest release tag)
$ sudo cp -R ~/myems/myems-bacnet /myems-bacnet
```

Edit the config file:
```
$ sudo nano /myems-bacnet/config.py
```

Set up the systemd service:
```
$ sudo cp /myems-bacnet/myems-bacnet.service /lib/systemd/system/
$ sudo systemctl enable myems-bacnet.service
$ sudo systemctl start myems-bacnet.service
```
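
To verify that the service started correctly, standard systemd commands can be used (added here for convenience; not part of the original steps):
```
$ sudo systemctl status myems-bacnet.service
$ sudo journalctl -u myems-bacnet.service -f
```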

### Add Data Sources and Points in MyEMS Admin

Data source protocol:
```
bacnet-ip
```

Data source connection example:
```
{"host": "192.168.0.3", "port": 47808}
```

Point address example:
```
{"object_id":3002786,"object_type":"analogValue","property_array_index":null,"property_name":"presentValue"}
```
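
For reference, the acquisition service turns each point address into a bacpypes ReadPropertyRequest (see myems_application.py in this commit). A minimal sketch of that mapping, reusing the example address above and the host from the connection example:
```
import json

from bacpypes.apdu import ReadPropertyRequest
from bacpypes.pdu import Address

# the point address exactly as stored in MyEMS Admin
address = json.loads('{"object_id":3002786,"object_type":"analogValue",'
                     '"property_array_index":null,"property_name":"presentValue"}')

# build the BACnet ReadProperty request the service would send to the data source host
request = ReadPropertyRequest(
    objectIdentifier=(address['object_type'], address['object_id']),
    propertyIdentifier=address['property_name'],
    propertyArrayIndex=address['property_array_index'])
request.pduDestination = Address('192.168.0.3')
```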

## References

[1]. http://myems.io

[2]. http://bacnet.org

[3]. https://github.com/JoelBender/bacpypes
acquisition.py
@@ -0,0 +1,509 @@
import json
import mysql.connector
import config
import time
from datetime import datetime
import math
from decimal import Decimal
from bacpypes.core import run, stop, deferred
from bacpypes.local.device import LocalDeviceObject
from bacpypes.pdu import Address, GlobalBroadcast

from myems_application import MyEMSApplication


########################################################################################################################
# Acquisition Procedures
# Step 1: Get data source list
# Step 2: Get point list
# Step 3: Read point values from BACnet
# Step 4: Bulk insert point values and update latest values in historical database
########################################################################################################################

def process(logger):
    print("Creating a device object...")
    try:
        # make a device object
        this_device = LocalDeviceObject(objectName=config.bacnet_device['object_name'],
                                        objectIdentifier=config.bacnet_device['object_identifier'],
                                        maxApduLengthAccepted=config.bacnet_device['max_apdu_length_accepted'],
                                        segmentationSupported=config.bacnet_device['segmentation_supported'],
                                        vendorIdentifier=config.bacnet_device['vendor_identifier'])
    except Exception as e:
        logger.error("Failed to create BACnet device object: " + str(e))
        # ignore
        pass

    print("Connecting to myems_system_db ...")
    cnx_system_db = None
    cursor_system_db = None

    while not cnx_system_db or not cnx_system_db.is_connected():
        try:
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
            cursor_system_db = cnx_system_db.cursor()
        except Exception as e:
            logger.error("Error to connect to myems_system_db in acquisition process " + str(e))
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()
            print("Failed to connect to myems_system_db, sleep a minute and retry...")
            time.sleep(60)
            continue

    print("Connecting to myems_historical_db...")
    cnx_historical_db = None
    cursor_historical_db = None

    while not cnx_historical_db or not cnx_historical_db.is_connected():
        try:
            cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
            cursor_historical_db = cnx_historical_db.cursor()
        except Exception as e:
            logger.error("Error to connect to myems_historical_db in acquisition process " + str(e))
            if cursor_historical_db:
                cursor_historical_db.close()
            if cnx_historical_db:
                cnx_historical_db.close()
            print("Failed to connect to myems_historical_db, sleep a minute and retry...")
            time.sleep(60)
            continue

    while True:
        # the outermost while loop
        ################################################################################################################
        # Step 1: Get data source list
        ################################################################################################################
        # check the connection to the database
        if not cnx_system_db or not cnx_system_db.is_connected():
            try:
                cnx_system_db = mysql.connector.connect(**config.myems_system_db)
                cursor_system_db = cnx_system_db.cursor()
            except Exception as e:
                logger.error("Error in step 1.1 of acquisition process " + str(e))
                if cursor_system_db:
                    cursor_system_db.close()
                if cnx_system_db:
                    cnx_system_db.close()
                # sleep and retry
                time.sleep(60)
                continue

        # Get data sources by gateway and protocol
        try:
            query = (" SELECT ds.id, ds.name, ds.connection "
                     " FROM tbl_data_sources ds, tbl_gateways g "
                     " WHERE ds.protocol = 'bacnet-ip' AND ds.gateway_id = g.id AND g.id = %s AND g.token = %s "
                     " ORDER BY ds.id ")
            cursor_system_db.execute(query, (config.gateway['id'], config.gateway['token'],))
            rows_data_source = cursor_system_db.fetchall()
        except Exception as e:
            logger.error("Error in step 1.2 of acquisition process " + str(e))
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()
            # sleep and retry
            time.sleep(60)
            continue

        if rows_data_source is None or len(rows_data_source) == 0:
            logger.error("Step 1.2: Data Source Not Found, Wait for minutes to retry.")
            if cursor_system_db:
                cursor_system_db.close()
            if cnx_system_db:
                cnx_system_db.close()
            # sleep and retry
            time.sleep(60)
            continue

        point_dict = dict()
        bacnet_point_list = list()
        for row_data_source in rows_data_source:
            # print("Data Source: ID=%s, Name=%s, Connection=%s ",
            #       row_data_source[0], row_data_source[1], row_data_source[2])

            if row_data_source[2] is None or len(row_data_source[2]) == 0:
                logger.error("Step 1.2: Data Source Connection Not Found. ID=%s, Name=%s, Connection=%s ",
                             row_data_source[0], row_data_source[1], row_data_source[2])
                # go to next row_data_source in for loop
                continue
            try:
                server = json.loads(row_data_source[2])
            except Exception as e:
                logger.error("Error in step 1.3 of acquisition process: \n"
                             "Invalid Data Source Connection in JSON " + str(e))
                # go to next row_data_source in for loop
                continue

            if 'host' not in server.keys() or server['host'] is None or len(server['host']) == 0:
                logger.error("Step 1.4: Data Source Connection Invalid. ID=%s, Name=%s, Connection=%s ",
                             row_data_source[0], row_data_source[1], row_data_source[2])
                # go to next row_data_source in for loop
                continue

            data_source_id = row_data_source[0]
            data_source_name = row_data_source[1]
            data_source_host = server['host']

            ############################################################################################################
            # Step 2: Get point list
            ############################################################################################################
            try:
                query = (" SELECT id, name, object_type, is_trend, ratio, address "
                         " FROM tbl_points "
                         " WHERE data_source_id = %s "
                         " ORDER BY id ")
                cursor_system_db.execute(query, (data_source_id,))
                rows_points = cursor_system_db.fetchall()
            except Exception as e:
                logger.error("Error in step 2.1 of acquisition process " + str(e))
                if cursor_system_db:
                    cursor_system_db.close()
                if cnx_system_db:
                    cnx_system_db.close()
                # go to next row_data_source in for loop
                continue

            if rows_points is None or len(rows_points) == 0:
                # there are no points for this data source
                logger.error("Error in step 2.1 of acquisition process: \n"
                             "Point Not Found in Data Source (ID = %s, Name = %s) ", data_source_id, data_source_name)
                # go to next row_data_source in for loop
                continue

            # There are points for this data source
            for row_point in rows_points:
                # print("Point in DataSource(ID=%s): ID=%s, Name=%s, ObjectType=%s, IsTrend = %s, Address=%s ",
                #       data_source_id, row_point[0], row_point[1], row_point[2], row_point[3], row_point[4])
                point_id = row_point[0]
                point_name = row_point[1]
                point_object_type = row_point[2]
                is_trend = row_point[3]
                ratio = row_point[4]
                address = json.loads(row_point[5])
                # address example
                # {"object_type":"analogValue", "object_id":3004860, "property_name":"presentValue",
                #  "property_array_index":null}
                if 'object_type' not in address.keys() \
                        or address['object_type'] is None \
                        or len(address['object_type']) == 0 \
                        or 'object_id' not in address.keys() \
                        or address['object_id'] is None \
                        or address['object_id'] < 0 \
                        or 'property_name' not in address.keys() \
                        or address['property_name'] is None \
                        or len(address['property_name']) == 0 \
                        or 'property_array_index' not in address.keys():
                    logger.error("Point Address Invalid. ID=%s, Name=%s, Address=%s ",
                                 row_point[0], row_point[1], row_point[5])
                    # go to next row_point in for loop
                    continue

                bacnet_object_type = address['object_type']
                bacnet_object_id = address['object_id']
                bacnet_property_name = address['property_name']
                bacnet_property_array_index = address['property_array_index']

                point_dict[point_id] = {'data_source_id': data_source_id,
                                        'point_name': point_name,
                                        'object_type': point_object_type,
                                        'is_trend': is_trend,
                                        'ratio': ratio,
                                        }
                bacnet_point_list.append((point_id,
                                          data_source_host,
                                          bacnet_object_type,
                                          bacnet_object_id,
                                          bacnet_property_name,
                                          bacnet_property_array_index))
            # end of for row_point
        # end of for row_data_source

        ################################################################################################################
        # Step 3: Read point values from BACnet
        ################################################################################################################
        if point_dict is None or len(point_dict) == 0:
            logger.error("Point Not Found in Step 2")
            continue

        if bacnet_point_list is None or len(bacnet_point_list) == 0:
            logger.error("BACnet Point Not Found in Step 2")
            continue

        if not isinstance(this_device, LocalDeviceObject):
            try:
                # Create a device object
                this_device = LocalDeviceObject(objectName=config.bacnet_device['object_name'],
                                                objectIdentifier=config.bacnet_device['object_identifier'],
                                                maxApduLengthAccepted=config.bacnet_device['max_apdu_length_accepted'],
                                                segmentationSupported=config.bacnet_device['segmentation_supported'],
                                                vendorIdentifier=config.bacnet_device['vendor_identifier'])
            except Exception as e:
                logger.error("Step 3.1 Create BACnet device " + str(e))
                continue

        try:
            # make a BIPForeignApplication
            this_application = MyEMSApplication(bacnet_point_list,
                                                this_device,
                                                config.bacnet_device['local_address'],
                                                Address(config.bacnet_device['foreignBBMD']),
                                                int(config.bacnet_device['foreignTTL']))

            # fire off a request when the core has a chance
            deferred(this_application.next_request)

            run()

            energy_value_list = list()
            analog_value_list = list()
            digital_value_list = list()

            # dump out the results
            for request, response in zip(bacnet_point_list, this_application.response_values):
                # print("request=%s, response=%s ", request, response,)
                point_id = request[0]
                data_source_id = point_dict[point_id]['data_source_id']
                object_type = point_dict[point_id]['object_type']
                is_trend = point_dict[point_id]['is_trend']
                ratio = point_dict[point_id]['ratio']
                if not isinstance(response, int) and \
                        not isinstance(response, float) and \
                        not isinstance(response, bool) and \
                        not isinstance(response, str):
                    # go to next response
                    logger.error("response data type %s, value=%s invalid: request=%s",
                                 type(response), response, request)
                    continue
                else:
                    value = response

                if object_type == 'ANALOG_VALUE':
                    if math.isnan(value):
                        logger.error("response data type is Not A Number: request=%s", request)
                        continue

                    analog_value_list.append({'data_source_id': data_source_id,
                                              'point_id': point_id,
                                              'is_trend': is_trend,
                                              'value': Decimal(value) * ratio})
                elif object_type == 'ENERGY_VALUE':
                    if math.isnan(value):
                        logger.error("response data type is Not A Number: request=%s", request)
                        continue

                    energy_value_list.append({'data_source_id': data_source_id,
                                              'point_id': point_id,
                                              'is_trend': is_trend,
                                              'value': Decimal(value) * ratio})
                elif object_type == 'DIGITAL_VALUE':
                    if isinstance(value, str):
                        if value == 'active':
                            value = 1
                        elif value == 'inactive':
                            value = 0

                    digital_value_list.append({'data_source_id': data_source_id,
                                               'point_id': point_id,
                                               'is_trend': is_trend,
                                               'value': int(value) * int(ratio)})

        except Exception as e:
            logger.error("Step 3.2 ReadPointList " + str(e))
            time.sleep(60)
            continue
        finally:
            this_application.close_socket()
            del this_application

        ################################################################################################################
        # Step 4: Bulk insert point values and update latest values in historical database
        ################################################################################################################
        # check the connection to the database
        if not cnx_historical_db or not cnx_historical_db.is_connected():
            try:
                cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
                cursor_historical_db = cnx_historical_db.cursor()
            except Exception as e:
                logger.error("Error in step 4.1 of acquisition process " + str(e))

                if cnx_historical_db:
                    cnx_historical_db.close()
                if cursor_historical_db:
                    cursor_historical_db.close()
                # sleep and continue outer while loop to reconnect to server
                time.sleep(60)
                continue

        current_datetime_utc = datetime.utcnow()
        # bulk insert values into historical database within a period
        # update latest values in the meanwhile
        if len(analog_value_list) > 0:
            add_values = (" INSERT INTO tbl_analog_value (point_id, utc_date_time, actual_value) "
                          " VALUES ")
            trend_value_count = 0

            for point_value in analog_value_list:
                if point_value['is_trend']:
                    add_values += " (" + str(point_value['point_id']) + ","
                    add_values += "'" + current_datetime_utc.isoformat() + "',"
                    add_values += str(point_value['value']) + "), "
                    trend_value_count += 1

            if trend_value_count > 0:
                try:
                    # trim ", " at the end of string and then execute
                    cursor_historical_db.execute(add_values[:-2])
                    cnx_historical_db.commit()
                except Exception as e:
                    logger.error("Error in step 4.2.1 of acquisition process " + str(e))
                    # ignore this exception
                    pass

            # update tbl_analog_value_latest
            delete_values = " DELETE FROM tbl_analog_value_latest WHERE point_id IN ( "
            latest_values = (" INSERT INTO tbl_analog_value_latest (point_id, utc_date_time, actual_value) "
                             " VALUES ")
            latest_value_count = 0

            for point_value in analog_value_list:
                delete_values += str(point_value['point_id']) + ","
                latest_values += " (" + str(point_value['point_id']) + ","
                latest_values += "'" + current_datetime_utc.isoformat() + "',"
                latest_values += str(point_value['value']) + "), "
                latest_value_count += 1

            if latest_value_count > 0:
                try:
                    # replace "," at the end of string with ")"
                    cursor_historical_db.execute(delete_values[:-1] + ")")
                    cnx_historical_db.commit()
                except Exception as e:
                    logger.error("Error in step 4.2.2 of acquisition process " + str(e))
                    # ignore this exception
                    pass

                try:
                    # trim ", " at the end of string and then execute
                    cursor_historical_db.execute(latest_values[:-2])
                    cnx_historical_db.commit()
                except Exception as e:
                    logger.error("Error in step 4.2.3 of acquisition process " + str(e))
                    # ignore this exception
                    pass

        if len(energy_value_list) > 0:
            add_values = (" INSERT INTO tbl_energy_value (point_id, utc_date_time, actual_value) "
                          " VALUES ")
            trend_value_count = 0

            for point_value in energy_value_list:
                if point_value['is_trend']:
                    add_values += " (" + str(point_value['point_id']) + ","
                    add_values += "'" + current_datetime_utc.isoformat() + "',"
                    add_values += str(point_value['value']) + "), "
                    trend_value_count += 1

            if trend_value_count > 0:
                try:
                    # trim ", " at the end of string and then execute
                    cursor_historical_db.execute(add_values[:-2])
                    cnx_historical_db.commit()
                except Exception as e:
                    logger.error("Error in step 4.3.1 of acquisition process: " + str(e))
                    # ignore this exception
                    pass

            # update tbl_energy_value_latest
            delete_values = " DELETE FROM tbl_energy_value_latest WHERE point_id IN ( "
            latest_values = (" INSERT INTO tbl_energy_value_latest (point_id, utc_date_time, actual_value) "
                             " VALUES ")

            latest_value_count = 0
            for point_value in energy_value_list:
                delete_values += str(point_value['point_id']) + ","
                latest_values += " (" + str(point_value['point_id']) + ","
                latest_values += "'" + current_datetime_utc.isoformat() + "',"
                latest_values += str(point_value['value']) + "), "
                latest_value_count += 1

            if latest_value_count > 0:
                try:
                    # replace "," at the end of string with ")"
                    cursor_historical_db.execute(delete_values[:-1] + ")")
                    cnx_historical_db.commit()

                except Exception as e:
                    logger.error("Error in step 4.3.2 of acquisition process " + str(e))
                    # ignore this exception
                    pass

                try:
                    # trim ", " at the end of string and then execute
                    cursor_historical_db.execute(latest_values[:-2])
                    cnx_historical_db.commit()

                except Exception as e:
                    logger.error("Error in step 4.3.3 of acquisition process " + str(e))
                    # ignore this exception
                    pass

        if len(digital_value_list) > 0:
            add_values = (" INSERT INTO tbl_digital_value (point_id, utc_date_time, actual_value) "
                          " VALUES ")
            trend_value_count = 0

            for point_value in digital_value_list:
                if point_value['is_trend']:
                    add_values += " (" + str(point_value['point_id']) + ","
                    add_values += "'" + current_datetime_utc.isoformat() + "',"
                    add_values += str(point_value['value']) + "), "
                    trend_value_count += 1

            if trend_value_count > 0:
                try:
                    # trim ", " at the end of string and then execute
                    cursor_historical_db.execute(add_values[:-2])
                    cnx_historical_db.commit()
                except Exception as e:
                    logger.error("Error in step 4.4.1 of acquisition process: " + str(e))
                    # ignore this exception
                    pass

            # update tbl_digital_value_latest
            delete_values = " DELETE FROM tbl_digital_value_latest WHERE point_id IN ( "
            latest_values = (" INSERT INTO tbl_digital_value_latest (point_id, utc_date_time, actual_value) "
                             " VALUES ")
            latest_value_count = 0
            for point_value in digital_value_list:
                delete_values += str(point_value['point_id']) + ","
                latest_values += " (" + str(point_value['point_id']) + ","
                latest_values += "'" + current_datetime_utc.isoformat() + "',"
                latest_values += str(point_value['value']) + "), "
                latest_value_count += 1

            if latest_value_count > 0:
                try:
                    # replace "," at the end of string with ")"
                    cursor_historical_db.execute(delete_values[:-1] + ")")
                    cnx_historical_db.commit()
                except Exception as e:
                    logger.error("Error in step 4.4.2 of acquisition process " + str(e))
                    # ignore this exception
                    pass

                try:
                    # trim ", " at the end of string and then execute
                    cursor_historical_db.execute(latest_values[:-2])
                    cnx_historical_db.commit()
                except Exception as e:
                    logger.error("Error in step 4.4.3 of acquisition process " + str(e))
                    # ignore this exception
                    pass

        # sleep some seconds
        time.sleep(config.interval_in_seconds)
    # end of the outermost while loop
config.py
@@ -0,0 +1,37 @@
myems_system_db = {
    'user': 'root',
    'password': '!MyEMS1',
    'host': '127.0.0.1',
    'database': 'myems_system_db',
    'port': 3306,
}

myems_historical_db = {
    'user': 'root',
    'password': '!MyEMS1',
    'host': '127.0.0.1',
    'database': 'myems_historical_db',
    'port': 3306,
}

# Indicates how long the process waits between readings
interval_in_seconds = 180

bacnet_device = {
    'local_address': '192.168.1.10',
    'object_name': 'MYEMS',
    'object_identifier': 0xABCD,
    'max_apdu_length_accepted': 1024,
    'segmentation_supported': 'segmentedBoth',
    'vendor_identifier': 0xABCD,
    'foreignPort': 47808,
    'foreignBBMD': '192.168.0.1',
    'foreignTTL': 30,
}

# Get the gateway ID and token from MyEMS Admin
# This is used for getting data sources associated with the gateway
gateway = {
    'id': 1,
    'token': 'AAAAAAAA-AAAA-AAAA-AAAA-AAAAAAAAAAAA'
}
main.py
@@ -0,0 +1,30 @@
import logging
from logging.handlers import RotatingFileHandler
import acquisition


def main():
    """main"""
    # create logger
    logger = logging.getLogger('myems-bacnet')
    # specifies the lowest-severity log message a logger will handle,
    # where debug is the lowest built-in severity level and critical is the highest built-in severity.
    # For example, if the severity level is INFO, the logger will handle only INFO, WARNING, ERROR, and CRITICAL
    # messages and will ignore DEBUG messages.
    logger.setLevel(logging.ERROR)
    # create file handler which logs messages
    fh = RotatingFileHandler('myems-bacnet.log', maxBytes=1024*1024, backupCount=1)
    # create formatter and add it to the handlers
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fh.setFormatter(formatter)
    # add the handlers to logger
    logger.addHandler(fh)

    ####################################################################################################################
    # Create Acquisition Process
    ####################################################################################################################
    acquisition.process(logger)


if __name__ == "__main__":
    main()
myems-bacnet.service
@@ -0,0 +1,15 @@
[Unit]
Description=myems-bacnet daemon
After=network.target

[Service]
User=root
Group=root
ExecStart=/usr/bin/python3 /myems-bacnet/main.py
ExecReload=/bin/kill -s HUP $MAINPID
ExecStop=/bin/kill -s TERM $MAINPID
PrivateTmp=true
Restart=always

[Install]
WantedBy=multi-user.target
myems_application.py
@@ -0,0 +1,79 @@
from collections import deque

from bacpypes.app import BIPForeignApplication
from bacpypes.core import stop, deferred
from bacpypes.iocb import IOCB

from bacpypes.pdu import Address
from bacpypes.object import get_datatype

from bacpypes.apdu import ReadPropertyRequest, Error, AbortPDU, ReadPropertyACK
from bacpypes.primitivedata import Unsigned
from bacpypes.constructeddata import Array


class MyEMSApplication(BIPForeignApplication):

    def __init__(self, point_list, *args):

        BIPForeignApplication.__init__(self, *args)

        # turn the point list into a queue
        self.point_queue = deque(point_list)

        # make a list of the response values
        self.response_values = []

    def next_request(self):

        # check to see if we're done
        if not self.point_queue:
            stop()
            return

        # get the next request
        point_id, addr, obj_type, obj_inst, prop_id, idx = self.point_queue.popleft()

        # build a request
        request = ReadPropertyRequest(
            objectIdentifier=(obj_type, obj_inst),
            propertyIdentifier=prop_id,
            propertyArrayIndex=idx
        )
        request.pduDestination = Address(addr)

        # make an IOCB
        iocb = IOCB(request)

        # set a callback for the response
        iocb.add_callback(self.complete_request)

        # send the request
        self.request_io(iocb)

    def complete_request(self, iocb):
        if iocb.ioResponse:
            apdu = iocb.ioResponse

            # find the datatype
            datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier)
            if not datatype:
                raise TypeError("unknown datatype")

            # special case for array parts, others are managed by cast_out
            if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None):
                if apdu.propertyArrayIndex == 0:
                    value = apdu.propertyValue.cast_out(Unsigned)
                else:
                    value = apdu.propertyValue.cast_out(datatype.subtype)
            else:
                value = apdu.propertyValue.cast_out(datatype)

            # save the value
            self.response_values.append(value)

        if iocb.ioError:
            self.response_values.append(iocb.ioError)

        # fire off another request
        deferred(self.next_request)
@@ -0,0 +1,49 @@
from bacpypes.core import run, stop, deferred
from bacpypes.local.device import LocalDeviceObject
from bacpypes.pdu import Address, GlobalBroadcast
from myems_application import MyEMSApplication
import config


########################################################################################################################
# this procedure tests the BACnet/IP environment
########################################################################################################################
def main():

    # make a device object
    this_device = LocalDeviceObject(objectName=config.bacnet_device['object_name'],
                                    objectIdentifier=config.bacnet_device['object_identifier'],
                                    maxApduLengthAccepted=config.bacnet_device['max_apdu_length_accepted'],
                                    segmentationSupported=config.bacnet_device['segmentation_supported'],
                                    vendorIdentifier=config.bacnet_device['vendor_identifier'], )

    # point list, set according to your device
    point_list = [
        # point_id, addr, obj_type, obj_inst, prop_id, idx
        (1, '10.117.73.53', 'analogInput', 1, 'presentValue', None),
        (2, '10.117.73.53', 'analogInput', 2, 'presentValue', None),
        (3, '10.117.73.53', 'analogInput', 3, 'presentValue', None),
        (4, '10.117.73.53', 'analogInput', 4, 'presentValue', None),
        (5, '10.117.73.53', 'analogInput', 5, 'presentValue', None),
        (6, '10.117.73.53', 'analogInput', 6, 'presentValue', None),
    ]

    # make a simple application
    this_application = MyEMSApplication(point_list,
                                        this_device,
                                        config.bacnet_device['local_address'],
                                        Address(config.bacnet_device['foreignBBMD']),
                                        int(config.bacnet_device['foreignTTL']))

    # fire off a request when the core has a chance
    deferred(this_application.next_request)

    run()

    # dump out the results
    for request, response in zip(point_list, this_application.response_values):
        print(request, response)


if __name__ == "__main__":
    main()