import json import mysql.connector import config import time from datetime import datetime import math from decimal import Decimal from bacpypes.core import run, stop, deferred from bacpypes.local.device import LocalDeviceObject from bacpypes.pdu import Address, GlobalBroadcast from myems_application import MyEMSApplication ######################################################################################################################## # Acquisition Procedures # Step 1: Get data source list # Step 2: Get point list # Step 3: Read point values from BACnet # Step 4: Bulk insert point values and update latest values in historical database ######################################################################################################################## def process(logger, ): print("Creating a device object...") try: # make a device object this_device = LocalDeviceObject(objectName=config.bacnet_device['object_name'], objectIdentifier=config.bacnet_device['object_identifier'], maxApduLengthAccepted=config.bacnet_device['max_apdu_length_accepted'], segmentationSupported=config.bacnet_device['segmentation_supported'], vendorIdentifier=config.bacnet_device['vendor_identifier']) except Exception as e: logger.error("Failed to create BACnet device object: " + str(e)) # ignore pass print("Connecting to myems_system_db ...") cnx_system_db = None cursor_system_db = None while not cnx_system_db or not cnx_system_db.is_connected(): try: cnx_system_db = mysql.connector.connect(**config.myems_system_db) cursor_system_db = cnx_system_db.cursor() except Exception as e: logger.error("Error to connect to myems_system_db in acquisition process " + str(e)) if cursor_system_db: cursor_system_db.close() if cnx_system_db: cnx_system_db.close() print("Failed to connect to myems_system_db, sleep a minute and retry...") time.sleep(60) continue print("Connecting to myems_historical_db...") cnx_historical_db = None cursor_historical_db = None while not cnx_historical_db or not cnx_historical_db.is_connected(): try: cnx_historical_db = mysql.connector.connect(**config.myems_historical_db) cursor_historical_db = cnx_historical_db.cursor() except Exception as e: logger.error("Error to connect to myems_historical_db in acquisition process " + str(e)) if cursor_historical_db: cursor_historical_db.close() if cnx_historical_db: cnx_historical_db.close() print("Failed to connect to myems_historical_db, sleep a minute and retry...") time.sleep(60) continue while True: # the outermost while loop ################################################################################################################ # Step 1: Get data source list ################################################################################################################ # check the connection to the database if not cnx_system_db or not cnx_system_db.is_connected(): try: cnx_system_db = mysql.connector.connect(**config.myems_system_db) cursor_system_db = cnx_system_db.cursor() except Exception as e: logger.error("Error in step 1.1 of acquisition process " + str(e)) if cursor_system_db: cursor_system_db.close() if cnx_system_db: cnx_system_db.close() # sleep and retry time.sleep(60) continue # Get data sources by gateway and protocol try: query = (" SELECT ds.id, ds.name, ds.connection " " FROM tbl_data_sources ds, tbl_gateways g " " WHERE ds.protocol = 'bacnet-ip' AND ds.gateway_id = g.id AND g.id = %s AND g.token = %s " " ORDER BY ds.id ") cursor_system_db.execute(query, (config.gateway['id'], config.gateway['token'],)) rows_data_source = cursor_system_db.fetchall() except Exception as e: logger.error("Error in step 1.2 of acquisition process " + str(e)) if cursor_system_db: cursor_system_db.close() if cnx_system_db: cnx_system_db.close() # sleep and retry time.sleep(60) continue if rows_data_source is None or len(rows_data_source) == 0: logger.error("Step 1.2: Data Source Not Found, Wait for minutes to retry.") if cursor_system_db: cursor_system_db.close() if cnx_system_db: cnx_system_db.close() # sleep and retry time.sleep(60) continue point_dict = dict() bacnet_point_list = list() for row_data_source in rows_data_source: # print("Data Source: ID=%s, Name=%s, Connection=%s ", # row_data_source[0], row_data_source[1], row_data_source[2]) if row_data_source[2] is None or len(row_data_source[2]) == 0: logger.error("Step 1.2: Data Source Connection Not Found. ID=%s, Name=%s, Connection=%s ", row_data_source[0], row_data_source[1], row_data_source[2]) # go to next row_data_source in for loop continue try: server = json.loads(row_data_source[2]) except Exception as e: logger.error("Error in step 1.3 of acquisition process: \n" "Invalid Data Source Connection in JSON " + str(e)) # go to next row_data_source in for loop continue if 'host' not in server.keys() or server['host'] is None or len(server['host']) == 0: logger.error("Step 1.4: Data Source Connection Invalid. ID=%s, Name=%s, Connection=%s ", row_data_source[0], row_data_source[1], row_data_source[2]) # go to next row_data_source in for loop continue data_source_id = row_data_source[0] data_source_name = row_data_source[1] data_source_host = server['host'] ############################################################################################################ # Step 2: Get point list ############################################################################################################ try: query = (" SELECT id, name, object_type, is_trend, ratio, address " " FROM tbl_points " " WHERE data_source_id = %s AND is_virtual = FALSE " " ORDER BY id ") cursor_system_db.execute(query, (data_source_id,)) rows_points = cursor_system_db.fetchall() except Exception as e: logger.error("Error in step 2.1 of acquisition process " + str(e)) if cursor_system_db: cursor_system_db.close() if cnx_system_db: cnx_system_db.close() # go to next row_data_source in for loop continue if rows_points is None or len(rows_points) == 0: # there is no points for this data source logger.error("Error in step 2.1 of acquisition process: \n" "Point Not Found in Data Source (ID = %s, Name = %s) ", data_source_id, data_source_name) # go to next row_data_source in for loop continue # There are points for this data source for row_point in rows_points: # print("Point in DataSource(ID=%s): ID=%s, Name=%s, ObjectType=%s, IsTrend = %s, Address=%s ", # data_source_id, row_point[0], row_point[1], row_point[2], row_point[3], row_point[4]) point_id = row_point[0] point_name = row_point[1] point_object_type = row_point[2] is_trend = row_point[3] ratio = row_point[4] address = json.loads(row_point[5]) # address example # {"object_type":"analogValue", "object_id":3004860, "property_name":"presentValue", # "property_array_index":null} if 'object_type' not in address.keys() \ or address['object_type'] is None \ or len(address['object_type']) == 0 \ or 'object_id' not in address.keys() \ or address['object_id'] is None \ or address['object_id'] < 0 \ or 'property_name' not in address.keys() \ or address['property_name'] is None \ or len(address['property_name']) == 0 \ or 'property_array_index' not in address.keys(): logger.error("Point Address Invalid. ID=%s, Name=%s, Address=%s ", row_point[0], row_point[1], row_point[5]) # go to next row_point in for loop continue bacnet_object_type = address['object_type'] bacnet_object_id = address['object_id'] bacnet_property_name = address['property_name'] bacnet_property_array_index = address['property_array_index'] point_dict[point_id] = {'data_source_id': data_source_id, 'point_name': point_name, 'object_type': point_object_type, 'is_trend': is_trend, 'ratio': ratio, } bacnet_point_list.append((point_id, data_source_host, bacnet_object_type, bacnet_object_id, bacnet_property_name, bacnet_property_array_index)) # end of for row_point # end of for row_data_source ################################################################################################################ # Step 3: Read point values from BACnet ################################################################################################################ if point_dict is None or len(point_dict) == 0: logger.error("Point Not Found in Step 2") continue if bacnet_point_list is None or len(bacnet_point_list) == 0: logger.error("BACnet Point Not Found in Step 2") continue if not isinstance(this_device, LocalDeviceObject): try: # Create a device object this_device = LocalDeviceObject(objectName=config.bacnet_device['object_name'], objectIdentifier=config.bacnet_device['object_identifier'], maxApduLengthAccepted=config.bacnet_device['max_apdu_length_accepted'], segmentationSupported=config.bacnet_device['segmentation_supported'], vendorIdentifier=config.bacnet_device['vendor_identifier']) except Exception as e: logger.error("Step 3.1 Create BACnet device " + str(e)) continue try: # make a BIPForeignApplication this_application = MyEMSApplication(bacnet_point_list, this_device, config.bacnet_device['local_address'], Address(config.bacnet_device['foreignBBMD']), int(config.bacnet_device['foreignTTL'])) # fire off a request when the core has a chance deferred(this_application.next_request) run() energy_value_list = list() analog_value_list = list() digital_value_list = list() last_seen_data_source_set = set() # dump out the results for request, response in zip(bacnet_point_list, this_application.response_values): # print("request=%s, response=%s ", request, response,) point_id = request[0] data_source_id = point_dict[point_id]['data_source_id'] object_type = point_dict[point_id]['object_type'] is_trend = point_dict[point_id]['is_trend'] ratio = point_dict[point_id]['ratio'] if not isinstance(response, int) and \ not isinstance(response, float) and \ not isinstance(response, bool) and \ not isinstance(response, str): # go to next response logger.error("response data type %s, value=%s invalid: request=%s", type(response), response, request) continue else: value = response if object_type == 'ANALOG_VALUE': if math.isnan(value): logger.error("response data type is Not A Number: request=%s", request) continue analog_value_list.append({'data_source_id': data_source_id, 'point_id': point_id, 'is_trend': is_trend, 'value': Decimal(value) * ratio}) elif object_type == 'ENERGY_VALUE': if math.isnan(value): logger.error("response data type is Not A Number: request=%s", request) continue energy_value_list.append({'data_source_id': data_source_id, 'point_id': point_id, 'is_trend': is_trend, 'value': Decimal(value) * ratio}) elif object_type == 'DIGITAL_VALUE': if isinstance(value, str): if value == 'active': value = 1 elif value == 'inactive': value = 0 digital_value_list.append({'data_source_id': data_source_id, 'point_id': point_id, 'is_trend': is_trend, 'value': int(value) * int(ratio)}) # add data_source_id to the last seen set last_seen_data_source_set.add(data_source_id) except Exception as e: logger.error("Step 3.2 ReadPointList " + str(e)) time.sleep(60) continue finally: this_application.close_socket() del this_application ################################################################################################################ # Step 4: Bulk insert point values and update latest values in historical database ################################################################################################################ # check the connection to the database if not cnx_historical_db or not cnx_historical_db.is_connected(): try: cnx_historical_db = mysql.connector.connect(**config.myems_historical_db) cursor_historical_db = cnx_historical_db.cursor() except Exception as e: logger.error("Error in step 4.1 of acquisition process " + str(e)) if cnx_historical_db: cnx_historical_db.close() if cursor_historical_db: cursor_historical_db.close() # sleep and continue outer while loop to reconnect to server time.sleep(60) continue current_datetime_utc = datetime.utcnow() # bulk insert values into historical database within a period # update latest values in the meanwhile if len(analog_value_list) > 0: add_values = (" INSERT INTO tbl_analog_value (point_id, utc_date_time, actual_value) " " VALUES ") trend_value_count = 0 for point_value in analog_value_list: if point_value['is_trend']: add_values += " (" + str(point_value['point_id']) + "," add_values += "'" + current_datetime_utc.isoformat() + "'," add_values += str(point_value['value']) + "), " trend_value_count += 1 if trend_value_count > 0: try: # trim ", " at the end of string and then execute cursor_historical_db.execute(add_values[:-2]) cnx_historical_db.commit() except Exception as e: logger.error("Error in step 4.2.1 of acquisition process " + str(e)) # ignore this exception pass # update tbl_analog_value_latest delete_values = " DELETE FROM tbl_analog_value_latest WHERE point_id IN ( " latest_values = (" INSERT INTO tbl_analog_value_latest (point_id, utc_date_time, actual_value) " " VALUES ") latest_value_count = 0 for point_value in analog_value_list: delete_values += str(point_value['point_id']) + "," latest_values += " (" + str(point_value['point_id']) + "," latest_values += "'" + current_datetime_utc.isoformat() + "'," latest_values += str(point_value['value']) + "), " latest_value_count += 1 if latest_value_count > 0: try: # replace "," at the end of string with ")" cursor_historical_db.execute(delete_values[:-1] + ")") cnx_historical_db.commit() except Exception as e: logger.error("Error in step 4.2.2 of acquisition process " + str(e)) # ignore this exception pass try: # trim ", " at the end of string and then execute cursor_historical_db.execute(latest_values[:-2]) cnx_historical_db.commit() except Exception as e: logger.error("Error in step 4.2.3 of acquisition process " + str(e)) # ignore this exception pass if len(energy_value_list) > 0: add_values = (" INSERT INTO tbl_energy_value (point_id, utc_date_time, actual_value) " " VALUES ") trend_value_count = 0 for point_value in energy_value_list: if point_value['is_trend']: add_values += " (" + str(point_value['point_id']) + "," add_values += "'" + current_datetime_utc.isoformat() + "'," add_values += str(point_value['value']) + "), " trend_value_count += 1 if trend_value_count > 0: try: # trim ", " at the end of string and then execute cursor_historical_db.execute(add_values[:-2]) cnx_historical_db.commit() except Exception as e: logger.error("Error in step 4.3.1 of acquisition process: " + str(e)) # ignore this exception pass # update tbl_energy_value_latest delete_values = " DELETE FROM tbl_energy_value_latest WHERE point_id IN ( " latest_values = (" INSERT INTO tbl_energy_value_latest (point_id, utc_date_time, actual_value) " " VALUES ") latest_value_count = 0 for point_value in energy_value_list: delete_values += str(point_value['point_id']) + "," latest_values += " (" + str(point_value['point_id']) + "," latest_values += "'" + current_datetime_utc.isoformat() + "'," latest_values += str(point_value['value']) + "), " latest_value_count += 1 if latest_value_count > 0: try: # replace "," at the end of string with ")" cursor_historical_db.execute(delete_values[:-1] + ")") cnx_historical_db.commit() except Exception as e: logger.error("Error in step 4.3.2 of acquisition process " + str(e)) # ignore this exception pass try: # trim ", " at the end of string and then execute cursor_historical_db.execute(latest_values[:-2]) cnx_historical_db.commit() except Exception as e: logger.error("Error in step 4.3.3 of acquisition process " + str(e)) # ignore this exception pass if len(digital_value_list) > 0: add_values = (" INSERT INTO tbl_digital_value (point_id, utc_date_time, actual_value) " " VALUES ") trend_value_count = 0 for point_value in digital_value_list: if point_value['is_trend']: add_values += " (" + str(point_value['point_id']) + "," add_values += "'" + current_datetime_utc.isoformat() + "'," add_values += str(point_value['value']) + "), " trend_value_count += 1 if trend_value_count > 0: try: # trim ", " at the end of string and then execute cursor_historical_db.execute(add_values[:-2]) cnx_historical_db.commit() except Exception as e: logger.error("Error in step 4.4.1 of acquisition process: " + str(e)) # ignore this exception pass # update tbl_digital_value_latest delete_values = " DELETE FROM tbl_digital_value_latest WHERE point_id IN ( " latest_values = (" INSERT INTO tbl_digital_value_latest (point_id, utc_date_time, actual_value) " " VALUES ") latest_value_count = 0 for point_value in digital_value_list: delete_values += str(point_value['point_id']) + "," latest_values += " (" + str(point_value['point_id']) + "," latest_values += "'" + current_datetime_utc.isoformat() + "'," latest_values += str(point_value['value']) + "), " latest_value_count += 1 if latest_value_count > 0: try: # replace "," at the end of string with ")" cursor_historical_db.execute(delete_values[:-1] + ")") cnx_historical_db.commit() except Exception as e: logger.error("Error in step 4.4.2 of acquisition process " + str(e)) # ignore this exception pass try: # trim ", " at the end of string and then execute cursor_historical_db.execute(latest_values[:-2]) cnx_historical_db.commit() except Exception as e: logger.error("Error in step 4.4.3 of acquisition process " + str(e)) # ignore this exception pass if len(last_seen_data_source_set) > 0: update_row = (" UPDATE tbl_data_sources " " SET last_seen_datetime_utc = '" + current_datetime_utc.isoformat() + "' " " WHERE id IN (") for data_source_id in last_seen_data_source_set: update_row += str(data_source_id) + "," try: cursor_system_db.execute(update_row[:-1] + ")",) cnx_system_db.commit() except Exception as e: logger.error("Error in step 4.4.4 of acquisition process " + str(e)) # ignore this exception pass # sleep some seconds time.sleep(config.interval_in_seconds) # end of the outermost while loop