Merge branch 'develop'

pull/12/head
13621160019@163.com 2021-02-19 12:31:28 +08:00
commit 65329638df
12 changed files with 1434 additions and 5 deletions

View File

@ -46,7 +46,8 @@ Download and install bacpypes library
Install myems-bacnet service Install myems-bacnet service
``` ```
$ cd ~ $ cd ~
$ git clone https://github.com/myems.git $ git clone https://github.com/myems/myems.git
$ cd myems
$ git checkout master (or the latest release tag) $ git checkout master (or the latest release tag)
$ sudo cp -R ~/myems/myems-bacnet /myems-bacnet $ sudo cp -R ~/myems/myems-bacnet /myems-bacnet
``` ```

View File

@ -33,14 +33,14 @@ Download and install MySQL Connector:
Install myems-cleaning service Install myems-cleaning service
```bash ```bash
$ cd ~ $ cd ~
$ git clone https://github.com/myems.git $ git clone https://github.com/myems/myems.git
$ cd myems
$ sudo git checkout master (or the latest release tag) $ sudo git checkout master (or the latest release tag)
$ sudo cp -R ~/myems/myems-cleaning /myems-cleaning $ sudo cp -R ~/myems/myems-cleaning /myems-cleaning
$ cd /myems-cleaning
``` ```
Open config file and edit database configuration Open config file and edit database configuration
```bash ```bash
$ sudo nano config.py $ sudo nano /myems-cleaning/config.py
``` ```
Setup systemd service: Setup systemd service:
```bash ```bash

View File

@ -43,9 +43,9 @@ Install myems-modbus-tcp service
``` ```
$ cd ~ $ cd ~
$ git clone https://github.com/myems/myems.git $ git clone https://github.com/myems/myems.git
$ cd myems
$ sudo git checkout master (or the latest release tag) $ sudo git checkout master (or the latest release tag)
$ sudo cp -R ~/myems/myems-modbus-tcp /myems-modbus-tcp $ sudo cp -R ~/myems/myems-modbus-tcp /myems-modbus-tcp
$ cd /myems-modbus-tcp
``` ```
Eidt the config Eidt the config
``` ```

View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2021 MyEMS
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

View File

