import json
import mysql.connector
import time
import math
from datetime import datetime
import telnetlib
from modbus_tk import modbus_tcp
import config
from decimal import Decimal
from byte_swap import byte_swap_32_bit, byte_swap_64_bit

########################################################################################################################
# Acquisition Procedures
# Step 1: telnet the host
# Step 2: Get point list
# Step 3: Read point values from Modbus slaves
# Step 4: Bulk insert point values and update latest values in historical database
########################################################################################################################

# Keys every point address (the JSON stored in tbl_points.address) must provide.
_REQUIRED_ADDRESS_KEYS = ('slave_id', 'function_code', 'offset', 'number_of_registers', 'format', 'byte_swap')


def _close_quietly(logger, *resources):
    """Call ``close()`` on each resource that is not None, logging failures instead of raising.

    Used on cleanup paths so that a failing ``close()`` cannot abort the acquisition loop.
    """
    for resource in resources:
        if resource is None:
            continue
        try:
            resource.close()
        except Exception as e:
            logger.error("Error closing resource in acquisition process: " + str(e))


def _is_valid_address(address):
    """Return True when ``address`` is a dict carrying all required, in-range Modbus fields.

    Wrong-typed fields (for example a string ``slave_id``) make the comparisons raise
    TypeError; that is reported as "invalid" rather than propagated to the caller.
    """
    if not isinstance(address, dict):
        return False
    if any(key not in address for key in _REQUIRED_ADDRESS_KEYS):
        return False
    try:
        return not (address['slave_id'] < 1
                    or address['function_code'] not in (1, 2, 3, 4)
                    or address['offset'] < 0
                    or address['number_of_registers'] < 0
                    or len(address['format']) < 1
                    or not isinstance(address['byte_swap'], bool))
    except TypeError:
        return False


def _execute_and_commit(logger, cnx, cursor, statement, error_message):
    """Execute ``statement`` and commit; on failure log ``error_message`` plus the error and carry on."""
    try:
        cursor.execute(statement)
        cnx.commit()
    except Exception as e:
        # best effort by design: a failed statement must not stop the acquisition loop
        logger.error(error_message + str(e))


def _store_point_values(logger, cnx, cursor, table, point_values, utc_iso, error_messages):
    """Bulk insert trended values into ``table`` and replace the rows in ``table + '_latest'``.

    Args:
        logger: logger used to report failed statements.
        cnx: historical database connection (committed after each statement).
        cursor: cursor opened on ``cnx``.
        table: history table name, e.g. ``tbl_analog_value``.
        point_values: list of dicts with ``point_id``, ``is_trend`` and ``value`` keys.
        utc_iso: timestamp string written to ``utc_date_time`` for every row.
        error_messages: 3-tuple of log prefixes for the history insert, the latest
            delete and the latest insert, in that order.
    """
    if not point_values:
        return

    insert_error, delete_error, latest_error = error_messages
    columns = " (point_id, utc_date_time, actual_value) "

    # NOTE(review): statements are built by string concatenation, as before; the ids are
    # read from tbl_points and the values are numeric. Consider parameterized statements.
    def as_row(point_value):
        return " (" + str(point_value['point_id']) + ",'" + utc_iso + "'," + str(point_value['value']) + ")"

    # only points flagged as trend are kept in the history table
    trend_rows = [as_row(point_value) for point_value in point_values if point_value['is_trend']]
    if trend_rows:
        _execute_and_commit(logger, cnx, cursor,
                            " INSERT INTO " + table + columns + " VALUES " + ", ".join(trend_rows),
                            insert_error)

    # every point gets its latest value replaced: delete the old rows, then insert the new ones
    point_ids = ",".join(str(point_value['point_id']) for point_value in point_values)
    _execute_and_commit(logger, cnx, cursor,
                        " DELETE FROM " + table + "_latest WHERE point_id IN ( " + point_ids + ")",
                        delete_error)
    _execute_and_commit(logger, cnx, cursor,
                        " INSERT INTO " + table + "_latest" + columns + " VALUES "
                        + ", ".join(as_row(point_value) for point_value in point_values),
                        latest_error)


