From 70467a37550d19cbf7c68b65e2d472eb6903b6fe Mon Sep 17 00:00:00 2001 From: "13621160019@163.com" <13621160019@163.com> Date: Fri, 19 Feb 2021 12:31:01 +0800 Subject: [PATCH] merged myems-normalization --- myems-bacnet/README.md | 3 +- myems-cleaning/README.md | 6 +- myems-modbus-tcp/README.md | 2 +- myems-normalization/LICENSE | 21 + myems-normalization/README.md | 107 ++++ myems-normalization/config.py | 43 ++ myems-normalization/main.py | 35 ++ myems-normalization/meter.py | 430 ++++++++++++++++ .../myems-normalization.service | 15 + myems-normalization/offline_meter_data.xlsx | Bin 0 -> 10766 bytes myems-normalization/offlinemeter.py | 302 +++++++++++ myems-normalization/virtualmeter.py | 475 ++++++++++++++++++ 12 files changed, 1434 insertions(+), 5 deletions(-) create mode 100644 myems-normalization/LICENSE create mode 100644 myems-normalization/README.md create mode 100644 myems-normalization/config.py create mode 100644 myems-normalization/main.py create mode 100644 myems-normalization/meter.py create mode 100644 myems-normalization/myems-normalization.service create mode 100644 myems-normalization/offline_meter_data.xlsx create mode 100644 myems-normalization/offlinemeter.py create mode 100644 myems-normalization/virtualmeter.py diff --git a/myems-bacnet/README.md b/myems-bacnet/README.md index d3fb82bb..e6b10628 100644 --- a/myems-bacnet/README.md +++ b/myems-bacnet/README.md @@ -46,7 +46,8 @@ Download and install bacpypes library Install myems-bacnet service ``` $ cd ~ - $ git clone https://github.com/myems.git + $ git clone https://github.com/myems/myems.git + $ cd myems $ git checkout master (or the latest release tag) $ sudo cp -R ~/myems/myems-bacnet /myems-bacnet ``` diff --git a/myems-cleaning/README.md b/myems-cleaning/README.md index 729d0a6b..d78bc543 100644 --- a/myems-cleaning/README.md +++ b/myems-cleaning/README.md @@ -33,14 +33,14 @@ Download and install MySQL Connector: Install myems-cleaning service ```bash $ cd ~ - $ git clone https://github.com/myems.git + $ git clone https://github.com/myems/myems.git + $ cd myems $ sudo git checkout master (or the latest release tag) $ sudo cp -R ~/myems/myems-cleaning /myems-cleaning - $ cd /myems-cleaning ``` Open config file and edit database configuration ```bash - $ sudo nano config.py + $ sudo nano /myems-cleaning/config.py ``` Setup systemd service: ```bash diff --git a/myems-modbus-tcp/README.md b/myems-modbus-tcp/README.md index 74991bc3..99ddde44 100644 --- a/myems-modbus-tcp/README.md +++ b/myems-modbus-tcp/README.md @@ -43,9 +43,9 @@ Install myems-modbus-tcp service ``` $ cd ~ $ git clone https://github.com/myems/myems.git + $ cd myems $ sudo git checkout master (or the latest release tag) $ sudo cp -R ~/myems/myems-modbus-tcp /myems-modbus-tcp - $ cd /myems-modbus-tcp ``` Eidt the config ``` diff --git a/myems-normalization/LICENSE b/myems-normalization/LICENSE new file mode 100644 index 00000000..b91c1ac4 --- /dev/null +++ b/myems-normalization/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2021 MyEMS + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/myems-normalization/README.md b/myems-normalization/README.md new file mode 100644 index 00000000..e43f1c74 --- /dev/null +++ b/myems-normalization/README.md @@ -0,0 +1,107 @@ +## MyEMS Normalization Service 数据规范化服务 + + + +### Introduction + +This service is a component of MyEMS and it normalizes energy data in historical database. + +[![Codacy Badge](https://api.codacy.com/project/badge/Grade/9c4a70cf88ab410d91294bf32ef6f371)](https://app.codacy.com/gh/myems/myems-normalization?utm_source=github.com&utm_medium=referral&utm_content=myems/myems-normalization&utm_campaign=Badge_Grade) +[![Scrutinizer Code Quality](https://scrutinizer-ci.com/g/myems/myems-normalization/badges/quality-score.png?b=master)](https://scrutinizer-ci.com/g/myems/myems-normalization/?branch=master) +[![Maintainability](https://api.codeclimate.com/v1/badges/4c9f5e65a35a1c968471/maintainability)](https://codeclimate.com/github/myems/myems-normalization/maintainability) +[![Total alerts](https://img.shields.io/lgtm/alerts/g/myems/myems-normalization.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/myems/myems-normalization/alerts/) + + +### Prerequisites + +mysql.connector + +sympy + +openpyxl + + +### Installation + +Download and install MySQL Connector: +``` + $ cd ~/tools + $ wget https://dev.mysql.com/get/Downloads/Connector-Python/mysql-connector-python-8.0.20.tar.gz + $ tar xzf mysql-connector-python-8.0.20.tar.gz + $ cd ~/tools/mysql-connector-python-8.0.20 + $ sudo python3 setup.py install +``` + +Download and install mpmath +``` + $ cd ~/tools + $ git clone https://github.com/fredrik-johansson/mpmath.git + $ cd ~/tools/mpmath + $ sudo python3 setup.py install +``` + +Download and install SymPy +``` + $ cd ~/tools + $ git clone https://github.com/sympy/sympy.git + $ cd ~/tools/sympy + $ sudo python3 setupegg.py develop +``` + +Download and install openpyxl +``` + $ cd ~/tools + + Get the latest version of et_xmlfile from https://bitbucket.org/openpyxl/et_xmlfile/downloads/ + $ wget https://bitbucket.org/openpyxl/et_xmlfile/get/50973a6de49c.zip + $ 7z x 50973a6de49c.zip && mv openpyxl-et_xmlfile-50973a6de49c et_xmlfile + + $ git clone https://github.com/phn/jdcal.git + + Get the latest version of openpyxl from https://bitbucket.org/openpyxl/openpyxl/downloads/ + $ wget https://bitbucket.org/openpyxl/openpyxl/get/8953233f5af2.zip + $ 7z x 8953233f5af2.zip && mv openpyxl-openpyxl-8953233f5af2 openpyxl + + $ cd ~/tools/et_xmlfile + $ sudo python3 setup.py install + $ cd ~/tools/jdcal + $ sudo python3 setup.py install + $ cd ~/tools/openpyxl + $ sudo python3 setup.py install +``` + +Install myems-normalization service: +``` + $ cd ~ + $ git clone https://github.com/myems/myems.git + $ cd myems + $ sudo git checkout master (or the latest release tag) + $ sudo cp -r ~/myems/myems-normalization /myems-normalization +``` + + Edit config.py for your project +``` + $ sudo nano /myems-normalization/config.py +``` + + Setup systemd service: +``` + $ sudo cp myems-normalization.service /lib/systemd/system/ +``` + + Enable the service: +``` + $ sudo systemctl enable feed-normalization.service +``` + + Start the service: +``` + $ sudo systemctl start feed-normalization.service +``` + +### References + +1. https://myems.io +2. https://dev.mysql.com/doc/connector-python/en/ +3. https://github.com/sympy/sympy +4. https://openpyxl.readthedocs.io diff --git a/myems-normalization/config.py b/myems-normalization/config.py new file mode 100644 index 00000000..ebfb65ce --- /dev/null +++ b/myems-normalization/config.py @@ -0,0 +1,43 @@ +myems_system_db = { + 'user': 'root', + 'password': '!MyEMS1', + 'host': '127.0.0.1', + 'database': 'myems_system_db', + 'port': 3306, +} + +myems_energy_db = { + 'user': 'root', + 'password': '!MyEMS1', + 'host': '127.0.0.1', + 'database': 'myems_energy_db', + 'port': 3306, +} + +myems_historical_db = { + 'user': 'root', + 'password': '!MyEMS1', + 'host': '127.0.0.1', + 'database': 'myems_historical_db', + 'port': 3306, +} + +# indicates in how many minutes to normalize energy consumption +# 30 for half hourly +# 60 for hourly +minutes_to_count = 60 + +# indicates within how many minutes to allow myems-cleaning service to clean the historical data +minutes_to_clean = 30 + +# indicates from when (in UTC timezone) to calculate if the energy data is empty or were cleared +# format string: '%Y-%m-%d %H:%M:%S' +start_datetime_utc = '2019-12-31 16:00:00' + +# the number of worker processes in parallel for meter and virtual meter +# the pool size depends on the computing performance of the database server and the analysis server +pool_size = 5 + +# indicates the project's time zone offset from UTC +utc_offset = '+08:00' + diff --git a/myems-normalization/main.py b/myems-normalization/main.py new file mode 100644 index 00000000..f058e43f --- /dev/null +++ b/myems-normalization/main.py @@ -0,0 +1,35 @@ +import logging +from logging.handlers import RotatingFileHandler +from multiprocessing import Process +import meter +import offlinemeter +import virtualmeter + + +def main(): + """main""" + # create logger + logger = logging.getLogger('myems-normalization') + # specifies the lowest-severity log message a logger will handle, + # where debug is the lowest built-in severity level and critical is the highest built-in severity. + # For example, if the severity level is INFO, the logger will handle only INFO, WARNING, ERROR, and CRITICAL + # messages and will ignore DEBUG messages. + logger.setLevel(logging.ERROR) + # create file handler which logs messages + fh = RotatingFileHandler('myems-normalization.log', maxBytes=1024*1024, backupCount=1) + # create formatter and add it to the handlers + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + fh.setFormatter(formatter) + # add the handlers to logger + logger.addHandler(fh) + + # calculate energy consumption in hourly period + Process(target=meter.calculate_hourly, args=(logger,)).start() + Process(target=virtualmeter.calculate_hourly, args=(logger,)).start() + Process(target=offlinemeter.calculate_hourly, args=(logger,)).start() + + +if __name__ == '__main__': + main() + + diff --git a/myems-normalization/meter.py b/myems-normalization/meter.py new file mode 100644 index 00000000..f1d4578b --- /dev/null +++ b/myems-normalization/meter.py @@ -0,0 +1,430 @@ +import time +from datetime import datetime, timedelta, timezone +import mysql.connector +from multiprocessing import Pool +import random +from decimal import Decimal +import config + +######################################################################################################################## +# PROCEDURES: +# Step 1: Query all meters and associated energy value points +# Step 2: Create multiprocessing pool to call worker in parallel +######################################################################################################################## + + +def calculate_hourly(logger): + + while True: + ################################################################################################################ + # Step 1: Query all meters and associated energy value points + ################################################################################################################ + cnx_system_db = None + cursor_system_db = None + try: + cnx_system_db = mysql.connector.connect(**config.myems_system_db) + cursor_system_db = cnx_system_db.cursor() + except Exception as e: + logger.error("Error in step 1.1 of meter.calculate_hourly process " + str(e)) + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + # sleep several minutes and continue the outer loop to reconnect the database + time.sleep(60) + continue + + print("Connected to the MyEMS System Database") + + try: + cursor_system_db.execute(" SELECT m.id, m.name, m.hourly_low_limit, m.hourly_high_limit, " + " p.id as point_id, p.units " + " FROM tbl_meters m, tbl_meters_points mp, tbl_points p " + " WHERE m.id = mp.meter_id " + " AND mp.point_id = p.id " + " AND p.object_type = 'ENERGY_VALUE'") + rows_meters = cursor_system_db.fetchall() + + if rows_meters is None or len(rows_meters) == 0: + # sleep several minutes and continue the outer loop to reconnect the database + time.sleep(60) + continue + + meter_list = list() + for row in rows_meters: + meta_result = {"id": row[0], + "name": row[1], + "hourly_low_limit": row[2], + "hourly_high_limit": row[3], + "point_id": row[4], + "units": row[5]} + + meter_list.append(meta_result) + + except Exception as e: + logger.error("Error in step 1.2 meter.calculate_hourly " + str(e)) + # sleep several minutes and continue the outer loop to reconnect the database + time.sleep(60) + continue + finally: + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + + # shuffle the meter list for randomly calculating the meter hourly value + random.shuffle(meter_list) + + print("Got all meters in MyEMS System Database") + + ################################################################################################################ + # Step 2: Create multiprocessing pool to call worker in parallel + ################################################################################################################ + p = Pool(processes=config.pool_size) + error_list = p.map(worker, meter_list) + p.close() + p.join() + + for error in error_list: + if error is not None and len(error) > 0: + logger.error(error) + + print("go to sleep ...") + time.sleep(60) + print("wake from sleep, and continue to work...") + # end of outer while + + +######################################################################################################################## +# PROCEDURES: +# Step 1: Determine the start datetime and end datetime +# Step 2: Get raw data from historical database between start_datetime_utc and end datetime +# Step 3: Normalize energy values by minutes_to_count +# Step 4: Insert into energy database +# +# NOTE: returns None or the error string because that the logger object cannot be passed in as parameter +######################################################################################################################## + +def worker(meter): + print("Start to process meter: " + "'" + meter['name'] + "'") + #################################################################################################################### + # Step 1: Determine the start datetime and end datetime + #################################################################################################################### + cnx_energy_db = None + cursor_energy_db = None + try: + cnx_energy_db = mysql.connector.connect(**config.myems_energy_db) + cursor_energy_db = cnx_energy_db.cursor() + except Exception as e: + error_string = "Error in step 1.1 of meter.worker " + str(e) + " for '" + meter['name'] + "'" + if cursor_energy_db: + cursor_energy_db.close() + if cnx_energy_db: + cnx_energy_db.close() + print(error_string) + return error_string + + # get the initial start datetime from config file in case there is no energy data + start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S') + start_datetime_utc = start_datetime_utc.replace(tzinfo=timezone.utc) + start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0) + + try: + query = (" SELECT MAX(start_datetime_utc) " + " FROM tbl_meter_hourly " + " WHERE meter_id = %s ") + cursor_energy_db.execute(query, (meter['id'],)) + row_datetime = cursor_energy_db.fetchone() + except Exception as e: + error_string = "Error in step 1.3 of meter.worker " + str(e) + " for '" + meter['name'] + "'" + if cursor_energy_db: + cursor_energy_db.close() + if cnx_energy_db: + cnx_energy_db.close() + print(error_string) + return error_string + + if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime): + start_datetime_utc = row_datetime[0].replace(tzinfo=timezone.utc) + # replace second and microsecond with 0 + # NOTE: DO NOT replace minute in case of calculating in half hourly + start_datetime_utc = start_datetime_utc.replace(second=0, microsecond=0) + # start from the next time slot + start_datetime_utc += timedelta(minutes=config.minutes_to_count) + + end_datetime_utc = datetime.utcnow().replace(tzinfo=timezone.utc) + # we should allow myems-cleaning service to take at most [minutes_to_clean] minutes to clean the data + end_datetime_utc -= timedelta(minutes=config.minutes_to_clean) + + time_difference = end_datetime_utc - start_datetime_utc + time_difference_in_minutes = time_difference / timedelta(minutes=1) + if time_difference_in_minutes < config.minutes_to_count: + error_string = "it's too early to calculate" + " for '" + meter['name'] + "'" + print(error_string) + return error_string + + # trim end_datetime_utc + trimmed_end_datetime_utc = start_datetime_utc + timedelta(minutes=config.minutes_to_count) + while trimmed_end_datetime_utc <= end_datetime_utc: + trimmed_end_datetime_utc += timedelta(minutes=config.minutes_to_count) + + end_datetime_utc = trimmed_end_datetime_utc - timedelta(minutes=config.minutes_to_count) + + if end_datetime_utc <= start_datetime_utc: + error_string = "it's too early to calculate" + " for '" + meter['name'] + "'" + print(error_string) + return error_string + + print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19] + + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19]) + + #################################################################################################################### + # Step 2: Get raw data from historical database between start_datetime_utc and end_datetime_utc + #################################################################################################################### + + cnx_historical_db = None + cursor_historical_db = None + try: + cnx_historical_db = mysql.connector.connect(**config.myems_historical_db) + cursor_historical_db = cnx_historical_db.cursor() + except Exception as e: + error_string = "Error in step 1.2 of meter.worker " + str(e) + " for '" + meter['name'] + "'" + if cursor_historical_db: + cursor_historical_db.close() + if cnx_historical_db: + cnx_historical_db.close() + + if cursor_energy_db: + cursor_energy_db.close() + if cnx_energy_db: + cnx_energy_db.close() + + print(error_string) + return error_string + + # query latest record before start_datetime_utc + energy_value_just_before_start = dict() + try: + query = (" SELECT utc_date_time, actual_value " + " FROM tbl_energy_value " + " WHERE point_id = %s AND utc_date_time < %s AND is_bad = FALSE " + " ORDER BY utc_date_time DESC " + " LIMIT 1 ") + cursor_historical_db.execute(query, (meter['point_id'], start_datetime_utc,)) + row_energy_value_before_start = cursor_historical_db.fetchone() + + if row_energy_value_before_start is not None and len(row_energy_value_before_start) > 0: + energy_value_just_before_start = {"utc_date_time": row_energy_value_before_start[0], + "actual_value": row_energy_value_before_start[1]} + except Exception as e: + error_string = "Error in step 2.2 of meter.worker " + str(e) + " for '" + meter['name'] + "'" + if cursor_historical_db: + cursor_historical_db.close() + if cnx_historical_db: + cnx_historical_db.close() + + if cursor_energy_db: + cursor_energy_db.close() + if cnx_energy_db: + cnx_energy_db.close() + + print(error_string) + return error_string + + # query energy values to be normalized + try: + query = (" SELECT utc_date_time, actual_value " + " FROM tbl_energy_value " + " WHERE point_id = %s AND utc_date_time >= %s AND utc_date_time < %s AND is_bad = FALSE " + " ORDER BY utc_date_time ") + cursor_historical_db.execute(query, (meter['point_id'], start_datetime_utc, end_datetime_utc)) + rows_energy_values = cursor_historical_db.fetchall() + except Exception as e: + error_string = "Error in step 2.3 of meter.worker " + str(e) + " for '" + meter['name'] + "'" + + if cursor_energy_db: + cursor_energy_db.close() + if cnx_energy_db: + cnx_energy_db.close() + + print(error_string) + return error_string + finally: + if cursor_historical_db: + cursor_historical_db.close() + if cnx_historical_db: + cnx_historical_db.close() + + #################################################################################################################### + # Step 3: Normalize energy values by minutes_to_count + #################################################################################################################### + + #################################################################################################################### + # special test case 1 (disconnected) + # id point_id utc_date_time actual_value + # '878152', '3315', '2016-12-05 23:58:46', '38312088' + # '878183', '3315', '2016-12-05 23:59:48', '38312088' + # '878205', '3315', '2016-12-06 06:14:49', '38315900' + # '878281', '3315', '2016-12-06 06:15:50', '38315928' + # '878357', '3315', '2016-12-06 06:16:52', '38315928' + #################################################################################################################### + + #################################################################################################################### + # special test case 2 (a new added used meter) + # id, point_id, utc_date_time, actual_value + # '19070111', '1734', '2017-03-27 02:36:07', '56842220.77297248' + # '19069943', '1734', '2017-03-27 02:35:04', '56842208.420127675' + # '19069775', '1734', '2017-03-27 02:34:01', '56842195.95270827' + # '19069608', '1734', '2017-03-27 02:32:58', '56842183.48610827' + # '19069439', '1734', '2017-03-27 02:31:53', '56842170.812365524' + # '19069270', '1734', '2017-03-27 02:30:48', '56842157.90797222' + # null, null, null, , null + + #################################################################################################################### + + #################################################################################################################### + # special test case 3 (hi_limit exceeded) + # id point_id utc_date_time actual_value + # '3230282', '3336', '2016-12-24 08:26:14', '999984.0625' + # '3230401', '3336', '2016-12-24 08:27:15', '999984.0625' + # '3230519', '3336', '2016-12-24 08:28:17', '999984.0625' + # '3230638', '3336', '2016-12-24 08:29:18', '20' + # '3230758', '3336', '2016-12-24 08:30:20', '20' + # '3230878', '3336', '2016-12-24 08:31:21', '20' + #################################################################################################################### + + #################################################################################################################### + # test case 4 (recovered from bad zeroes) + # id point_id utc_date_time actual_value is_bad + # 300366736 1003344 2019-03-14 02:03:20 1103860.625 + # 300366195 1003344 2019-03-14 02:02:19 1103845 + # 300365654 1003344 2019-03-14 02:01:19 1103825.5 + # 300365106 1003344 2019-03-14 02:00:18 1103804.25 + # 300364562 1003344 2019-03-14 01:59:17 1103785.625 + # 300364021 1003344 2019-03-14 01:58:17 1103770.875 + # 300363478 1003344 2019-03-14 01:57:16 1103755.125 + # 300362936 1003344 2019-03-14 01:56:16 1103739.375 + # 300362393 1003344 2019-03-14 01:55:15 1103720.625 + # 300361851 1003344 2019-03-14 01:54:15 1103698.125 + # 300361305 1003344 2019-03-14 01:53:14 1103674.75 + # 300360764 1003344 2019-03-14 01:52:14 1103649 + # 300360221 1003344 2019-03-14 01:51:13 1103628.25 + # 300359676 1003344 2019-03-14 01:50:13 1103608.625 + # 300359133 1003344 2019-03-14 01:49:12 1103586.75 + # 300358592 1003344 2019-03-14 01:48:12 1103564 + # 300358050 1003344 2019-03-14 01:47:11 1103542 + # 300357509 1003344 2019-03-14 01:46:11 1103520.625 + # 300356966 1003344 2019-03-14 01:45:10 1103499.375 + # 300356509 1003344 2019-03-14 01:44:10 1103478.25 + # 300355964 1003344 2019-03-14 01:43:09 1103456.25 + # 300355419 1003344 2019-03-14 01:42:09 1103435.5 + # 300354878 1003344 2019-03-14 01:41:08 1103414.625 + # 300354335 1003344 2019-03-14 01:40:08 1103391.875 + # 300353793 1003344 2019-03-14 01:39:07 1103373 + # 300353248 1003344 2019-03-14 01:38:07 1103349 + # 300352705 1003344 2019-03-14 01:37:06 1103325.75 + # 300352163 1003344 2019-03-14 01:36:06 0 1 + # 300351621 1003344 2019-03-14 01:35:05 0 1 + # 300351080 1003344 2019-03-14 01:34:05 0 1 + # 300350532 1003344 2019-03-14 01:33:04 0 1 + # 300349988 1003344 2019-03-14 01:32:04 0 1 + # 300349446 1003344 2019-03-14 01:31:03 0 1 + # 300348903 1003344 2019-03-14 01:30:02 0 1 + # 300348359 1003344 2019-03-14 01:29:02 0 1 + # 300347819 1003344 2019-03-14 01:28:01 0 1 + # 300347277 1003344 2019-03-14 01:27:01 0 1 + # 300346733 1003344 2019-03-14 01:26:00 0 1 + # 300346191 1003344 2019-03-14 01:25:00 0 1 + #################################################################################################################### + + normalized_values = list() + if rows_energy_values is None or len(rows_energy_values) == 0: + # NOTE: there isn't any value to be normalized + # that means the meter is offline or all values are bad + current_datetime_utc = start_datetime_utc + while current_datetime_utc < end_datetime_utc: + normalized_values.append({'start_datetime_utc': current_datetime_utc, 'actual_value': Decimal(0.0)}) + current_datetime_utc += timedelta(minutes=config.minutes_to_count) + else: + maximum = Decimal(0.0) + if energy_value_just_before_start is not None and \ + len(energy_value_just_before_start) > 0 and \ + energy_value_just_before_start['actual_value'] > Decimal(0.0): + maximum = energy_value_just_before_start['actual_value'] + + current_datetime_utc = start_datetime_utc + while current_datetime_utc < end_datetime_utc: + initial_maximum = maximum + # get all energy values in current time slot + current_energy_values = list() + while len(rows_energy_values) > 0: + row_energy_value = rows_energy_values.pop(0) + energy_value_datetime = row_energy_value[0].replace(tzinfo=timezone.utc) + if energy_value_datetime < current_datetime_utc + timedelta(minutes=config.minutes_to_count): + current_energy_values.append(row_energy_value) + else: + rows_energy_values.insert(0, row_energy_value) + break + + # get the energy increment one by one in current time slot + increment = Decimal(0.0) + # maximum should be equal to the maximum value of last time here + for index in range(len(current_energy_values)): + current_energy_value = current_energy_values[index] + if maximum < current_energy_value[1]: + increment += current_energy_value[1] - maximum + maximum = current_energy_value[1] + + # omit huge initial value for a new meter + # or omit huge value for a recovered meter with zero values during failure + # NOTE: this method may cause the lose of energy consumption in this time slot + if initial_maximum <= Decimal(0.1): + increment = Decimal(0.0) + + # check with hourly low limit + if increment < meter['hourly_low_limit']: + increment = Decimal(0.0) + + # check with hourly high limit + # NOTE: this method may cause the lose of energy consumption in this time slot + if increment > meter['hourly_high_limit']: + increment = Decimal(0.0) + + meta_data = {'start_datetime_utc': current_datetime_utc, + 'actual_value': increment} + # append mete_data + normalized_values.append(meta_data) + current_datetime_utc += timedelta(minutes=config.minutes_to_count) + + #################################################################################################################### + # Step 4: Insert into energy database + #################################################################################################################### + if len(normalized_values) > 0: + try: + add_values = (" INSERT INTO tbl_meter_hourly (meter_id, start_datetime_utc, actual_value) " + " VALUES ") + + for meta_data in normalized_values: + add_values += " (" + str(meter['id']) + "," + add_values += "'" + meta_data['start_datetime_utc'].isoformat()[0:19] + "'," + add_values += str(meta_data['actual_value']) + "), " + # trim ", " at the end of string and then execute + cursor_energy_db.execute(add_values[:-2]) + cnx_energy_db.commit() + except Exception as e: + error_string = "Error in step 4.1 of meter.worker " + str(e) + " for '" + meter['name'] + "'" + if cursor_energy_db: + cursor_energy_db.close() + if cnx_energy_db: + cnx_energy_db.close() + + print(error_string) + return error_string + + if cursor_energy_db: + cursor_energy_db.close() + if cnx_energy_db: + cnx_energy_db.close() + + print("End of processing meter: " + "'" + meter['name'] + "'") + return None diff --git a/myems-normalization/myems-normalization.service b/myems-normalization/myems-normalization.service new file mode 100644 index 00000000..88b59261 --- /dev/null +++ b/myems-normalization/myems-normalization.service @@ -0,0 +1,15 @@ +[Unit] +Description=myems-normalization daemon +After=network.target + +[Service] +User=root +Group=root +ExecStart=/usr/bin/python3 /myems-normalization/main.py +ExecReload=/bin/kill -s HUP $MAINPID +ExecStop=/bin/kill -s TERM $MAINPID +PrivateTmp=true +Restart=always + +[Install] +WantedBy=multi-user.target \ No newline at end of file diff --git a/myems-normalization/offline_meter_data.xlsx b/myems-normalization/offline_meter_data.xlsx new file mode 100644 index 0000000000000000000000000000000000000000..b99f1121b43b20e830979bce641f3bcb6daa95d2 GIT binary patch literal 10766 zcmaJ{1z23kvc}ylxDSJSa0u@1?wVkO2MGjsch>*`f@^S>;O-VAxNCq1n|tr>=I*=w zeKTjIs;kf6eX6UgenlBbCDYG+XM^RA#KxAb;=FYPI6gh&C;wFPS@tcQW%}@(WpWi%sY_#J zSh(KsD2xQXkTQoxL3MQHYtppA2YwdB5fM_zxX)F*QWKZ{o4fzLz>7IrU-Q`jX}gb zKM@T@ho%WdgcZ&n?jrM_vMD~wQ|!e;aftA#U%9-$#S#RJO_`UfcTmezwuU$dsV5QB z^hebvT@O!Q-Kw4Jy1JGfNZc~61mGrj@;JH}Xkc7)Q;7@Iv~jQ&@QP3#=ZUO@JWHIV6I#_Cyx zZW9^sNUI585m{U(x%w_xk$Zy^`;}pV5)k5YfA#6GlqfVhDb4Zh>yEy1Daz*%fes!k zr;rK%Y&dpK;XS>O%suC3(U@~t#2H*<0FSRo$=+G+8pBp^r{8M^O{Na9kNHGfHX&Cl z^;jl-*V)N3z9O)occsPx@h(9+y{q#m_Tv2zbSJXRqxPubiy^`Ck{v+Pd!saPQm#N4 zEX!m2)cl*i{SMme9Y3KI?u3gzj&Dw(ycUQ8p>g+X-%Ssi7=N6RiT7u1Bn>}oGQXvF zZlgmfqJ=74f;jJ0pB0v3$f);HTT7X+t27$1>+Gz@=pL?gXY)7=XFRc^VGcqUXAY+J z&)tMu9d!KQp;4}s5W^leK%SneJD#p6tbadZaeN=W8NhS-l$0Ek{2kO(IuQE_QF;RJ zlt2-=arBM&CG_8ck6Sn*dwd4m`x$)9zX9*;=wxT}$NG*IQt)HO4LqiL4El6EfzwHt zUjb+R3N@9>6DLh=@@x(&F2N2NK|jLf>ApruB8*tJPy<>sp#?ND}p))docC@JcfY`ggqN{0D6M%Xb|W_+f@(PQz#z=UzA(Yh$C!@s{rS}_rzVz zs0RR~e5oTk1C_)#k-NO{y4nx`T_uaojb{WeaQ(B(fP99_&H4{#@y`M%zXV+E9Nsxv znwdHM?pSm$<==9`f`JtigMs1xm+T)ZEH4v~kg}+}$%PhpT>o~pW9#0T{#F+Klc+G4 z2x;~E_7ifmV)I9UUqJ=AvG*Mb;>`CiwZPQ{^?bGHgtFnMECGw`4fm!Mo+K#vQ zqn*<_1}$v_51XH*Q*TRBNP0@L%O6pcFZ()Hy$|g+zFRcG`O1DIR<|J3HY!w8ACnu+R$0@jK=`99NmRD z+!U(axG&fY_~)3^4bv}Bt}MvhSPa|&@Y`N$75t!$&!kxWBDmHQBd&mR1E3I=Y_%aw zWiIV|q~_k!Q9`}|D*oeUurtO7E1VBdex#vydp@AxJnt~2vtwq_a=g6z7GgWq?qMz^ z^NuC&s45{s{=jPMaSi@j+piIvK%twu{yW;)*A*7p->Pi=0&n=kI@RpaJ)zLYyru^v zGJDaHAlnkml3`mu(?1w&`KOXi9JoTQkI=SK8lgbLdvgK?lFfevPI{i17BiNX7MJ!C z;i*ZS)b9oB;BO|*dbZBTXf2|=*SUmxEViazT9U6$0eV^u@8CG@D_l>Me;a*O?WDCI za-|J<8$Ru5R=4ik__gdH;hnO-C~*Poy;QRjsb7bB5yQz>-9=V<9^{N3`PmQpL35c# z!i4hYC1UTnkq}v8h4=eDwyxg$_@t?KuXA)Tv&>O(Q3Wpge+;3ZVTr})Q{E7jPL~d0 zqX97j81uUe;u~TyW|fWU0k-+wW%0?9A-psoCIE4McV4_(EXJtvhM06Pk(9)Q&_O>> z=z%0}X#cref@g{cKef4>m|tw8bJl`dlaJq9b8NA&{cl~^TNUw#!shv; z$kO9K;`uQR1nA-Gn+e}Z=tW%c?Y%YSkNk>{cbs3M<<>m;#oXkuw1iue_xLU23}X{M zXxU7|;zZ zU6}lIjP3cGezYVXC6sbVm?-wKS-Zmqg%Y%90R|8r>mc1tYRw*Ki$T4{eHtf7W``yL zk98rwPos)r=%H$Dh#aA_T8d;3uL@(QaYUSm?{{cey-7BLuL_5C217i&zlUy&9B1X?*JK(Y5ezuud6ec8A`eocZ@DGI7SoD3v&9>#2HYDb7YuG0!1HU-^U*^6ATIFXbqi-wH*d(}j=dQ3aFw9(rY@nPtQ& z&pXFDw{$n*QfO4~L+SVV9DEcrHEuwJAHzSPD-&VS7r!l$5o1AgaC|F!Jx4(Y+;dkn zugddI$jJSI1hDq7oi)V>hM0wmd!NyKO&uQQkNjh;P8<`6s0$!`C}GBFcm1zAg%8DDlcBq0~i; z=$ph{AjX4gZkE)AW5@@xaVEfBf@oXfu1UxTig7!@ooMc=)P+aLBk4E^;4V(oJ8_o@ zLk|v5acA5Dk93dz4l8;qIvNlYZ59(<5My~BtWGp{g00r~__%_~X!RMTxGJb1@H0vy z4Kghh;i@Xp9Ta+5;T@>&@d5iZ^PWF&MDn3%5v^qJ6YjnsjM==@Km@r(01Vt85 z%v~m?7)-bodU2+KXu&7+cXM=m7qsV{TTVE+@?UsBx~W_cg*cr2Jg6TXhsL-gq(8=m zvB{HIL5(m<8j9nH*8C8oi+$*!D5~tk<3q#YU6%pNK}jgTJLf#qs98_t2)F7&wrMCp z#$2$MAUNVhwiJOqZWZtvGCyD`Oo8)pBmb9>Tp>6fxJ6+We#iA1R>9e}*oS_EAkCgi z%}NFa)V{&E=-MY7-Mayu2I&OqWRdlm*RZ*-X`)&Oy{p{x(nFhj@dz)l4PM#L;kQrh z;`0?dd8?Wc_s!eFLy(i$VVEAjGdnEzGsSrP=*P^p5)y|NsT&qI)7lgnM}OLdbYwX8 z2!tWS{~`jmJ`I@;v{0{PWog7p^Nx))+I}_*n`i6)6!P>a^913#EEiF#V=0<$d*Z{uV`Fw&UJmghN zSV`J+vEOpNgXH2+;PtKLOKLvpEr+szx_{Awq zJaZ}$V|J}bP?xVebO`rXB}6;O<`(Vt9qQmf+9vj}WQ(_2r3E(vuSCEsNxMSLC5Ig_ zHqmk$q9!Y|HQCyApm-Gyl&N1OvNKVN2sut8uZ*ymcBijY7q0So+~M+j-0fa;#7)Ak zW_>v6_~Ca<&tSKSoG!N%(dK=5N@CC)^}*xk)-mdvvObwtPm}c|k2hVwOTqr^M$tFk zSG!5Ts1J`n^9yxE*v#umdn!L?Xup6ofcUa19(-ahU2%;tlr>+u_H)knt;q zzr@Z4O+R^;3BnJv8L*Fwc$QL70&AEvzY`I=r${HhkFq4+&F4zXOd6u%>j zGw2?dWcu`I^PA(LVXDY9z28K&*Bi3TWDg_@1O-WPtclSZGP2hT%Xw4P`;5rYNqp+N z#A5ui@8no>O_-v$(`?7EezKNK;^`jFn_n~Tfz%5xQMk`yqS2D4;ogvcW7~A*Q{yV5 z;O?YxME-zSk}u!ymjb7^BuIO5;m;figC0|5J8A=d$0OeslA}M}qFqK#|h*}V$(e%AoFXM0o>782ylW= z@FK3b=yc1uv*m_Axe^ys#VkADBY~%2-XOt7JjYfCCCTh7fwc%`iA*MeMd&5#l6QsZ z@tReg1SuVk&Xid|Yw1`Aok330)>+T&)_a;j+A>%S-gjvOym~xteeso$uT8!uvRBvp z!(bsBDIK!~$tx3epFuG?Q$a$aa-z>22>F({7E7aGq<1Z?z_G`K&`T;%4mSFq=W2Z^ zC_81or}J2_{^1y0Mu<_OM?Amf+@ed&Ny{q!IfQ#MF?{Cr$8%PlRr+9GTc)nkoB$rq z`atwyEU_q^f+~(m?j*kfx@p78Dk@nHxNWkEGZ@1A*YCKF4aLcAT=(-P70HAqF-cq{_@CtlabJ(^9w{0kT~< zh_tq;fJ0*-&fS_BZEyt83bHX5ZI@Z9-)sRf6-!$C{U=3uf~!$E^SCWXI~VDmpOlvE zi>U9ep#xfYq7jQ-DrBuKlO@F9Dr6n2&dO`sOJ5ZT%Dv{v!rHfi3-p#lsK`6k#^|F| z{ZgHQZNuxK6R#|wCQ3*-&VpZEJlD7Of#`F#f+(Q*lo?xSwSAx4#^$`AX+&(Ik*No1 z@s^fR96zyREZ8VK?V-JVA3$JlTwB;Zv2ZQx;$SfPzPO zkOcTf^Ha-E-DQfDDqS$j#A5r)igIrZE`{<_N%%)NAuo-!@C(BGHx*Nf;W0NjJERR)dncZ%G!+$$x3RCct#~LPe;xQq3+Ku`6vrF5&{*}H4`h%f>`Rk!l zFmrjjT`a4oSn_7Wl3MI{LYuYfG=y#-B@@k3m|Q}AO0oyYhp5o#c~s)2oX8u2DRrn- zCUzD$g5W*QgSC%NA8W!)aMKTRk*VYcQ|oXuYNiLxy*lD-d#=MPYfX1fuMzRT;*WT` z=rii)S0W;M)YZL{kwebpPs^>qN`l4P#62(g_K@7ucil10dopi0c(B8e(v>mb=c$l= z%0fuKW(dIqY@~4jR!Ux~5clnN$2Vvxtz`#gP-Ar(Lk2==G6d~8$Z#bfti-1r-oSkh z4Ox~_)nCtVMo_3aQ^o>WnxK8?pgcj$v1ksjcN)`Tw%o7M#Bi(A(co5;pME{uGP`Jn zJfwLxDrI}oa4uN5N`SP)VB4g&6Xnoa@r8bCO1*YdKW}=)v-tfqwwYtgB!Sz{izk?DD1^e|j)u@>*uCf1pESWtuGC>H*2F!^kh6Re8plD3`r$)iRw&Et zAEnJ8P;aDoiL}`?K*Lo-VD)8(tnCH`U`!kMtOwRgK>(GT@<;f;=R2@>V4y!dAJ8t8DY7h z=DTPm0Ssof_Y@m4P}5(AvmqtfyDimOUXDWkz|hLWXifH;2av-c^Xbvm0_1SaeQZYr zWXTRA%}lPBs`{{U*c+zPSHGEOe+YFlL>unO`*Csi6G1sepMI}@|8tp4QuYTiU2u(Eq|xn`x(CA;-~DO1qK0gBdQ9L3vFWG&BEn6k4Db0T$%tzgG?hi{BtEB_@SGJ#eg&*+ zKlEa-^?}>@1$hi0g1Nnk>8!@}JKSPOw|P=8SFNw1Gn_GAt3kH!FuQYh7(=i)RkX)U zFTtCCQZl}O+P#Rpn{ne4sFg2W33-h_1~Sa7#R0j0Lr~5a(+g1p7ftd)NTcFnq37r* z=HAz0XHFd($PFojmxoMcOI z!Nb{3Fcf{;?H(a^RMdzh13ODq%4tncfQ&EFIIa8LmQbOl$nUc~vA9g~dyXn818T3E z>T|YHEdUx{jEAGlBh`AUzj88V4(cnJ*;aVSMZY1&eX&peH1Co#GLr`wGW5)`%QC2p zIAEs$%N8vfbp@=7=(K6R$-0Asb+RdM^#mm{u-%?otDRIx==+ z!|={PgA?y|3(jm>LxW?;}yRCKQ=~Yz3y>wY1f?4T$eiD`G|)^cYR7vKQ|D;SajKLhk}Z1O1&t z616el=Sk|-BoAx_z3y4ydBGNFC>^fBN*6wm=@4+i#W8<$086wk`)c$_FPtkDKt_@f z&c%j%f7Ve@dHSi=M>J>ojXz5zs;!;HV@J-iwhTF$4zI0=WF3cXG?~${pbnYT71mMC zoW$U@5w9ulCVE#>MN`Ees*JsH_zu zHK|IJG24&At>x)J`k8E3{$Yz-|o5@y;(c<HMIFJ@;I&N4jbrVNfg)I5vR%NQ=4x^_6s(NQ+|@R+ME$;>In_A>4UI zXCy;pEV-TE1pqC+rNu%p_j%Z0QrkhbEKs|i(k147QFE$~LXn)xsn(PBBKn&(j7+bT zQSxB|K`x31YiRbQGXP`rEy9#N0VqMk7$rTVsPm>sS&lb0E93)vYkQ_u5Pf80NeGyV zh25F=L<&lkP-k26jMpH@;*e%8;`FGn6_=s1dC`UAjUKPMsVMc^_2oP_ZestPh5#9O za@P_f`s<{8@s&4WQFvZE0o&}_in=EfZeCdjE!DR;V4ED_OC&yohr(k=GMZsT5P`24 z1!6_&o!Xr1t=oh#5q3G0*lFmys5vjh>sEty_b9FL6p6|1^o{)V;R_ldC}uT+EH=vc zR5V(YFGwQsc_We_&oO8PQ1Gn9$|s(#16|vclvkv7^YDfo6bGvP`7{PLZ%0`1<`L`! z2$v&{DLlNkmO5&Cp)*_WKE zOO{Dx&3)=qJ21DIechfib@lF&3GesZ?UR$Rm&VGZ?U(NJ##!_k`l)o7k?F`tr_hKN z9ZE5@d4!b>w~!>*ja7xep`-P1#n$(yj~N?4t`UT>En|_96`NqVk4bC=1Yk9LQ1HvJ!Im|)9-A;H#wM~IG`b92#s2R*!fRm+H6>3RV3_f!Mas@AXmY&7X($3R zv?lkH9OH6Wr`!pLQXJzdS*M!7O#p@Q9i6T7`AKYrF(B5dJCq@sy;nSJ@gC6`hpLU0 zONQUg$F+&xQQQ?0geyKBK*BHFe6zxGvTST|o}e7W#L+qt-VKN{wL%L3#Ww~{mcFJ+L}du9REuaui53Qvk_Gpf zpuWKN#6fLsuz=G+HREfS_eRF{Y%q&)=itsJ`yMymK2lwD_5eVFCL|RAoYpjEgaHp= zZoyQ9u=ct=yy~wG+T}ZUS4b1`(Ub$Bd6i2M2HqoNi(JLJy#-_hw%QS_UbSL2n_60i zFtZERSQ{qhfWvyQ)9N?E6EX}@(?b(1qz~kTnbnvETUKAZIix0q>u{kQQlciEFBDPb z9T6TyYzqQEpu4G%{@!6o^*y07HvsqAS902P5_!8AJJDUJBBnBT_;6Ck;#3LWm;sOC zfY<1=Ef*Prb!QTHh5s6M#O4Y&I?Yf2a8~J>Ob^R9cx3D_XYWm@^)Lxgh$k?Ob6uc= z=`#{C+y@eFK*rm%-UWs4I~KG`T#pKwbM$1t{k%iw#ztOzBg6t%!pTBt7T6|^--vnM zH~R{>IW$`>v{T~>TBr;hDPkkm({&M_x!5>!;5b5dpUe^josEy|xO(k(1o3*gq6dH) zq$6?c=Xi9>mIBUCCNnmVlD(TTdEOIyjL{Cp5Bqy(y-7>n4GdApKlYtF@ct~yi9>?T z|AhSaK=lbB=Cl8EQ7-AZ%=A)=eojO=S{gZ+nW{KBSlL?q5wC8=jlL8``;L8|MEaKt zrpd@{f@Fg>P&P&#mp0U`;!;d=^#i!Ep+MO|L~2WBN6hHUQ{KIGUvHU`znEB}^SQD6 zx8FBkpAoM%;}-MJf`9c#OjE+#gGYuCcBHqP^o5^QXERYjgA1`D<_*WdJi-K5nb{2{ z^M|5+c6vAk^l}uG4q_X|F@^x)1dA}xD;cvh2n>L{y*?Iv%oo4j*ESf9bDz+@Yop8D%o9b{x8M9qD59U5vbT*`3YG^DNgs9aRu$@)J;%Y)G0SLuqGpiI2w@O^h)? z+4OnEA(uR0`G9SlX{0I+sx~DkPwGJUl$)j4tsDJOAxkn4>5?gNW~0}0YMx~9{qW>< zQrIf3ms3CE-!Z^D)<6=t%#jX$B~L*ZtrpbKY+#V|ozZo}5W z_+^F9Z2q*DjK<`m2y1 zQzUO%BagG(LS~u)2CvKw6tw-prWtqHOTWx7a*YxaEdaW!dNdZjlo|9?&w<{51R z2Q%y6i>w}8F3nHR|E)eV3lRS!!}uau7CS24&5YK2?0by1-74pbjXv+hM1CM=>gAIx z`!q&{8LM^J+9ISxKh*>qm2$hCw(4p6m0+1vpXF0zv#+y~ub^GOORLg}r<>-ouViV< zG~|0#`%b8@duPV?{(jqh@e7QUF?ycLW2`~EG;vnrKU5V3vv^;N6>`5Zq448qu~oDk zJQMHDB36{+>sg@<_V@||;tFWKwX^%YKeEQ3k|#3CM)zGcCd1h}8av$2GjFa9?NST| zhI`b(SzzecRXV7&%$xyPOgA7|C3I;^w+%#}(x)p=5(>y3z7?FD-0AFy8P};9lH~CR z^Ob}=LM9{aWUjeCJ6B3Q&T^n0^s%RInL=)5hBVP$mE>a+TDw6dL&bdj?B+m3-l zi_4A2Q>avLO_&u;xv7yD?>MGlT2YQsnX-dzPU^zIk;5e`=#G!;NanN>Wu|H%X+ZF% z*+#_rwN$f)b~Is-Fh5jTE)&H*X`{{%*+jU3by7hkOiF5CkZgFw;WpIS`R zYEf2m?~hnc65!+_=%idz9(0iG=>mD zAH&n6cn)u$56dk`;&ksq0ishY)1?NNn&9&81xDV3t(vyZF-r>i&vCmfdbsv#d|9&tl+U^H*H^Jw8^Y>`=0PSHl@Amr;% zrI)46)VT22r1TY1=d;DBD@_eS3r{4EQE7GB^HH{i$5z{ZJ-6$_Y3g^H=kK!Z+Fo}Q zksG2d#Dc@pf*`}uaOObhB`6>GxlJu{?qaeyQ4k+d23>Wmu2eALBPAIX@|(;jQyG$V zha1Mcx;x_yh`9PBHdOpXt!>$xL3?XasLM?949Nr8J~Sa}Yp~eY)0tOCMTB83moYOi zkIRBM{6g|h#XdWBZH{1C+s|o`zT3j<^O1_K8k);Ngu{`w9R}eAug^bLE?Z;j+UT3l zJI+lwem?sPa0skF_Rn8juHR?oc?E-kgONVJYf*xM{lh%{>foRG9WO5DS9_s_b&&n( zfc|ducNg@c^ULVge{}wMWd4`2muK=;eC(_AmO{?-2b? z#rjh_?0HH5$LPO_S--3QPAK|Q9qKuf`BVKr^rHWD(=Qa8U+v|l*`WSKxOutVf1Z-R zql8TH7YFtGK!5jSFLr*R#Qs;c|MF|U_w#$8_GdpC&%B<03fq3~;rE>6pFLbWQ?g!q z_)muNe}VP)dp~^M?4DKsUE}=w3i-Xt`KLuJq`#>BS@ZmT$iK%qe|Dfk`bP(Uh3$V0 z`h_F)tG&#y>aQ988)NG4BmbQ<^`iT0)_nd;_fH0uq73vO4=egJxe)eQ?iBE%_kSym BjfDUJ literal 0 HcmV?d00001 diff --git a/myems-normalization/offlinemeter.py b/myems-normalization/offlinemeter.py new file mode 100644 index 00000000..631b10ba --- /dev/null +++ b/myems-normalization/offlinemeter.py @@ -0,0 +1,302 @@ +import time +from datetime import datetime, timedelta +import mysql.connector +from openpyxl import load_workbook +import config + +################################################################################################################ +# PROCEDURES: +# STEP 1: get all 'new' offline meter files +# STEP 2: for each new files, iterate all rows and read cell's value and store data to energy data list +# STEP 3: insert or update energy data to table offline meter hourly in energy database +# STEP 4: update file status to 'done' or 'error' +################################################################################################################ + + +def calculate_hourly(logger): + while True: + # outer loop to reconnect server if there is a connection error + ################################################################################################################ + # STEP 1: get all 'new' offline meter files + ################################################################################################################ + cnx = None + cursor = None + try: + cnx = mysql.connector.connect(**config.myems_historical_db) + cursor = cnx.cursor() + except Exception as e: + logger.error("Error in step 1.1 of offline meter.calculate_hourly " + str(e)) + if cursor: + cursor.close() + if cnx: + cnx.close() + # sleep several minutes and continue the outer loop to reconnect the database + print("Could not connect the MyEMS Historical Database, and go to sleep 60 seconds...") + time.sleep(60) + continue + + print("Connected to MyEMS Historical Database") + + print("Getting all new offline meter files") + try: + query = (" SELECT id, file_name, file_object " + " FROM tbl_offline_meter_files " + " WHERE status = 'new' " + " ORDER BY id ") + + cursor.execute(query, ) + rows_files = cursor.fetchall() + except Exception as e: + logger.error("Error in step 1.2 of offline meter.calculate_hourly " + str(e)) + time.sleep(60) + continue + finally: + if cursor: + cursor.close() + if cnx: + cnx.close() + + excel_file_list = list() + if rows_files is not None and len(rows_files) > 0: + for row_file in rows_files: + excel_file_list.append({"id": row_file[0], + "name": row_file[1], + "file_object": row_file[2]}) + else: + print("there isn't any new files found, and go to sleep 60 seconds...") + time.sleep(60) + continue + + ################################################################################################################ + # STEP 2: for each new files, dump file object to local file and then load workbook from the local file + ################################################################################################################ + for excel_file in excel_file_list: + print("read data from each offline meter file" + excel_file['name']) + is_valid_file = True + fw = None + try: + fw = open("myems-normalization.blob", 'wb') + fw.write(excel_file['file_object']) + fw.close() + except Exception as e: + logger.error("Error in step 2.1 of offline meter.calculate_hourly " + str(e)) + if fw: + fw.close() + # mark as invalid file + is_valid_file = False + + fr = None + wb = None + try: + fr = open("myems-normalization.blob", 'rb') + wb = load_workbook(fr, data_only=True) + fr.close() + except Exception as e: + logger.error("Error in step 2.2 of offline meter.calculate_hourly " + str(e)) + if fr: + fr.close() + # mark as invalid file + is_valid_file = False + + energy_data_list = list() + # grab the active worksheet + + if is_valid_file: + ws = wb.active + + # get timezone offset in minutes, this value will be returned to client + timezone_offset = int(config.utc_offset[1:3]) * 60 + int(config.utc_offset[4:6]) + if config.utc_offset[0] == '-': + timezone_offset = -timezone_offset + + for row in ws.iter_rows(min_row=3, max_row=1024, min_col=1, max_col=34): + offline_meter_data = dict() + offline_meter_data['offline_meter_id'] = None + offline_meter_data['offline_meter_name'] = None + offline_meter_data['data'] = dict() + col_num = 0 + + for cell in row: + col_num += 1 + print(cell.value) + if col_num == 1: + # get offline meter ID + if cell.value is not None: + offline_meter_data['offline_meter_id'] = cell.value + else: + break + elif col_num == 2: + # get offline meter name + if cell.value is None: + break + else: + offline_meter_data['offline_meter_name'] = cell.value + elif col_num > 3: + # get date of the cell + try: + offline_datetime = datetime(year=ws['A2'].value, + month=ws['B2'].value, + day=col_num - 3) + except ValueError: + # invalid date and go to next cell in this row until reach max_col + continue + + offline_datetime_utc = offline_datetime - timedelta(minutes=timezone_offset) + + if cell.value is None: + # if the cell is empty then stop at that day + break + else: + offline_meter_data['data'][offline_datetime_utc] = cell.value + + if len(offline_meter_data['data']) > 0: + print("offline_meter_data:" + str(offline_meter_data)) + energy_data_list.append(offline_meter_data) + + ############################################################################################################ + # STEP 3: insert or update energy data to table offline meter hourly in energy database + ############################################################################################################ + print("to valid offline meter id in excel file...") + if len(energy_data_list) == 0: + print("Could not find any offline meters in the file...") + print("and go to process the next file...") + is_valid_file = False + else: + try: + cnx = mysql.connector.connect(**config.myems_system_db) + cursor = cnx.cursor() + except Exception as e: + logger.error("Error in step 3.1 of offlinemeter.calculate_hourly " + str(e)) + if cursor: + cursor.close() + if cnx: + cnx.close() + time.sleep(60) + continue + + try: + cursor.execute(" SELECT id, name " + " FROM tbl_offline_meters ") + rows_offline_meters = cursor.fetchall() + except Exception as e: + logger.error("Error in step 3.2 of offlinemeter.calculate_hourly " + str(e)) + time.sleep(60) + continue + finally: + if cursor: + cursor.close() + if cnx: + cnx.close() + + if rows_offline_meters is None or len(rows_offline_meters) == 0: + print("Could not find any offline meters in the MyEMS System Database...") + time.sleep(60) + continue + else: + offline_meter_id_set = set() + for row_offline_meter in rows_offline_meters: + # valid offline meter id in excel file + offline_meter_id_set.add(row_offline_meter[0]) + + for energy_data_item in energy_data_list: + if energy_data_item['offline_meter_id'] not in offline_meter_id_set: + is_valid_file = False + break + + if is_valid_file: + #################################################################################################### + # delete possibly exists offline meter hourly data in myems energy database, + # and then insert new offline meter hourly data + #################################################################################################### + try: + cnx = mysql.connector.connect(**config.myems_energy_db) + cursor = cnx.cursor() + except Exception as e: + logger.error("Error in step 3.2 of offlinemeter.calculate_hourly " + str(e)) + if cursor: + cursor.close() + if cnx: + cnx.close() + time.sleep(60) + continue + + try: + for energy_data_item in energy_data_list: + offline_meter_id = energy_data_item['offline_meter_id'] + print(energy_data_item['data'].items()) + for k, v in energy_data_item['data'].items(): + start_datetime_utc = k + end_datetime_utc = start_datetime_utc + timedelta(hours=24) + actual_value = v / (24 * 60 / config.minutes_to_count) + cursor.execute(" DELETE FROM tbl_offline_meter_hourly " + " WHERE offline_meter_id = %s " + " AND start_datetime_utc >= %s " + " AND start_datetime_utc < %s ", + (offline_meter_id, + start_datetime_utc.isoformat()[0:19], + end_datetime_utc.isoformat()[0:19])) + cnx.commit() + # todo: check with hourly low limit and hourly high limit + add_values = (" INSERT INTO tbl_offline_meter_hourly " + " (offline_meter_id, start_datetime_utc, actual_value) " + " VALUES ") + + while start_datetime_utc < end_datetime_utc: + add_values += " (" + str(offline_meter_id) + "," + add_values += "'" + start_datetime_utc.isoformat()[0:19] + "'," + add_values += str(actual_value) + "), " + start_datetime_utc += timedelta(minutes=config.minutes_to_count) + + print("add_values:" + add_values) + # trim ", " at the end of string and then execute + cursor.execute(add_values[:-2]) + cnx.commit() + except Exception as e: + logger.error("Error in step 3.3 of offlinemeter.calculate_hourly " + str(e)) + time.sleep(60) + continue + finally: + if cursor: + cursor.close() + if cnx: + cnx.close() + + ############################################################################################################ + # STEP 4: update file status to 'done' or 'error' + ############################################################################################################ + print("to update offline meter file status to done...") + try: + cnx = mysql.connector.connect(**config.myems_historical_db) + cursor = cnx.cursor() + except Exception as e: + logger.error("Error in step 4.1 of offlinemeter.calculate_hourly " + str(e)) + if cursor: + cursor.close() + if cnx: + cnx.close() + time.sleep(60) + continue + + try: + update_row = (" UPDATE tbl_offline_meter_files " + " SET status = %s " + " WHERE id = %s ") + cursor.execute(update_row, ('done' if is_valid_file else 'error', excel_file['id'],)) + cnx.commit() + except Exception as e: + logger.error("Error in step 4.2 of offlinemeter.calculate_hourly " + str(e)) + time.sleep(60) + continue + finally: + if cursor: + cursor.close() + if cnx: + cnx.close() + + # end of for excel_file in excel_file_list + + print("go to sleep ...") + time.sleep(300) + print("wake from sleep, and go to work...") + # end of outer while + diff --git a/myems-normalization/virtualmeter.py b/myems-normalization/virtualmeter.py new file mode 100644 index 00000000..d8b5b21a --- /dev/null +++ b/myems-normalization/virtualmeter.py @@ -0,0 +1,475 @@ +import time +from datetime import datetime, timedelta +import mysql.connector +from sympy import sympify +from multiprocessing import Pool +import random +import config + + +######################################################################################################################## +# PROCEDURES: +# Step 1: Query all virtual meters +# Step 2: Create multiprocessing pool to call worker in parallel +######################################################################################################################## + +def calculate_hourly(logger): + + while True: + # outer loop to reconnect server if there is a connection error + cnx_system_db = None + cursor_system_db = None + try: + cnx_system_db = mysql.connector.connect(**config.myems_system_db) + cursor_system_db = cnx_system_db.cursor() + except Exception as e: + logger.error("Error in step 0 of virtual_meter.calculate_hourly " + str(e)) + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + # sleep and continue the outer loop to reconnect the database + time.sleep(60) + continue + + print("Connected to MyEMS System Database") + + virtual_meter_list = list() + try: + cursor_system_db.execute(" SELECT m.id, m.name, e.equation, e.id as expression_id " + " FROM tbl_virtual_meters m, tbl_expressions e " + " WHERE m.id = e.virtual_meter_id " + " ORDER BY m.id ") + rows_virtual_meters = cursor_system_db.fetchall() + + if rows_virtual_meters is None or len(rows_virtual_meters) == 0: + # sleep several minutes and continue the outer loop to reconnect the database + time.sleep(60) + continue + + for row in rows_virtual_meters: + meta_result = {"id": row[0], "name": row[1], "equation": row[2], "expression_id": row[3]} + virtual_meter_list.append(meta_result) + + except Exception as e: + logger.error("Error in step 1 of virtual meter calculate hourly " + str(e)) + # sleep and continue the outer loop to reconnect the database + time.sleep(60) + continue + finally: + if cursor_system_db: + cursor_system_db.close() + if cnx_system_db: + cnx_system_db.close() + + # shuffle the virtual meter list for randomly calculating the meter hourly value + random.shuffle(virtual_meter_list) + + print("Got all virtual meters in MyEMS System Database") + ################################################################################################################ + # Step 2: Create multiprocessing pool to call worker in parallel + ################################################################################################################ + p = Pool(processes=config.pool_size) + error_list = p.map(worker, virtual_meter_list) + p.close() + p.join() + + for error in error_list: + if error is not None and len(error) > 0: + logger.error(error) + + print("go to sleep ...") + time.sleep(60) + print("wake from sleep, and continue to work...") + + +######################################################################################################################## +# Step 1: get start datetime and end datetime +# Step 2: parse the expression and get all meters, virtual meters, offline meters associated with the expression +# Step 3: query energy consumption values from table meter hourly, virtual meter hourly and offline meter hourly +# Step 4: evaluate the equation with variables values from previous step and save to table virtual meter hourly +# returns the error string for logging or returns None +######################################################################################################################## + +def worker(virtual_meter): + cnx_energy_db = None + cursor_energy_db = None + + try: + cnx_energy_db = mysql.connector.connect(**config.myems_energy_db) + cursor_energy_db = cnx_energy_db.cursor() + except Exception as e: + if cursor_energy_db: + cursor_energy_db.close() + if cnx_energy_db: + cnx_energy_db.close() + return "Error in step 1.1 of virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'" + + print("Start to process virtual meter: " + "'" + virtual_meter['name']+"'") + + #################################################################################################################### + # step 1: get start datetime and end datetime + # get latest timestamp from energy database in tbl_virtual_meter_hourly + #################################################################################################################### + + try: + query = (" SELECT MAX(start_datetime_utc) " + " FROM tbl_virtual_meter_hourly " + " WHERE virtual_meter_id = %s ") + cursor_energy_db.execute(query, (virtual_meter['id'],)) + row_datetime = cursor_energy_db.fetchone() + except Exception as e: + if cursor_energy_db: + cursor_energy_db.close() + if cnx_energy_db: + cnx_energy_db.close() + return "Error in step 1.2 of virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'" + + start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S') + start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None) + + if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime): + # replace second and microsecond with 0 + # note: do not replace minute in case of calculating in half hourly + start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None) + # start from the next time slot + start_datetime_utc += timedelta(minutes=config.minutes_to_count) + + end_datetime_utc = datetime.utcnow().replace() + end_datetime_utc = end_datetime_utc.replace(second=0, microsecond=0, tzinfo=None) + + time_difference = end_datetime_utc - start_datetime_utc + time_difference_in_minutes = time_difference / timedelta(minutes=1) + if time_difference_in_minutes < config.minutes_to_count: + if cursor_energy_db: + cursor_energy_db.close() + if cnx_energy_db: + cnx_energy_db.close() + return "it's too early to calculate" + " for '" + virtual_meter['name'] + "'" + + # trim end_datetime_utc + trimmed_end_datetime_utc = start_datetime_utc + timedelta(minutes=config.minutes_to_count) + while trimmed_end_datetime_utc <= end_datetime_utc: + trimmed_end_datetime_utc += timedelta(minutes=config.minutes_to_count) + + end_datetime_utc = trimmed_end_datetime_utc - timedelta(minutes=config.minutes_to_count) + + if end_datetime_utc <= start_datetime_utc: + if cursor_energy_db: + cursor_energy_db.close() + if cnx_energy_db: + cnx_energy_db.close() + return "it's too early to calculate" + " for '" + virtual_meter['name'] + "'" + + print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19] + + "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19]) + + ############################################################################################################ + # Step 2: parse the expression and get all meters, virtual meters, and + # offline meters associated with the expression + ############################################################################################################ + cnx_factory_db = None + cursor_factory_db = None + try: + cnx_factory_db = mysql.connector.connect(**config.myems_system_db) + cursor_factory_db = cnx_factory_db.cursor() + except Exception as e: + if cursor_factory_db: + cursor_factory_db.close() + if cnx_factory_db: + cnx_factory_db.close() + if cursor_energy_db: + cursor_energy_db.close() + if cnx_energy_db: + cnx_energy_db.close() + return "Error in step 2.1 of virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'" + + meter_list_in_expression = list() + virtual_meter_list_in_expression = list() + offline_meter_list_in_expression = list() + try: + ######################################################################################################## + # get all meters associated with the expression + ######################################################################################################## + + cursor_factory_db.execute(" SELECT m.id as meter_id, v.name as variable_name " + " FROM tbl_meters m, tbl_variables v " + " WHERE m.id = v.meter_id " + " AND v.meter_type = 'meter' " + " AND v.expression_id = %s ", + (virtual_meter['expression_id'], )) + rows = cursor_factory_db.fetchall() + if rows is not None and len(rows) > 0: + for row in rows: + meter_list_in_expression.append({"meter_id": row[0], "variable_name": row[1].lower()}) + + ######################################################################################################## + # get all virtual meters associated with the expression + ######################################################################################################## + + cursor_factory_db.execute(" SELECT m.id as virtual_meter_id, v.name as variable_name " + " FROM tbl_virtual_meters m, tbl_variables v " + " WHERE m.id = v.meter_id " + " AND v.meter_type = 'virtual_meter' " + " AND v.expression_id = %s ", + (virtual_meter['expression_id'],)) + rows = cursor_factory_db.fetchall() + if rows is not None and len(rows) > 0: + for row in rows: + virtual_meter_list_in_expression.append({"virtual_meter_id": row[0], + "variable_name": row[1].lower()}) + + ######################################################################################################## + # get all offline meters associated with the expression + ######################################################################################################## + + cursor_factory_db.execute(" SELECT m.id as offline_meter_id, v.name as variable_name " + " FROM tbl_offline_meters m, tbl_variables v " + " WHERE m.id = v.meter_id " + " AND v.meter_type = 'offline_meter' " + " AND v.expression_id = %s ", + (virtual_meter['expression_id'],)) + rows = cursor_factory_db.fetchall() + if rows is not None and len(rows) > 0: + for row in rows: + offline_meter_list_in_expression.append({"offline_meter_id": row[0], + "variable_name": row[1].lower()}) + except Exception as e: + if cursor_energy_db: + cursor_energy_db.close() + if cnx_energy_db: + cnx_energy_db.close() + return "Error in step 2.2 of virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'" + finally: + if cursor_factory_db: + cursor_factory_db.close() + if cnx_factory_db: + cnx_factory_db.close() + + ############################################################################################################ + # Step 3: query energy consumption values from table meter hourly, virtual meter hourly + # and offline meter hourly + ############################################################################################################ + + print("getting energy consumption values from myems_energy_db.tbl_meter_hourly...") + energy_meter_hourly = dict() + if meter_list_in_expression is not None and len(meter_list_in_expression) > 0: + try: + for meter_in_expression in meter_list_in_expression: + meter_id = str(meter_in_expression['meter_id']) + query = (" SELECT start_datetime_utc, actual_value " + " FROM tbl_meter_hourly " + " WHERE meter_id = %s AND start_datetime_utc >= %s AND start_datetime_utc < %s " + " ORDER BY start_datetime_utc ") + cursor_energy_db.execute(query, (meter_id, start_datetime_utc, end_datetime_utc, )) + rows_energy_values = cursor_energy_db.fetchall() + if rows_energy_values is None or len(rows_energy_values) == 0: + energy_meter_hourly[meter_id] = None + else: + energy_meter_hourly[meter_id] = dict() + for row_energy_value in rows_energy_values: + energy_meter_hourly[meter_id][row_energy_value[0]] = row_energy_value[1] + except Exception as e: + if cursor_energy_db: + cursor_energy_db.close() + if cnx_energy_db: + cnx_energy_db.close() + return "Error in step 3.2 virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'" + + print("getting energy consumption values from myems_energy_db.tbl_virtual_meter_hourly...") + energy_virtual_meter_hourly = dict() + if virtual_meter_list_in_expression is not None and len(virtual_meter_list_in_expression) > 0: + try: + for virtual_meter_in_expression in virtual_meter_list_in_expression: + virtual_meter_id = str(virtual_meter_in_expression['virtual_meter_id']) + query = (" SELECT start_datetime_utc, actual_value " + " FROM tbl_virtual_meter_hourly " + " WHERE virtual_meter_id = %s " + " AND start_datetime_utc >= %s AND start_datetime_utc < %s " + " ORDER BY start_datetime_utc ") + cursor_energy_db.execute(query, (virtual_meter_id, start_datetime_utc, end_datetime_utc,)) + rows_energy_values = cursor_energy_db.fetchall() + if rows_energy_values is None or len(rows_energy_values) == 0: + energy_virtual_meter_hourly[virtual_meter_id] = None + else: + energy_virtual_meter_hourly[virtual_meter_id] = dict() + for row_energy_value in rows_energy_values: + energy_virtual_meter_hourly[virtual_meter_id][row_energy_value[0]] = row_energy_value[1] + except Exception as e: + if cursor_energy_db: + cursor_energy_db.close() + if cnx_energy_db: + cnx_energy_db.close() + return "Error in step 3.3 virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'" + + print("getting energy consumption values from myems_energy_db.tbl_offline_meter_hourly...") + energy_offline_meter_hourly = dict() + if offline_meter_list_in_expression is not None and len(offline_meter_list_in_expression) > 0: + try: + for offline_meter_in_expression in offline_meter_list_in_expression: + offline_meter_id = str(offline_meter_in_expression['offline_meter_id']) + query = (" SELECT start_datetime_utc, actual_value " + " FROM tbl_offline_meter_hourly " + " WHERE offline_meter_id = %s " + " AND start_datetime_utc >= %s AND start_datetime_utc < %s " + " ORDER BY start_datetime_utc ") + cursor_energy_db.execute(query, (offline_meter_id, start_datetime_utc, end_datetime_utc,)) + rows_energy_values = cursor_energy_db.fetchall() + if rows_energy_values is None or len(rows_energy_values) == 0: + energy_offline_meter_hourly[offline_meter_id] = None + else: + energy_offline_meter_hourly[offline_meter_id] = dict() + for row_energy_value in rows_energy_values: + energy_offline_meter_hourly[offline_meter_id][row_energy_value[0]] = row_energy_value[1] + except Exception as e: + if cursor_energy_db: + cursor_energy_db.close() + if cnx_energy_db: + cnx_energy_db.close() + return "Error in step 3.4 virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'" + + ############################################################################################################ + # Step 4: evaluate the equation with variables values from previous step + # and save to table virtual meter hourly + ############################################################################################################ + + print("getting common time slot of energy values for all meters...") + common_start_datetime_utc = start_datetime_utc + common_end_datetime_utc = end_datetime_utc + if energy_meter_hourly is not None and len(energy_meter_hourly) > 0: + for meter_id, energy_hourly in energy_meter_hourly.items(): + if energy_hourly is None or len(energy_hourly) == 0: + common_start_datetime_utc = None + common_end_datetime_utc = None + break + else: + if common_start_datetime_utc < min(energy_hourly.keys()): + common_start_datetime_utc = min(energy_hourly.keys()) + if common_end_datetime_utc > max(energy_hourly.keys()): + common_end_datetime_utc = max(energy_hourly.keys()) + + print("getting common time slot of energy values for all virtual meters...") + if common_start_datetime_utc is not None and common_start_datetime_utc is not None: + if energy_virtual_meter_hourly is not None and len(energy_virtual_meter_hourly) > 0: + for meter_id, energy_hourly in energy_virtual_meter_hourly.items(): + if energy_hourly is None or len(energy_hourly) == 0: + common_start_datetime_utc = None + common_end_datetime_utc = None + break + else: + if common_start_datetime_utc < min(energy_hourly.keys()): + common_start_datetime_utc = min(energy_hourly.keys()) + if common_end_datetime_utc > max(energy_hourly.keys()): + common_end_datetime_utc = max(energy_hourly.keys()) + + print("getting common time slot of energy values for all offline meters...") + if common_start_datetime_utc is not None and common_start_datetime_utc is not None: + if energy_offline_meter_hourly is not None and len(energy_offline_meter_hourly) > 0: + for meter_id, energy_hourly in energy_offline_meter_hourly.items(): + if energy_hourly is None or len(energy_hourly) == 0: + common_start_datetime_utc = None + common_end_datetime_utc = None + break + else: + if common_start_datetime_utc < min(energy_hourly.keys()): + common_start_datetime_utc = min(energy_hourly.keys()) + if common_end_datetime_utc > max(energy_hourly.keys()): + common_end_datetime_utc = max(energy_hourly.keys()) + + print("evaluating the equation with SymPy...") + normalized_values = list() + + ############################################################################################################ + # Converting Strings to SymPy Expressions + # The sympify function(that’s sympify, not to be confused with simplify) can be used to + # convert strings into SymPy expressions. + ############################################################################################################ + try: + expr = sympify(virtual_meter['equation'].lower()) + print("the expression to be evaluated: " + str(expr)) + current_datetime_utc = common_start_datetime_utc + print("common_start_datetime_utc: " + str(common_start_datetime_utc)) + print("common_end_datetime_utc: " + str(common_end_datetime_utc)) + while common_start_datetime_utc is not None \ + and common_end_datetime_utc is not None \ + and current_datetime_utc <= common_end_datetime_utc: + meta_data = dict() + meta_data['start_datetime_utc'] = current_datetime_utc + + #################################################################################################### + # create a dictionary of Symbol: point pairs + #################################################################################################### + + subs = dict() + + #################################################################################################### + # Evaluating the expression at current_datetime_utc + #################################################################################################### + + if meter_list_in_expression is not None and len(meter_list_in_expression) > 0: + for meter_in_expression in meter_list_in_expression: + meter_id = str(meter_in_expression['meter_id']) + actual_value = energy_meter_hourly[meter_id].get(current_datetime_utc, 0.0) + subs[meter_in_expression['variable_name']] = actual_value + + if virtual_meter_list_in_expression is not None and len(virtual_meter_list_in_expression) > 0: + for virtual_meter_in_expression in virtual_meter_list_in_expression: + virtual_meter_id = str(virtual_meter_in_expression['virtual_meter_id']) + actual_value = energy_virtual_meter_hourly[virtual_meter_id].get(current_datetime_utc, 0.0) + subs[virtual_meter_in_expression['variable_name']] = actual_value + + if offline_meter_list_in_expression is not None and len(offline_meter_list_in_expression) > 0: + for offline_meter_in_expression in offline_meter_list_in_expression: + offline_meter_id = str(offline_meter_in_expression['offline_meter_id']) + actual_value = energy_offline_meter_hourly[offline_meter_id].get(current_datetime_utc, 0.0) + subs[offline_meter_in_expression['variable_name']] = actual_value + + #################################################################################################### + # To numerically evaluate an expression with a Symbol at a point, + # we might use subs followed by evalf, + # but it is more efficient and numerically stable to pass the substitution to evalf + # using the subs flag, which takes a dictionary of Symbol: point pairs. + #################################################################################################### + + meta_data['actual_value'] = expr.evalf(subs=subs) + + normalized_values.append(meta_data) + + current_datetime_utc += timedelta(minutes=config.minutes_to_count) + + except Exception as e: + if cursor_energy_db: + cursor_energy_db.close() + if cnx_energy_db: + cnx_energy_db.close() + return "Error in step 4.1 virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'" + + print("saving energy values to table virtual meter hourly...") + + if len(normalized_values) > 0: + try: + add_values = (" INSERT INTO tbl_virtual_meter_hourly " + " (virtual_meter_id, start_datetime_utc, actual_value) " + " VALUES ") + + for meta_data in normalized_values: + add_values += " (" + str(virtual_meter['id']) + "," + add_values += "'" + meta_data['start_datetime_utc'].isoformat()[0:19] + "'," + add_values += str(meta_data['actual_value']) + "), " + print("add_values:" + add_values) + # trim ", " at the end of string and then execute + cursor_energy_db.execute(add_values[:-2]) + cnx_energy_db.commit() + except Exception as e: + if cursor_energy_db: + cursor_energy_db.close() + if cnx_energy_db: + cnx_energy_db.close() + return "Error in step 4.2 virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'" + + if cursor_energy_db: + cursor_energy_db.close() + if cnx_energy_db: + cnx_energy_db.close() + + return None