@ -0,0 +1,107 @@
## MyEMS Normalization Service 数据规范化服务
### Introduction
This service is a component of MyEMS and it normalizes energy data in historical database.
[![Codacy Badge](https://api.codacy.com/project/badge/Grade/9c4a70cf88ab410d91294bf32ef6f371)](https://app.codacy.com/gh/myems/myems-normalization?utm_source=github.com&utm_medium=referral&utm_content=myems/myems-normalization&utm_campaign=Badge_Grade)
[![Scrutinizer Code Quality](https://scrutinizer-ci.com/g/myems/myems-normalization/badges/quality-score.png?b=master)](https://scrutinizer-ci.com/g/myems/myems-normalization/?branch=master)
[![Maintainability](https://api.codeclimate.com/v1/badges/4c9f5e65a35a1c968471/maintainability)](https://codeclimate.com/github/myems/myems-normalization/maintainability)
[![Total alerts](https://img.shields.io/lgtm/alerts/g/myems/myems-normalization.svg?logo=lgtm&logoWidth=18)](https://lgtm.com/projects/g/myems/myems-normalization/alerts/)
### Prerequisites
mysql.connector
sympy
openpyxl
### Installation
Download and install MySQL Connector:
```
$ cd ~/tools
$ wget https://dev.mysql.com/get/Downloads/Connector-Python/mysql-connector-python-8.0.20.tar.gz
$ tar xzf mysql-connector-python-8.0.20.tar.gz
$ cd ~/tools/mysql-connector-python-8.0.20
$ sudo python3 setup.py install
```
Download and install mpmath
```
$ cd ~/tools
$ git clone https://github.com/fredrik-johansson/mpmath.git
$ cd ~/tools/mpmath
$ sudo python3 setup.py install
```
Download and install SymPy
```
$ cd ~/tools
$ git clone https://github.com/sympy/sympy.git
$ cd ~/tools/sympy
$ sudo python3 setupegg.py develop
```
Download and install openpyxl
```
$ cd ~/tools
Get the latest version of et_xmlfile from https://bitbucket.org/openpyxl/et_xmlfile/downloads/
$ wget https://bitbucket.org/openpyxl/et_xmlfile/get/50973a6de49c.zip
$ 7z x 50973a6de49c.zip && mv openpyxl-et_xmlfile-50973a6de49c et_xmlfile
$ git clone https://github.com/phn/jdcal.git
Get the latest version of openpyxl from https://bitbucket.org/openpyxl/openpyxl/downloads/
$ wget https://bitbucket.org/openpyxl/openpyxl/get/8953233f5af2.zip
$ 7z x 8953233f5af2.zip && mv openpyxl-openpyxl-8953233f5af2 openpyxl
$ cd ~/tools/et_xmlfile
$ sudo python3 setup.py install
$ cd ~/tools/jdcal
$ sudo python3 setup.py install
$ cd ~/tools/openpyxl
$ sudo python3 setup.py install
```
Install myems-normalization service:
```
$ cd ~
$ git clone https://github.com/myems/myems.git
$ cd myems
$ sudo git checkout master (or the latest release tag)
$ sudo cp -r ~/myems/myems-normalization /myems-normalization
```
Edit config.py for your project
```
$ sudo nano /myems-normalization/config.py
```
Setup systemd service:
```
$ sudo cp myems-normalization.service /lib/systemd/system/
```
Enable the service:
```
$ sudo systemctl enable feed-normalization.service
```
Start the service:
```
$ sudo systemctl start feed-normalization.service
```
### References
1. https://myems.io
2. https://dev.mysql.com/doc/connector-python/en/
3. https://github.com/sympy/sympy
4. https://openpyxl.readthedocs.io

View File

@ -0,0 +1,43 @@
myems_system_db = {
'user': 'root',
'password': '!MyEMS1',
'host': '127.0.0.1',
'database': 'myems_system_db',
'port': 3306,
}
myems_energy_db = {
'user': 'root',
'password': '!MyEMS1',
'host': '127.0.0.1',
'database': 'myems_energy_db',
'port': 3306,
}
myems_historical_db = {
'user': 'root',
'password': '!MyEMS1',
'host': '127.0.0.1',
'database': 'myems_historical_db',
'port': 3306,
}
# indicates in how many minutes to normalize energy consumption
# 30 for half hourly
# 60 for hourly
minutes_to_count = 60
# indicates within how many minutes to allow myems-cleaning service to clean the historical data
minutes_to_clean = 30
# indicates from when (in UTC timezone) to calculate if the energy data is empty or were cleared
# format string: '%Y-%m-%d %H:%M:%S'
start_datetime_utc = '2019-12-31 16:00:00'
# the number of worker processes in parallel for meter and virtual meter
# the pool size depends on the computing performance of the database server and the analysis server
pool_size = 5
# indicates the project's time zone offset from UTC
utc_offset = '+08:00'

View File

@ -0,0 +1,35 @@
import logging
from logging.handlers import RotatingFileHandler
from multiprocessing import Process
import meter
import offlinemeter
import virtualmeter
def main():
"""main"""
# create logger
logger = logging.getLogger('myems-normalization')
# specifies the lowest-severity log message a logger will handle,
# where debug is the lowest built-in severity level and critical is the highest built-in severity.
# For example, if the severity level is INFO, the logger will handle only INFO, WARNING, ERROR, and CRITICAL
# messages and will ignore DEBUG messages.
logger.setLevel(logging.ERROR)
# create file handler which logs messages
fh = RotatingFileHandler('myems-normalization.log', maxBytes=1024*1024, backupCount=1)
# create formatter and add it to the handlers
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
fh.setFormatter(formatter)
# add the handlers to logger
logger.addHandler(fh)
# calculate energy consumption in hourly period
Process(target=meter.calculate_hourly, args=(logger,)).start()
Process(target=virtualmeter.calculate_hourly, args=(logger,)).start()
Process(target=offlinemeter.calculate_hourly, args=(logger,)).start()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,430 @@
import time
from datetime import datetime, timedelta, timezone
import mysql.connector
from multiprocessing import Pool
import random
from decimal import Decimal
import config
########################################################################################################################
# PROCEDURES:
# Step 1: Query all meters and associated energy value points
# Step 2: Create multiprocessing pool to call worker in parallel
########################################################################################################################
def calculate_hourly(logger):
while True:
################################################################################################################
# Step 1: Query all meters and associated energy value points
################################################################################################################
cnx_system_db = None
cursor_system_db = None
try:
cnx_system_db = mysql.connector.connect(**config.myems_system_db)
cursor_system_db = cnx_system_db.cursor()
except Exception as e:
logger.error("Error in step 1.1 of meter.calculate_hourly process " + str(e))
if cursor_system_db:
cursor_system_db.close()
if cnx_system_db:
cnx_system_db.close()
# sleep several minutes and continue the outer loop to reconnect the database
time.sleep(60)
continue
print("Connected to the MyEMS System Database")
try:
cursor_system_db.execute(" SELECT m.id, m.name, m.hourly_low_limit, m.hourly_high_limit, "
" p.id as point_id, p.units "
" FROM tbl_meters m, tbl_meters_points mp, tbl_points p "
" WHERE m.id = mp.meter_id "
" AND mp.point_id = p.id "
" AND p.object_type = 'ENERGY_VALUE'")
rows_meters = cursor_system_db.fetchall()
if rows_meters is None or len(rows_meters) == 0:
# sleep several minutes and continue the outer loop to reconnect the database
time.sleep(60)
continue
meter_list = list()
for row in rows_meters:
meta_result = {"id": row[0],
"name": row[1],
"hourly_low_limit": row[2],
"hourly_high_limit": row[3],
"point_id": row[4],
"units": row[5]}
meter_list.append(meta_result)
except Exception as e:
logger.error("Error in step 1.2 meter.calculate_hourly " + str(e))
# sleep several minutes and continue the outer loop to reconnect the database
time.sleep(60)
continue
finally:
if cursor_system_db:
cursor_system_db.close()
if cnx_system_db:
cnx_system_db.close()
# shuffle the meter list for randomly calculating the meter hourly value
random.shuffle(meter_list)
print("Got all meters in MyEMS System Database")
################################################################################################################
# Step 2: Create multiprocessing pool to call worker in parallel
################################################################################################################
p = Pool(processes=config.pool_size)
error_list = p.map(worker, meter_list)
p.close()
p.join()
for error in error_list:
if error is not None and len(error) > 0:
logger.error(error)
print("go to sleep ...")
time.sleep(60)
print("wake from sleep, and continue to work...")
# end of outer while
########################################################################################################################
# PROCEDURES:
# Step 1: Determine the start datetime and end datetime
# Step 2: Get raw data from historical database between start_datetime_utc and end datetime
# Step 3: Normalize energy values by minutes_to_count
# Step 4: Insert into energy database
#
# NOTE: returns None or the error string because that the logger object cannot be passed in as parameter
########################################################################################################################
def worker(meter):
print("Start to process meter: " + "'" + meter['name'] + "'")
####################################################################################################################
# Step 1: Determine the start datetime and end datetime
####################################################################################################################
cnx_energy_db = None
cursor_energy_db = None
try:
cnx_energy_db = mysql.connector.connect(**config.myems_energy_db)
cursor_energy_db = cnx_energy_db.cursor()
except Exception as e:
error_string = "Error in step 1.1 of meter.worker " + str(e) + " for '" + meter['name'] + "'"
if cursor_energy_db:
cursor_energy_db.close()
if cnx_energy_db:
cnx_energy_db.close()
print(error_string)
return error_string
# get the initial start datetime from config file in case there is no energy data
start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S')
start_datetime_utc = start_datetime_utc.replace(tzinfo=timezone.utc)
start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0)
try:
query = (" SELECT MAX(start_datetime_utc) "
" FROM tbl_meter_hourly "
" WHERE meter_id = %s ")
cursor_energy_db.execute(query, (meter['id'],))
row_datetime = cursor_energy_db.fetchone()
except Exception as e:
error_string = "Error in step 1.3 of meter.worker " + str(e) + " for '" + meter['name'] + "'"
if cursor_energy_db:
cursor_energy_db.close()
if cnx_energy_db:
cnx_energy_db.close()
print(error_string)
return error_string
if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime):
start_datetime_utc = row_datetime[0].replace(tzinfo=timezone.utc)
# replace second and microsecond with 0
# NOTE: DO NOT replace minute in case of calculating in half hourly
start_datetime_utc = start_datetime_utc.replace(second=0, microsecond=0)
# start from the next time slot
start_datetime_utc += timedelta(minutes=config.minutes_to_count)
end_datetime_utc = datetime.utcnow().replace(tzinfo=timezone.utc)
# we should allow myems-cleaning service to take at most [minutes_to_clean] minutes to clean the data
end_datetime_utc -= timedelta(minutes=config.minutes_to_clean)
time_difference = end_datetime_utc - start_datetime_utc
time_difference_in_minutes = time_difference / timedelta(minutes=1)
if time_difference_in_minutes < config.minutes_to_count:
error_string = "it's too early to calculate" + " for '" + meter['name'] + "'"
print(error_string)
return error_string
# trim end_datetime_utc
trimmed_end_datetime_utc = start_datetime_utc + timedelta(minutes=config.minutes_to_count)
while trimmed_end_datetime_utc <= end_datetime_utc:
trimmed_end_datetime_utc += timedelta(minutes=config.minutes_to_count)
end_datetime_utc = trimmed_end_datetime_utc - timedelta(minutes=config.minutes_to_count)
if end_datetime_utc <= start_datetime_utc:
error_string = "it's too early to calculate" + " for '" + meter['name'] + "'"
print(error_string)
return error_string
print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19]
+ "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19])
####################################################################################################################
# Step 2: Get raw data from historical database between start_datetime_utc and end_datetime_utc
####################################################################################################################
cnx_historical_db = None
cursor_historical_db = None
try:
cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
cursor_historical_db = cnx_historical_db.cursor()
except Exception as e:
error_string = "Error in step 1.2 of meter.worker " + str(e) + " for '" + meter['name'] + "'"
if cursor_historical_db:
cursor_historical_db.close()
if cnx_historical_db:
cnx_historical_db.close()
if cursor_energy_db:
cursor_energy_db.close()
if cnx_energy_db:
cnx_energy_db.close()
print(error_string)
return error_string
# query latest record before start_datetime_utc
energy_value_just_before_start = dict()
try:
query = (" SELECT utc_date_time, actual_value "
" FROM tbl_energy_value "
" WHERE point_id = %s AND utc_date_time < %s AND is_bad = FALSE "
" ORDER BY utc_date_time DESC "
" LIMIT 1 ")
cursor_historical_db.execute(query, (meter['point_id'], start_datetime_utc,))
row_energy_value_before_start = cursor_historical_db.fetchone()
if row_energy_value_before_start is not None and len(row_energy_value_before_start) > 0:
energy_value_just_before_start = {"utc_date_time": row_energy_value_before_start[0],
"actual_value": row_energy_value_before_start[1]}
except Exception as e:
error_string = "Error in step 2.2 of meter.worker " + str(e) + " for '" + meter['name'] + "'"
if cursor_historical_db:
cursor_historical_db.close()
if cnx_historical_db:
cnx_historical_db.close()
if cursor_energy_db:
cursor_energy_db.close()
if cnx_energy_db:
cnx_energy_db.close()
print(error_string)
return error_string
# query energy values to be normalized
try:
query = (" SELECT utc_date_time, actual_value "
" FROM tbl_energy_value "
" WHERE point_id = %s AND utc_date_time >= %s AND utc_date_time < %s AND is_bad = FALSE "
" ORDER BY utc_date_time ")
cursor_historical_db.execute(query, (meter['point_id'], start_datetime_utc, end_datetime_utc))
rows_energy_values = cursor_historical_db.fetchall()
except Exception as e:
error_string = "Error in step 2.3 of meter.worker " + str(e) + " for '" + meter['name'] + "'"
if cursor_energy_db:
cursor_energy_db.close()
if cnx_energy_db:
cnx_energy_db.close()
print(error_string)
return error_string
finally:
if cursor_historical_db:
cursor_historical_db.close()
if cnx_historical_db:
cnx_historical_db.close()
####################################################################################################################
# Step 3: Normalize energy values by minutes_to_count
####################################################################################################################
####################################################################################################################
# special test case 1 (disconnected)
# id point_id utc_date_time actual_value
# '878152', '3315', '2016-12-05 23:58:46', '38312088'
# '878183', '3315', '2016-12-05 23:59:48', '38312088'
# '878205', '3315', '2016-12-06 06:14:49', '38315900'
# '878281', '3315', '2016-12-06 06:15:50', '38315928'
# '878357', '3315', '2016-12-06 06:16:52', '38315928'
####################################################################################################################
####################################################################################################################
# special test case 2 (a new added used meter)
# id, point_id, utc_date_time, actual_value
# '19070111', '1734', '2017-03-27 02:36:07', '56842220.77297248'
# '19069943', '1734', '2017-03-27 02:35:04', '56842208.420127675'
# '19069775', '1734', '2017-03-27 02:34:01', '56842195.95270827'
# '19069608', '1734', '2017-03-27 02:32:58', '56842183.48610827'
# '19069439', '1734', '2017-03-27 02:31:53', '56842170.812365524'
# '19069270', '1734', '2017-03-27 02:30:48', '56842157.90797222'
# null, null, null, , null
####################################################################################################################
####################################################################################################################
# special test case 3 (hi_limit exceeded)
# id point_id utc_date_time actual_value
# '3230282', '3336', '2016-12-24 08:26:14', '999984.0625'
# '3230401', '3336', '2016-12-24 08:27:15', '999984.0625'
# '3230519', '3336', '2016-12-24 08:28:17', '999984.0625'
# '3230638', '3336', '2016-12-24 08:29:18', '20'
# '3230758', '3336', '2016-12-24 08:30:20', '20'
# '3230878', '3336', '2016-12-24 08:31:21', '20'
####################################################################################################################
####################################################################################################################
# test case 4 (recovered from bad zeroes)
# id point_id utc_date_time actual_value is_bad
# 300366736 1003344 2019-03-14 02:03:20 1103860.625
# 300366195 1003344 2019-03-14 02:02:19 1103845
# 300365654 1003344 2019-03-14 02:01:19 1103825.5
# 300365106 1003344 2019-03-14 02:00:18 1103804.25
# 300364562 1003344 2019-03-14 01:59:17 1103785.625
# 300364021 1003344 2019-03-14 01:58:17 1103770.875
# 300363478 1003344 2019-03-14 01:57:16 1103755.125
# 300362936 1003344 2019-03-14 01:56:16 1103739.375
# 300362393 1003344 2019-03-14 01:55:15 1103720.625
# 300361851 1003344 2019-03-14 01:54:15 1103698.125
# 300361305 1003344 2019-03-14 01:53:14 1103674.75
# 300360764 1003344 2019-03-14 01:52:14 1103649
# 300360221 1003344 2019-03-14 01:51:13 1103628.25
# 300359676 1003344 2019-03-14 01:50:13 1103608.625
# 300359133 1003344 2019-03-14 01:49:12 1103586.75
# 300358592 1003344 2019-03-14 01:48:12 1103564
# 300358050 1003344 2019-03-14 01:47:11 1103542
# 300357509 1003344 2019-03-14 01:46:11 1103520.625
# 300356966 1003344 2019-03-14 01:45:10 1103499.375
# 300356509 1003344 2019-03-14 01:44:10 1103478.25
# 300355964 1003344 2019-03-14 01:43:09 1103456.25
# 300355419 1003344 2019-03-14 01:42:09 1103435.5
# 300354878 1003344 2019-03-14 01:41:08 1103414.625
# 300354335 1003344 2019-03-14 01:40:08 1103391.875
# 300353793 1003344 2019-03-14 01:39:07 1103373
# 300353248 1003344 2019-03-14 01:38:07 1103349
# 300352705 1003344 2019-03-14 01:37:06 1103325.75
# 300352163 1003344 2019-03-14 01:36:06 0 1
# 300351621 1003344 2019-03-14 01:35:05 0 1
# 300351080 1003344 2019-03-14 01:34:05 0 1
# 300350532 1003344 2019-03-14 01:33:04 0 1
# 300349988 1003344 2019-03-14 01:32:04 0 1
# 300349446 1003344 2019-03-14 01:31:03 0 1
# 300348903 1003344 2019-03-14 01:30:02 0 1
# 300348359 1003344 2019-03-14 01:29:02 0 1
# 300347819 1003344 2019-03-14 01:28:01 0 1
# 300347277 1003344 2019-03-14 01:27:01 0 1
# 300346733 1003344 2019-03-14 01:26:00 0 1
# 300346191 1003344 2019-03-14 01:25:00 0 1
####################################################################################################################
normalized_values = list()
if rows_energy_values is None or len(rows_energy_values) == 0:
# NOTE: there isn't any value to be normalized
# that means the meter is offline or all values are bad
current_datetime_utc = start_datetime_utc
while current_datetime_utc < end_datetime_utc:
normalized_values.append({'start_datetime_utc': current_datetime_utc, 'actual_value': Decimal(0.0)})
current_datetime_utc += timedelta(minutes=config.minutes_to_count)
else:
maximum = Decimal(0.0)
if energy_value_just_before_start is not None and \
len(energy_value_just_before_start) > 0 and \
energy_value_just_before_start['actual_value'] > Decimal(0.0):
maximum = energy_value_just_before_start['actual_value']
current_datetime_utc = start_datetime_utc
while current_datetime_utc < end_datetime_utc:
initial_maximum = maximum
# get all energy values in current time slot
current_energy_values = list()
while len(rows_energy_values) > 0:
row_energy_value = rows_energy_values.pop(0)
energy_value_datetime = row_energy_value[0].replace(tzinfo=timezone.utc)
if energy_value_datetime < current_datetime_utc + timedelta(minutes=config.minutes_to_count):
current_energy_values.append(row_energy_value)
else:
rows_energy_values.insert(0, row_energy_value)
break
# get the energy increment one by one in current time slot
increment = Decimal(0.0)
# maximum should be equal to the maximum value of last time here
for index in range(len(current_energy_values)):
current_energy_value = current_energy_values[index]
if maximum < current_energy_value[1]:
increment += current_energy_value[1] - maximum
maximum = current_energy_value[1]
# omit huge initial value for a new meter
# or omit huge value for a recovered meter with zero values during failure
# NOTE: this method may cause the lose of energy consumption in this time slot
if initial_maximum <= Decimal(0.1):
increment = Decimal(0.0)
# check with hourly low limit
if increment < meter['hourly_low_limit']:
increment = Decimal(0.0)
# check with hourly high limit
# NOTE: this method may cause the lose of energy consumption in this time slot
if increment > meter['hourly_high_limit']:
increment = Decimal(0.0)
meta_data = {'start_datetime_utc': current_datetime_utc,
'actual_value': increment}
# append mete_data
normalized_values.append(meta_data)
current_datetime_utc += timedelta(minutes=config.minutes_to_count)
####################################################################################################################
# Step 4: Insert into energy database
####################################################################################################################
if len(normalized_values) > 0:
try:
add_values = (" INSERT INTO tbl_meter_hourly (meter_id, start_datetime_utc, actual_value) "
" VALUES ")
for meta_data in normalized_values:
add_values += " (" + str(meter['id']) + ","
add_values += "'" + meta_data['start_datetime_utc'].isoformat()[0:19] + "',"
add_values += str(meta_data['actual_value']) + "), "
# trim ", " at the end of string and then execute
cursor_energy_db.execute(add_values[:-2])
cnx_energy_db.commit()
except Exception as e:
error_string = "Error in step 4.1 of meter.worker " + str(e) + " for '" + meter['name'] + "'"
if cursor_energy_db:
cursor_energy_db.close()
if cnx_energy_db:
cnx_energy_db.close()
print(error_string)
return error_string
if cursor_energy_db:
cursor_energy_db.close()
if cnx_energy_db:
cnx_energy_db.close()
print("End of processing meter: " + "'" + meter['name'] + "'")
return None

View File

@ -0,0 +1,15 @@
[Unit]
Description=myems-normalization daemon
After=network.target
[Service]
User=root
Group=root
ExecStart=/usr/bin/python3 /myems-normalization/main.py
ExecReload=/bin/kill -s HUP $MAINPID
ExecStop=/bin/kill -s TERM $MAINPID
PrivateTmp=true
Restart=always
[Install]
WantedBy=multi-user.target

Binary file not shown.

View File

@ -0,0 +1,302 @@
import time
from datetime import datetime, timedelta
import mysql.connector
from openpyxl import load_workbook
import config
################################################################################################################
# PROCEDURES:
# STEP 1: get all 'new' offline meter files
# STEP 2: for each new files, iterate all rows and read cell's value and store data to energy data list
# STEP 3: insert or update energy data to table offline meter hourly in energy database
# STEP 4: update file status to 'done' or 'error'
################################################################################################################
def calculate_hourly(logger):
while True:
# outer loop to reconnect server if there is a connection error
################################################################################################################
# STEP 1: get all 'new' offline meter files
################################################################################################################
cnx = None
cursor = None
try:
cnx = mysql.connector.connect(**config.myems_historical_db)
cursor = cnx.cursor()
except Exception as e:
logger.error("Error in step 1.1 of offline meter.calculate_hourly " + str(e))
if cursor:
cursor.close()
if cnx:
cnx.close()
# sleep several minutes and continue the outer loop to reconnect the database
print("Could not connect the MyEMS Historical Database, and go to sleep 60 seconds...")
time.sleep(60)
continue
print("Connected to MyEMS Historical Database")
print("Getting all new offline meter files")
try:
query = (" SELECT id, file_name, file_object "
" FROM tbl_offline_meter_files "
" WHERE status = 'new' "
" ORDER BY id ")
cursor.execute(query, )
rows_files = cursor.fetchall()
except Exception as e:
logger.error("Error in step 1.2 of offline meter.calculate_hourly " + str(e))
time.sleep(60)
continue
finally:
if cursor:
cursor.close()
if cnx:
cnx.close()
excel_file_list = list()
if rows_files is not None and len(rows_files) > 0:
for row_file in rows_files:
excel_file_list.append({"id": row_file[0],
"name": row_file[1],
"file_object": row_file[2]})
else:
print("there isn't any new files found, and go to sleep 60 seconds...")
time.sleep(60)
continue
################################################################################################################
# STEP 2: for each new files, dump file object to local file and then load workbook from the local file
################################################################################################################
for excel_file in excel_file_list:
print("read data from each offline meter file" + excel_file['name'])
is_valid_file = True
fw = None
try:
fw = open("myems-normalization.blob", 'wb')
fw.write(excel_file['file_object'])
fw.close()
except Exception as e:
logger.error("Error in step 2.1 of offline meter.calculate_hourly " + str(e))
if fw:
fw.close()
# mark as invalid file
is_valid_file = False
fr = None
wb = None
try:
fr = open("myems-normalization.blob", 'rb')
wb = load_workbook(fr, data_only=True)
fr.close()
except Exception as e:
logger.error("Error in step 2.2 of offline meter.calculate_hourly " + str(e))
if fr:
fr.close()
# mark as invalid file
is_valid_file = False
energy_data_list = list()
# grab the active worksheet
if is_valid_file:
ws = wb.active
# get timezone offset in minutes, this value will be returned to client
timezone_offset = int(config.utc_offset[1:3]) * 60 + int(config.utc_offset[4:6])
if config.utc_offset[0] == '-':
timezone_offset = -timezone_offset
for row in ws.iter_rows(min_row=3, max_row=1024, min_col=1, max_col=34):
offline_meter_data = dict()
offline_meter_data['offline_meter_id'] = None
offline_meter_data['offline_meter_name'] = None
offline_meter_data['data'] = dict()
col_num = 0
for cell in row:
col_num += 1
print(cell.value)
if col_num == 1:
# get offline meter ID
if cell.value is not None:
offline_meter_data['offline_meter_id'] = cell.value
else:
break
elif col_num == 2:
# get offline meter name
if cell.value is None:
break
else:
offline_meter_data['offline_meter_name'] = cell.value
elif col_num > 3:
# get date of the cell
try:
offline_datetime = datetime(year=ws['A2'].value,
month=ws['B2'].value,
day=col_num - 3)
except ValueError:
# invalid date and go to next cell in this row until reach max_col
continue
offline_datetime_utc = offline_datetime - timedelta(minutes=timezone_offset)
if cell.value is None:
# if the cell is empty then stop at that day
break
else:
offline_meter_data['data'][offline_datetime_utc] = cell.value
if len(offline_meter_data['data']) > 0:
print("offline_meter_data:" + str(offline_meter_data))
energy_data_list.append(offline_meter_data)
############################################################################################################
# STEP 3: insert or update energy data to table offline meter hourly in energy database
############################################################################################################
print("to valid offline meter id in excel file...")
if len(energy_data_list) == 0:
print("Could not find any offline meters in the file...")
print("and go to process the next file...")
is_valid_file = False
else:
try:
cnx = mysql.connector.connect(**config.myems_system_db)
cursor = cnx.cursor()
except Exception as e:
logger.error("Error in step 3.1 of offlinemeter.calculate_hourly " + str(e))
if cursor:
cursor.close()
if cnx:
cnx.close()
time.sleep(60)
continue
try:
cursor.execute(" SELECT id, name "
" FROM tbl_offline_meters ")
rows_offline_meters = cursor.fetchall()
except Exception as e:
logger.error("Error in step 3.2 of offlinemeter.calculate_hourly " + str(e))
time.sleep(60)
continue
finally:
if cursor:
cursor.close()
if cnx:
cnx.close()
if rows_offline_meters is None or len(rows_offline_meters) == 0:
print("Could not find any offline meters in the MyEMS System Database...")
time.sleep(60)
continue
else:
offline_meter_id_set = set()
for row_offline_meter in rows_offline_meters:
# valid offline meter id in excel file
offline_meter_id_set.add(row_offline_meter[0])
for energy_data_item in energy_data_list:
if energy_data_item['offline_meter_id'] not in offline_meter_id_set:
is_valid_file = False
break
if is_valid_file:
####################################################################################################
# delete possibly exists offline meter hourly data in myems energy database,
# and then insert new offline meter hourly data
####################################################################################################
try:
cnx = mysql.connector.connect(**config.myems_energy_db)
cursor = cnx.cursor()
except Exception as e:
logger.error("Error in step 3.2 of offlinemeter.calculate_hourly " + str(e))
if cursor:
cursor.close()
if cnx:
cnx.close()
time.sleep(60)
continue
try:
for energy_data_item in energy_data_list:
offline_meter_id = energy_data_item['offline_meter_id']
print(energy_data_item['data'].items())
for k, v in energy_data_item['data'].items():
start_datetime_utc = k
end_datetime_utc = start_datetime_utc + timedelta(hours=24)
actual_value = v / (24 * 60 / config.minutes_to_count)
cursor.execute(" DELETE FROM tbl_offline_meter_hourly "
" WHERE offline_meter_id = %s "
" AND start_datetime_utc >= %s "
" AND start_datetime_utc < %s ",
(offline_meter_id,
start_datetime_utc.isoformat()[0:19],
end_datetime_utc.isoformat()[0:19]))
cnx.commit()
# todo: check with hourly low limit and hourly high limit
add_values = (" INSERT INTO tbl_offline_meter_hourly "
" (offline_meter_id, start_datetime_utc, actual_value) "
" VALUES ")
while start_datetime_utc < end_datetime_utc:
add_values += " (" + str(offline_meter_id) + ","
add_values += "'" + start_datetime_utc.isoformat()[0:19] + "',"
add_values += str(actual_value) + "), "
start_datetime_utc += timedelta(minutes=config.minutes_to_count)
print("add_values:" + add_values)
# trim ", " at the end of string and then execute
cursor.execute(add_values[:-2])
cnx.commit()
except Exception as e:
logger.error("Error in step 3.3 of offlinemeter.calculate_hourly " + str(e))
time.sleep(60)
continue
finally:
if cursor:
cursor.close()
if cnx:
cnx.close()
############################################################################################################
# STEP 4: update file status to 'done' or 'error'
############################################################################################################
print("to update offline meter file status to done...")
try:
cnx = mysql.connector.connect(**config.myems_historical_db)
cursor = cnx.cursor()
except Exception as e:
logger.error("Error in step 4.1 of offlinemeter.calculate_hourly " + str(e))
if cursor:
cursor.close()
if cnx:
cnx.close()
time.sleep(60)
continue
try:
update_row = (" UPDATE tbl_offline_meter_files "
" SET status = %s "
" WHERE id = %s ")
cursor.execute(update_row, ('done' if is_valid_file else 'error', excel_file['id'],))
cnx.commit()
except Exception as e:
logger.error("Error in step 4.2 of offlinemeter.calculate_hourly " + str(e))
time.sleep(60)
continue
finally:
if cursor:
cursor.close()
if cnx:
cnx.close()
# end of for excel_file in excel_file_list
print("go to sleep ...")
time.sleep(300)
print("wake from sleep, and go to work...")
# end of outer while

View File

@ -0,0 +1,475 @@
import time
from datetime import datetime, timedelta
import mysql.connector
from sympy import sympify
from multiprocessing import Pool
import random
import config
########################################################################################################################
# PROCEDURES:
# Step 1: Query all virtual meters
# Step 2: Create multiprocessing pool to call worker in parallel
########################################################################################################################
def calculate_hourly(logger):
while True:
# outer loop to reconnect server if there is a connection error
cnx_system_db = None
cursor_system_db = None
try:
cnx_system_db = mysql.connector.connect(**config.myems_system_db)
cursor_system_db = cnx_system_db.cursor()
except Exception as e:
logger.error("Error in step 0 of virtual_meter.calculate_hourly " + str(e))
if cursor_system_db:
cursor_system_db.close()
if cnx_system_db:
cnx_system_db.close()
# sleep and continue the outer loop to reconnect the database
time.sleep(60)
continue
print("Connected to MyEMS System Database")
virtual_meter_list = list()
try:
cursor_system_db.execute(" SELECT m.id, m.name, e.equation, e.id as expression_id "
" FROM tbl_virtual_meters m, tbl_expressions e "
" WHERE m.id = e.virtual_meter_id "
" ORDER BY m.id ")
rows_virtual_meters = cursor_system_db.fetchall()
if rows_virtual_meters is None or len(rows_virtual_meters) == 0:
# sleep several minutes and continue the outer loop to reconnect the database
time.sleep(60)
continue
for row in rows_virtual_meters:
meta_result = {"id": row[0], "name": row[1], "equation": row[2], "expression_id": row[3]}
virtual_meter_list.append(meta_result)
except Exception as e:
logger.error("Error in step 1 of virtual meter calculate hourly " + str(e))
# sleep and continue the outer loop to reconnect the database
time.sleep(60)
continue
finally:
if cursor_system_db:
cursor_system_db.close()
if cnx_system_db:
cnx_system_db.close()
# shuffle the virtual meter list for randomly calculating the meter hourly value
random.shuffle(virtual_meter_list)
print("Got all virtual meters in MyEMS System Database")
################################################################################################################
# Step 2: Create multiprocessing pool to call worker in parallel
################################################################################################################
p = Pool(processes=config.pool_size)
error_list = p.map(worker, virtual_meter_list)
p.close()
p.join()
for error in error_list:
if error is not None and len(error) > 0:
logger.error(error)
print("go to sleep ...")
time.sleep(60)
print("wake from sleep, and continue to work...")
########################################################################################################################
# Step 1: get start datetime and end datetime
# Step 2: parse the expression and get all meters, virtual meters, offline meters associated with the expression
# Step 3: query energy consumption values from table meter hourly, virtual meter hourly and offline meter hourly
# Step 4: evaluate the equation with variables values from previous step and save to table virtual meter hourly
# returns the error string for logging or returns None
########################################################################################################################
def worker(virtual_meter):
cnx_energy_db = None
cursor_energy_db = None
try:
cnx_energy_db = mysql.connector.connect(**config.myems_energy_db)
cursor_energy_db = cnx_energy_db.cursor()
except Exception as e:
if cursor_energy_db:
cursor_energy_db.close()
if cnx_energy_db:
cnx_energy_db.close()
return "Error in step 1.1 of virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'"
print("Start to process virtual meter: " + "'" + virtual_meter['name']+"'")
####################################################################################################################
# step 1: get start datetime and end datetime
# get latest timestamp from energy database in tbl_virtual_meter_hourly
####################################################################################################################
try:
query = (" SELECT MAX(start_datetime_utc) "
" FROM tbl_virtual_meter_hourly "
" WHERE virtual_meter_id = %s ")
cursor_energy_db.execute(query, (virtual_meter['id'],))
row_datetime = cursor_energy_db.fetchone()
except Exception as e:
if cursor_energy_db:
cursor_energy_db.close()
if cnx_energy_db:
cnx_energy_db.close()
return "Error in step 1.2 of virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'"
start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S')
start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None)
if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime):
# replace second and microsecond with 0
# note: do not replace minute in case of calculating in half hourly
start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None)
# start from the next time slot
start_datetime_utc += timedelta(minutes=config.minutes_to_count)
end_datetime_utc = datetime.utcnow().replace()
end_datetime_utc = end_datetime_utc.replace(second=0, microsecond=0, tzinfo=None)
time_difference = end_datetime_utc - start_datetime_utc
time_difference_in_minutes = time_difference / timedelta(minutes=1)
if time_difference_in_minutes < config.minutes_to_count:
if cursor_energy_db:
cursor_energy_db.close()
if cnx_energy_db:
cnx_energy_db.close()
return "it's too early to calculate" + " for '" + virtual_meter['name'] + "'"
# trim end_datetime_utc
trimmed_end_datetime_utc = start_datetime_utc + timedelta(minutes=config.minutes_to_count)
while trimmed_end_datetime_utc <= end_datetime_utc:
trimmed_end_datetime_utc += timedelta(minutes=config.minutes_to_count)
end_datetime_utc = trimmed_end_datetime_utc - timedelta(minutes=config.minutes_to_count)
if end_datetime_utc <= start_datetime_utc:
if cursor_energy_db:
cursor_energy_db.close()
if cnx_energy_db:
cnx_energy_db.close()
return "it's too early to calculate" + " for '" + virtual_meter['name'] + "'"
print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19]
+ "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19])
############################################################################################################
# Step 2: parse the expression and get all meters, virtual meters, and
# offline meters associated with the expression
############################################################################################################
cnx_factory_db = None
cursor_factory_db = None
try:
cnx_factory_db = mysql.connector.connect(**config.myems_system_db)
cursor_factory_db = cnx_factory_db.cursor()
except Exception as e:
if cursor_factory_db:
cursor_factory_db.close()
if cnx_factory_db:
cnx_factory_db.close()
if cursor_energy_db:
cursor_energy_db.close()
if cnx_energy_db:
cnx_energy_db.close()
return "Error in step 2.1 of virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'"
meter_list_in_expression = list()
virtual_meter_list_in_expression = list()
offline_meter_list_in_expression = list()
try:
########################################################################################################
# get all meters associated with the expression
########################################################################################################
cursor_factory_db.execute(" SELECT m.id as meter_id, v.name as variable_name "
" FROM tbl_meters m, tbl_variables v "
" WHERE m.id = v.meter_id "
" AND v.meter_type = 'meter' "
" AND v.expression_id = %s ",
(virtual_meter['expression_id'], ))
rows = cursor_factory_db.fetchall()
if rows is not None and len(rows) > 0:
for row in rows:
meter_list_in_expression.append({"meter_id": row[0], "variable_name": row[1].lower()})
########################################################################################################
# get all virtual meters associated with the expression
########################################################################################################
cursor_factory_db.execute(" SELECT m.id as virtual_meter_id, v.name as variable_name "
" FROM tbl_virtual_meters m, tbl_variables v "
" WHERE m.id = v.meter_id "
" AND v.meter_type = 'virtual_meter' "
" AND v.expression_id = %s ",
(virtual_meter['expression_id'],))
rows = cursor_factory_db.fetchall()
if rows is not None and len(rows) > 0:
for row in rows:
virtual_meter_list_in_expression.append({"virtual_meter_id": row[0],
"variable_name": row[1].lower()})
########################################################################################################
# get all offline meters associated with the expression
########################################################################################################
cursor_factory_db.execute(" SELECT m.id as offline_meter_id, v.name as variable_name "
" FROM tbl_offline_meters m, tbl_variables v "
" WHERE m.id = v.meter_id "
" AND v.meter_type = 'offline_meter' "
" AND v.expression_id = %s ",
(virtual_meter['expression_id'],))
rows = cursor_factory_db.fetchall()
if rows is not None and len(rows) > 0:
for row in rows:
offline_meter_list_in_expression.append({"offline_meter_id": row[0],
"variable_name": row[1].lower()})
except Exception as e:
if cursor_energy_db:
cursor_energy_db.close()
if cnx_energy_db:
cnx_energy_db.close()
return "Error in step 2.2 of virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'"
finally:
if cursor_factory_db:
cursor_factory_db.close()
if cnx_factory_db:
cnx_factory_db.close()
############################################################################################################
# Step 3: query energy consumption values from table meter hourly, virtual meter hourly
# and offline meter hourly
############################################################################################################
print("getting energy consumption values from myems_energy_db.tbl_meter_hourly...")
energy_meter_hourly = dict()
if meter_list_in_expression is not None and len(meter_list_in_expression) > 0:
try:
for meter_in_expression in meter_list_in_expression:
meter_id = str(meter_in_expression['meter_id'])
query = (" SELECT start_datetime_utc, actual_value "
" FROM tbl_meter_hourly "
" WHERE meter_id = %s AND start_datetime_utc >= %s AND start_datetime_utc < %s "
" ORDER BY start_datetime_utc ")
cursor_energy_db.execute(query, (meter_id, start_datetime_utc, end_datetime_utc, ))
rows_energy_values = cursor_energy_db.fetchall()
if rows_energy_values is None or len(rows_energy_values) == 0:
energy_meter_hourly[meter_id] = None
else:
energy_meter_hourly[meter_id] = dict()
for row_energy_value in rows_energy_values:
energy_meter_hourly[meter_id][row_energy_value[0]] = row_energy_value[1]
except Exception as e:
if cursor_energy_db:
cursor_energy_db.close()
if cnx_energy_db:
cnx_energy_db.close()
return "Error in step 3.2 virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'"
print("getting energy consumption values from myems_energy_db.tbl_virtual_meter_hourly...")
energy_virtual_meter_hourly = dict()
if virtual_meter_list_in_expression is not None and len(virtual_meter_list_in_expression) > 0:
try:
for virtual_meter_in_expression in virtual_meter_list_in_expression:
virtual_meter_id = str(virtual_meter_in_expression['virtual_meter_id'])
query = (" SELECT start_datetime_utc, actual_value "
" FROM tbl_virtual_meter_hourly "
" WHERE virtual_meter_id = %s "
" AND start_datetime_utc >= %s AND start_datetime_utc < %s "
" ORDER BY start_datetime_utc ")
cursor_energy_db.execute(query, (virtual_meter_id, start_datetime_utc, end_datetime_utc,))
rows_energy_values = cursor_energy_db.fetchall()
if rows_energy_values is None or len(rows_energy_values) == 0:
energy_virtual_meter_hourly[virtual_meter_id] = None
else:
energy_virtual_meter_hourly[virtual_meter_id] = dict()
for row_energy_value in rows_energy_values:
energy_virtual_meter_hourly[virtual_meter_id][row_energy_value[0]] = row_energy_value[1]
except Exception as e:
if cursor_energy_db:
cursor_energy_db.close()
if cnx_energy_db:
cnx_energy_db.close()
return "Error in step 3.3 virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'"
print("getting energy consumption values from myems_energy_db.tbl_offline_meter_hourly...")
energy_offline_meter_hourly = dict()
if offline_meter_list_in_expression is not None and len(offline_meter_list_in_expression) > 0:
try:
for offline_meter_in_expression in offline_meter_list_in_expression:
offline_meter_id = str(offline_meter_in_expression['offline_meter_id'])
query = (" SELECT start_datetime_utc, actual_value "
" FROM tbl_offline_meter_hourly "
" WHERE offline_meter_id = %s "
" AND start_datetime_utc >= %s AND start_datetime_utc < %s "
" ORDER BY start_datetime_utc ")
cursor_energy_db.execute(query, (offline_meter_id, start_datetime_utc, end_datetime_utc,))
rows_energy_values = cursor_energy_db.fetchall()
if rows_energy_values is None or len(rows_energy_values) == 0:
energy_offline_meter_hourly[offline_meter_id] = None
else:
energy_offline_meter_hourly[offline_meter_id] = dict()
for row_energy_value in rows_energy_values:
energy_offline_meter_hourly[offline_meter_id][row_energy_value[0]] = row_energy_value[1]
except Exception as e:
if cursor_energy_db:
cursor_energy_db.close()
if cnx_energy_db:
cnx_energy_db.close()
return "Error in step 3.4 virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'"
############################################################################################################
# Step 4: evaluate the equation with variables values from previous step
# and save to table virtual meter hourly
############################################################################################################
print("getting common time slot of energy values for all meters...")
common_start_datetime_utc = start_datetime_utc
common_end_datetime_utc = end_datetime_utc
if energy_meter_hourly is not None and len(energy_meter_hourly) > 0:
for meter_id, energy_hourly in energy_meter_hourly.items():
if energy_hourly is None or len(energy_hourly) == 0:
common_start_datetime_utc = None
common_end_datetime_utc = None
break
else:
if common_start_datetime_utc < min(energy_hourly.keys()):
common_start_datetime_utc = min(energy_hourly.keys())
if common_end_datetime_utc > max(energy_hourly.keys()):
common_end_datetime_utc = max(energy_hourly.keys())
print("getting common time slot of energy values for all virtual meters...")
if common_start_datetime_utc is not None and common_start_datetime_utc is not None:
if energy_virtual_meter_hourly is not None and len(energy_virtual_meter_hourly) > 0:
for meter_id, energy_hourly in energy_virtual_meter_hourly.items():
if energy_hourly is None or len(energy_hourly) == 0:
common_start_datetime_utc = None
common_end_datetime_utc = None
break
else:
if common_start_datetime_utc < min(energy_hourly.keys()):
common_start_datetime_utc = min(energy_hourly.keys())
if common_end_datetime_utc > max(energy_hourly.keys()):
common_end_datetime_utc = max(energy_hourly.keys())
print("getting common time slot of energy values for all offline meters...")
if common_start_datetime_utc is not None and common_start_datetime_utc is not None:
if energy_offline_meter_hourly is not None and len(energy_offline_meter_hourly) > 0:
for meter_id, energy_hourly in energy_offline_meter_hourly.items():
if energy_hourly is None or len(energy_hourly) == 0:
common_start_datetime_utc = None
common_end_datetime_utc = None
break
else:
if common_start_datetime_utc < min(energy_hourly.keys()):
common_start_datetime_utc = min(energy_hourly.keys())
if common_end_datetime_utc > max(energy_hourly.keys()):
common_end_datetime_utc = max(energy_hourly.keys())
print("evaluating the equation with SymPy...")
normalized_values = list()
############################################################################################################
# Converting Strings to SymPy Expressions
# The sympify function(thats sympify, not to be confused with simplify) can be used to
# convert strings into SymPy expressions.
############################################################################################################
try:
expr = sympify(virtual_meter['equation'].lower())
print("the expression to be evaluated: " + str(expr))
current_datetime_utc = common_start_datetime_utc
print("common_start_datetime_utc: " + str(common_start_datetime_utc))
print("common_end_datetime_utc: " + str(common_end_datetime_utc))
while common_start_datetime_utc is not None \
and common_end_datetime_utc is not None \
and current_datetime_utc <= common_end_datetime_utc:
meta_data = dict()
meta_data['start_datetime_utc'] = current_datetime_utc
####################################################################################################
# create a dictionary of Symbol: point pairs
####################################################################################################
subs = dict()
####################################################################################################
# Evaluating the expression at current_datetime_utc
####################################################################################################
if meter_list_in_expression is not None and len(meter_list_in_expression) > 0:
for meter_in_expression in meter_list_in_expression:
meter_id = str(meter_in_expression['meter_id'])
actual_value = energy_meter_hourly[meter_id].get(current_datetime_utc, 0.0)
subs[meter_in_expression['variable_name']] = actual_value
if virtual_meter_list_in_expression is not None and len(virtual_meter_list_in_expression) > 0:
for virtual_meter_in_expression in virtual_meter_list_in_expression:
virtual_meter_id = str(virtual_meter_in_expression['virtual_meter_id'])
actual_value = energy_virtual_meter_hourly[virtual_meter_id].get(current_datetime_utc, 0.0)
subs[virtual_meter_in_expression['variable_name']] = actual_value
if offline_meter_list_in_expression is not None and len(offline_meter_list_in_expression) > 0:
for offline_meter_in_expression in offline_meter_list_in_expression:
offline_meter_id = str(offline_meter_in_expression['offline_meter_id'])
actual_value = energy_offline_meter_hourly[offline_meter_id].get(current_datetime_utc, 0.0)
subs[offline_meter_in_expression['variable_name']] = actual_value
####################################################################################################
# To numerically evaluate an expression with a Symbol at a point,
# we might use subs followed by evalf,
# but it is more efficient and numerically stable to pass the substitution to evalf
# using the subs flag, which takes a dictionary of Symbol: point pairs.
####################################################################################################
meta_data['actual_value'] = expr.evalf(subs=subs)
normalized_values.append(meta_data)
current_datetime_utc += timedelta(minutes=config.minutes_to_count)
except Exception as e:
if cursor_energy_db:
cursor_energy_db.close()
if cnx_energy_db:
cnx_energy_db.close()
return "Error in step 4.1 virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'"
print("saving energy values to table virtual meter hourly...")
if len(normalized_values) > 0:
try:
add_values = (" INSERT INTO tbl_virtual_meter_hourly "
" (virtual_meter_id, start_datetime_utc, actual_value) "
" VALUES ")
for meta_data in normalized_values:
add_values += " (" + str(virtual_meter['id']) + ","
add_values += "'" + meta_data['start_datetime_utc'].isoformat()[0:19] + "',"
add_values += str(meta_data['actual_value']) + "), "
print("add_values:" + add_values)
# trim ", " at the end of string and then execute
cursor_energy_db.execute(add_values[:-2])
cnx_energy_db.commit()
except Exception as e:
if cursor_energy_db:
cursor_energy_db.close()
if cnx_energy_db:
cnx_energy_db.close()
return "Error in step 4.2 virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'"
if cursor_energy_db:
cursor_energy_db.close()
if cnx_energy_db:
cnx_energy_db.close()
return None