def process(logger, data_source_id, host, port):
    """Continuously acquire point values from a Modbus TCP data source and store them.

    Runs forever. The outer loop probes the host, loads the point list from the system
    database and opens the historical database and the Modbus master. The inner loop
    reads every point, stores the values, updates the data source's last-seen time and
    sleeps ``config.interval_in_seconds``. A Modbus timeout releases all connections and
    restarts the outer loop.

    Args:
        logger: logger used for error reporting.
        data_source_id: id of the data source whose points are acquired.
        host: host name or address of the Modbus TCP device.
        port: TCP port of the Modbus TCP device.
    """
    while True:
        # the outermost while loop
        ################################################################################################################
        # Step 1: telnet the host
        ################################################################################################################
        try:
            probe = telnetlib.Telnet(host, port, 10)
            # the probe only checks reachability; close it so the socket is not leaked
            probe.close()
            print("Succeeded to telnet %s:%s in acquisition process " % (host, port))
        except Exception as e:
            logger.error("Failed to telnet %s:%s in acquisition process: %s ", host, port, str(e))
            time.sleep(300)
            continue

        ################################################################################################################
        # Step 2: Get point list
        ################################################################################################################
        cnx_system_db = None
        cursor_system_db = None
        try:
            cnx_system_db = mysql.connector.connect(**config.myems_system_db)
            cursor_system_db = cnx_system_db.cursor()
        except Exception as e:
            logger.error("Error in step 2.1 of acquisition process " + str(e))
            _close_quietly(logger, cursor_system_db, cnx_system_db)
            # sleep and then continue the outermost loop to reload points
            time.sleep(60)
            continue

        try:
            query = (" SELECT id, name, object_type, is_trend, ratio, address "
                     " FROM tbl_points "
                     " WHERE data_source_id = %s AND is_virtual = 0 "
                     " ORDER BY id ")
            cursor_system_db.execute(query, (data_source_id, ))
            rows_point = cursor_system_db.fetchall()
        except Exception as e:
            logger.error("Error in step 2.2 of acquisition process: " + str(e))
            # the outer loop opens a fresh connection, so release this one first
            _close_quietly(logger, cursor_system_db, cnx_system_db)
            # sleep several minutes and continue the outer loop to reload points
            time.sleep(60)
            continue

        if rows_point is None or len(rows_point) == 0:
            # there is no points for this data source
            logger.error("Point Not Found in Data Source (ID = %s), acquisition process terminated ", data_source_id)
            # the outer loop opens a fresh connection, so release this one first
            _close_quietly(logger, cursor_system_db, cnx_system_db)
            # sleep 60 seconds and go back to the begin of outermost while loop to reload points
            time.sleep(60)
            continue

        # There are points for this data source
        point_list = [{"id": row_point[0],
                       "name": row_point[1],
                       "object_type": row_point[2],
                       "is_trend": row_point[3],
                       "ratio": row_point[4],
                       "address": row_point[5]}
                      for row_point in rows_point]

        ################################################################################################################
        # Step 3: Read point values from Modbus slaves
        ################################################################################################################
        # connect to historical database
        cnx_historical_db = None
        cursor_historical_db = None
        try:
            cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
            cursor_historical_db = cnx_historical_db.cursor()
        except Exception as e:
            logger.error("Error in step 3.1 of acquisition process " + str(e))
            _close_quietly(logger, cursor_historical_db, cnx_historical_db)
            # the system database is reconnected by the outer loop as well
            _close_quietly(logger, cursor_system_db, cnx_system_db)
            # sleep 60 seconds and go back to the begin of outermost while loop to reload points
            time.sleep(60)
            continue

        # connect to the Modbus data source
        master = modbus_tcp.TcpMaster(host=host, port=port, timeout_in_sec=5.0)
        master.set_timeout(5.0)
        print("Ready to connect to %s:%s " % (host, port))

        # inner loop to read all point values within a configurable period
        while True:
            is_modbus_tcp_timed_out = False
            energy_value_list = list()
            analog_value_list = list()
            digital_value_list = list()

            # foreach point loop
            for point in point_list:
                try:
                    address = json.loads(point['address'])
                except Exception as e:
                    logger.error("Error in step 3.2 of acquisition process: \n"
                                 "Invalid point address in JSON " + str(e))
                    continue

                if not _is_valid_address(address):
                    logger.error('Data Source(ID=%s), Point(ID=%s) Invalid address data.',
                                 data_source_id, point['id'])
                    # invalid point is found,
                    # and go on the foreach point loop to process next point
                    continue

                # read register value for valid point
                try:
                    result = master.execute(slave=address['slave_id'],
                                            function_code=address['function_code'],
                                            starting_address=address['offset'],
                                            quantity_of_x=address['number_of_registers'],
                                            data_format=address['format'])
                except Exception as e:
                    logger.error(str(e) +
                                 " host:" + host + " port:" + str(port) +
                                 " slave_id:" + str(address['slave_id']) +
                                 " function_code:" + str(address['function_code']) +
                                 " starting_address:" + str(address['offset']) +
                                 " quantity_of_x:" + str(address['number_of_registers']) +
                                 " data_format:" + str(address['format']) +
                                 " byte_swap:" + str(address['byte_swap']))

                    if 'timed out' in str(e):
                        is_modbus_tcp_timed_out = True
                        # timeout error, break the foreach point loop
                        break
                    else:
                        # exception occurred when read register value,
                        # and go on the foreach point loop to process next point
                        continue

                if result is None or not isinstance(result, tuple) or len(result) == 0:
                    # invalid result,
                    # and go on the foreach point loop to process next point
                    logger.error("Error in step 3.3 of acquisition process: \n"
                                 " invalid result: None "
                                 " for point_id: " + str(point['id']))
                    continue

                if not isinstance(result[0], (float, int)) or math.isnan(result[0]):
                    # invalid result, and go on the foreach point loop to process next point
                    logger.error(" Error in step 3.4 of acquisition process:\n"
                                 " invalid result: not float and not int or not a number "
                                 " for point_id: " + str(point['id']))
                    continue

                if address['byte_swap'] and address['number_of_registers'] == 2:
                    value = byte_swap_32_bit(result[0])
                elif address['byte_swap'] and address['number_of_registers'] == 4:
                    value = byte_swap_64_bit(result[0])
                else:
                    value = result[0]

                object_type = point['object_type']
                if object_type == 'ANALOG_VALUE':
                    target_list = analog_value_list
                elif object_type == 'ENERGY_VALUE':
                    target_list = energy_value_list
                elif object_type == 'DIGITAL_VALUE':
                    target_list = digital_value_list
                else:
                    # other object types are not stored
                    continue

                try:
                    if object_type == 'DIGITAL_VALUE':
                        scaled_value = int(value) * int(point['ratio'])
                    else:
                        scaled_value = Decimal(value) * point['ratio']
                except (TypeError, ValueError, ArithmeticError) as e:
                    # e.g. a missing or non-numeric ratio, or an infinite value on a digital
                    # point: skip this point instead of terminating the acquisition process
                    logger.error("Error in step 3.5 of acquisition process: \n"
                                 " invalid value or ratio "
                                 " for point_id: " + str(point['id']) + " " + str(e))
                    continue

                target_list.append({'data_source_id': data_source_id,
                                    'point_id': point['id'],
                                    'is_trend': point['is_trend'],
                                    'value': scaled_value})
            # end of foreach point loop

            if is_modbus_tcp_timed_out:
                # Modbus TCP connection timeout error
                # close and destroy the Modbus master
                _close_quietly(logger, master)
                del master
                # close the connections to the databases; the outer loop reopens all of them
                _close_quietly(logger, cursor_historical_db, cnx_historical_db)
                _close_quietly(logger, cursor_system_db, cnx_system_db)
                # break the inner while loop to reconnect the Modbus device
                time.sleep(60)
                break

            ############################################################################################################
            # Step 4: Bulk insert point values and update latest values in historical database
            ############################################################################################################
            # check the connection to the Historical Database
            if not cnx_historical_db.is_connected():
                try:
                    cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
                    cursor_historical_db = cnx_historical_db.cursor()
                except Exception as e:
                    logger.error("Error in step 4.1 of acquisition process: " + str(e))
                    _close_quietly(logger, cursor_historical_db, cnx_historical_db)
                    # sleep some seconds
                    time.sleep(60)
                    continue

            # check the connection to the System Database
            if not cnx_system_db.is_connected():
                try:
                    cnx_system_db = mysql.connector.connect(**config.myems_system_db)
                    cursor_system_db = cnx_system_db.cursor()
                except Exception as e:
                    logger.error("Error in step 4.2 of acquisition process: " + str(e))
                    _close_quietly(logger, cursor_system_db, cnx_system_db)
                    # sleep some seconds
                    time.sleep(60)
                    continue

            current_datetime_utc = datetime.utcnow()
            utc_iso = current_datetime_utc.isoformat()

            # bulk insert values into historical database within a period
            # update latest values in the meanwhile
            _store_point_values(logger, cnx_historical_db, cursor_historical_db,
                                "tbl_analog_value", analog_value_list, utc_iso,
                                ("Error in step 4.3.1 of acquisition process ",
                                 "Error in step 4.3.2 of acquisition process ",
                                 "Error in step 4.3.3 of acquisition process "))

            _store_point_values(logger, cnx_historical_db, cursor_historical_db,
                                "tbl_energy_value", energy_value_list, utc_iso,
                                ("Error in step 4.4.1 of acquisition process: ",
                                 "Error in step 4.4.2 of acquisition process ",
                                 "Error in step 4.4.3 of acquisition process "))

            _store_point_values(logger, cnx_historical_db, cursor_historical_db,
                                "tbl_digital_value", digital_value_list, utc_iso,
                                ("Error in step 4.5.1 of acquisition process: ",
                                 "Error in step 4.5.2 of acquisition process ",
                                 "Error in step 4.5.3 of acquisition process "))

            # update data source last seen datetime
            update_row = (" UPDATE tbl_data_sources "
                          " SET last_seen_datetime_utc = '" + utc_iso + "' "
                          " WHERE id = %s ")
            try:
                cursor_system_db.execute(update_row, (data_source_id, ))
                cnx_system_db.commit()
            except Exception as e:
                logger.error("Error in step 4.6 of acquisition process " + str(e))
                # ignore this exception

            # sleep and continue the next iteration of the inner while loop
            time.sleep(config.interval_in_seconds)
        # end of inner while loop
    # end of outermost while loop