merged myems-normalization
parent
2265b440ce
commit
70467a3755
|
@ -46,7 +46,8 @@ Download and install bacpypes library
|
||||||
Install myems-bacnet service
|
Install myems-bacnet service
|
||||||
```
|
```
|
||||||
$ cd ~
|
$ cd ~
|
||||||
$ git clone https://github.com/myems.git
|
$ git clone https://github.com/myems/myems.git
|
||||||
|
$ cd myems
|
||||||
$ git checkout master (or the latest release tag)
|
$ git checkout master (or the latest release tag)
|
||||||
$ sudo cp -R ~/myems/myems-bacnet /myems-bacnet
|
$ sudo cp -R ~/myems/myems-bacnet /myems-bacnet
|
||||||
```
|
```
|
||||||
|
|
|
@ -33,14 +33,14 @@ Download and install MySQL Connector:
|
||||||
Install myems-cleaning service
|
Install myems-cleaning service
|
||||||
```bash
|
```bash
|
||||||
$ cd ~
|
$ cd ~
|
||||||
$ git clone https://github.com/myems.git
|
$ git clone https://github.com/myems/myems.git
|
||||||
|
$ cd myems
|
||||||
$ sudo git checkout master (or the latest release tag)
|
$ sudo git checkout master (or the latest release tag)
|
||||||
$ sudo cp -R ~/myems/myems-cleaning /myems-cleaning
|
$ sudo cp -R ~/myems/myems-cleaning /myems-cleaning
|
||||||
$ cd /myems-cleaning
|
|
||||||
```
|
```
|
||||||
Open config file and edit database configuration
|
Open config file and edit database configuration
|
||||||
```bash
|
```bash
|
||||||
$ sudo nano config.py
|
$ sudo nano /myems-cleaning/config.py
|
||||||
```
|
```
|
||||||
Setup systemd service:
|
Setup systemd service:
|
||||||
```bash
|
```bash
|
||||||
|
|
|
@ -43,9 +43,9 @@ Install myems-modbus-tcp service
|
||||||
```
|
```
|
||||||
$ cd ~
|
$ cd ~
|
||||||
$ git clone https://github.com/myems/myems.git
|
$ git clone https://github.com/myems/myems.git
|
||||||
|
$ cd myems
|
||||||
$ sudo git checkout master (or the latest release tag)
|
$ sudo git checkout master (or the latest release tag)
|
||||||
$ sudo cp -R ~/myems/myems-modbus-tcp /myems-modbus-tcp
|
$ sudo cp -R ~/myems/myems-modbus-tcp /myems-modbus-tcp
|
||||||
$ cd /myems-modbus-tcp
|
|
||||||
```
|
```
|
||||||
Eidt the config
|
Eidt the config
|
||||||
```
|
```
|
||||||
|
|
|
@ -0,0 +1,21 @@
|
||||||
|
MIT License
|
||||||
|
|
||||||
|
Copyright (c) 2021 MyEMS
|
||||||
|
|
||||||
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||||
|
of this software and associated documentation files (the "Software"), to deal
|
||||||
|
in the Software without restriction, including without limitation the rights
|
||||||
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||||
|
copies of the Software, and to permit persons to whom the Software is
|
||||||
|
furnished to do so, subject to the following conditions:
|
||||||
|
|
||||||
|
The above copyright notice and this permission notice shall be included in all
|
||||||
|
copies or substantial portions of the Software.
|
||||||
|
|
||||||
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||||
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||||
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||||
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||||
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||||
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||||
|
SOFTWARE.
|
|
@ -0,0 +1,107 @@
|
||||||
|
## MyEMS Normalization Service 数据规范化服务
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
### Introduction
|
||||||
|
|
||||||
|
This service is a component of MyEMS and it normalizes energy data in historical database.
|
||||||
|
|
||||||
|
[](https://app.codacy.com/gh/myems/myems-normalization?utm_source=github.com&utm_medium=referral&utm_content=myems/myems-normalization&utm_campaign=Badge_Grade)
|
||||||
|
[](https://scrutinizer-ci.com/g/myems/myems-normalization/?branch=master)
|
||||||
|
[](https://codeclimate.com/github/myems/myems-normalization/maintainability)
|
||||||
|
[](https://lgtm.com/projects/g/myems/myems-normalization/alerts/)
|
||||||
|
|
||||||
|
|
||||||
|
### Prerequisites
|
||||||
|
|
||||||
|
mysql.connector
|
||||||
|
|
||||||
|
sympy
|
||||||
|
|
||||||
|
openpyxl
|
||||||
|
|
||||||
|
|
||||||
|
### Installation
|
||||||
|
|
||||||
|
Download and install MySQL Connector:
|
||||||
|
```
|
||||||
|
$ cd ~/tools
|
||||||
|
$ wget https://dev.mysql.com/get/Downloads/Connector-Python/mysql-connector-python-8.0.20.tar.gz
|
||||||
|
$ tar xzf mysql-connector-python-8.0.20.tar.gz
|
||||||
|
$ cd ~/tools/mysql-connector-python-8.0.20
|
||||||
|
$ sudo python3 setup.py install
|
||||||
|
```
|
||||||
|
|
||||||
|
Download and install mpmath
|
||||||
|
```
|
||||||
|
$ cd ~/tools
|
||||||
|
$ git clone https://github.com/fredrik-johansson/mpmath.git
|
||||||
|
$ cd ~/tools/mpmath
|
||||||
|
$ sudo python3 setup.py install
|
||||||
|
```
|
||||||
|
|
||||||
|
Download and install SymPy
|
||||||
|
```
|
||||||
|
$ cd ~/tools
|
||||||
|
$ git clone https://github.com/sympy/sympy.git
|
||||||
|
$ cd ~/tools/sympy
|
||||||
|
$ sudo python3 setupegg.py develop
|
||||||
|
```
|
||||||
|
|
||||||
|
Download and install openpyxl
|
||||||
|
```
|
||||||
|
$ cd ~/tools
|
||||||
|
|
||||||
|
Get the latest version of et_xmlfile from https://bitbucket.org/openpyxl/et_xmlfile/downloads/
|
||||||
|
$ wget https://bitbucket.org/openpyxl/et_xmlfile/get/50973a6de49c.zip
|
||||||
|
$ 7z x 50973a6de49c.zip && mv openpyxl-et_xmlfile-50973a6de49c et_xmlfile
|
||||||
|
|
||||||
|
$ git clone https://github.com/phn/jdcal.git
|
||||||
|
|
||||||
|
Get the latest version of openpyxl from https://bitbucket.org/openpyxl/openpyxl/downloads/
|
||||||
|
$ wget https://bitbucket.org/openpyxl/openpyxl/get/8953233f5af2.zip
|
||||||
|
$ 7z x 8953233f5af2.zip && mv openpyxl-openpyxl-8953233f5af2 openpyxl
|
||||||
|
|
||||||
|
$ cd ~/tools/et_xmlfile
|
||||||
|
$ sudo python3 setup.py install
|
||||||
|
$ cd ~/tools/jdcal
|
||||||
|
$ sudo python3 setup.py install
|
||||||
|
$ cd ~/tools/openpyxl
|
||||||
|
$ sudo python3 setup.py install
|
||||||
|
```
|
||||||
|
|
||||||
|
Install myems-normalization service:
|
||||||
|
```
|
||||||
|
$ cd ~
|
||||||
|
$ git clone https://github.com/myems/myems.git
|
||||||
|
$ cd myems
|
||||||
|
$ sudo git checkout master (or the latest release tag)
|
||||||
|
$ sudo cp -r ~/myems/myems-normalization /myems-normalization
|
||||||
|
```
|
||||||
|
|
||||||
|
Edit config.py for your project
|
||||||
|
```
|
||||||
|
$ sudo nano /myems-normalization/config.py
|
||||||
|
```
|
||||||
|
|
||||||
|
Setup systemd service:
|
||||||
|
```
|
||||||
|
$ sudo cp myems-normalization.service /lib/systemd/system/
|
||||||
|
```
|
||||||
|
|
||||||
|
Enable the service:
|
||||||
|
```
|
||||||
|
$ sudo systemctl enable feed-normalization.service
|
||||||
|
```
|
||||||
|
|
||||||
|
Start the service:
|
||||||
|
```
|
||||||
|
$ sudo systemctl start feed-normalization.service
|
||||||
|
```
|
||||||
|
|
||||||
|
### References
|
||||||
|
|
||||||
|
1. https://myems.io
|
||||||
|
2. https://dev.mysql.com/doc/connector-python/en/
|
||||||
|
3. https://github.com/sympy/sympy
|
||||||
|
4. https://openpyxl.readthedocs.io
|
|
@ -0,0 +1,43 @@
|
||||||
|
myems_system_db = {
|
||||||
|
'user': 'root',
|
||||||
|
'password': '!MyEMS1',
|
||||||
|
'host': '127.0.0.1',
|
||||||
|
'database': 'myems_system_db',
|
||||||
|
'port': 3306,
|
||||||
|
}
|
||||||
|
|
||||||
|
myems_energy_db = {
|
||||||
|
'user': 'root',
|
||||||
|
'password': '!MyEMS1',
|
||||||
|
'host': '127.0.0.1',
|
||||||
|
'database': 'myems_energy_db',
|
||||||
|
'port': 3306,
|
||||||
|
}
|
||||||
|
|
||||||
|
myems_historical_db = {
|
||||||
|
'user': 'root',
|
||||||
|
'password': '!MyEMS1',
|
||||||
|
'host': '127.0.0.1',
|
||||||
|
'database': 'myems_historical_db',
|
||||||
|
'port': 3306,
|
||||||
|
}
|
||||||
|
|
||||||
|
# indicates in how many minutes to normalize energy consumption
|
||||||
|
# 30 for half hourly
|
||||||
|
# 60 for hourly
|
||||||
|
minutes_to_count = 60
|
||||||
|
|
||||||
|
# indicates within how many minutes to allow myems-cleaning service to clean the historical data
|
||||||
|
minutes_to_clean = 30
|
||||||
|
|
||||||
|
# indicates from when (in UTC timezone) to calculate if the energy data is empty or were cleared
|
||||||
|
# format string: '%Y-%m-%d %H:%M:%S'
|
||||||
|
start_datetime_utc = '2019-12-31 16:00:00'
|
||||||
|
|
||||||
|
# the number of worker processes in parallel for meter and virtual meter
|
||||||
|
# the pool size depends on the computing performance of the database server and the analysis server
|
||||||
|
pool_size = 5
|
||||||
|
|
||||||
|
# indicates the project's time zone offset from UTC
|
||||||
|
utc_offset = '+08:00'
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
import logging
|
||||||
|
from logging.handlers import RotatingFileHandler
|
||||||
|
from multiprocessing import Process
|
||||||
|
import meter
|
||||||
|
import offlinemeter
|
||||||
|
import virtualmeter
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""main"""
|
||||||
|
# create logger
|
||||||
|
logger = logging.getLogger('myems-normalization')
|
||||||
|
# specifies the lowest-severity log message a logger will handle,
|
||||||
|
# where debug is the lowest built-in severity level and critical is the highest built-in severity.
|
||||||
|
# For example, if the severity level is INFO, the logger will handle only INFO, WARNING, ERROR, and CRITICAL
|
||||||
|
# messages and will ignore DEBUG messages.
|
||||||
|
logger.setLevel(logging.ERROR)
|
||||||
|
# create file handler which logs messages
|
||||||
|
fh = RotatingFileHandler('myems-normalization.log', maxBytes=1024*1024, backupCount=1)
|
||||||
|
# create formatter and add it to the handlers
|
||||||
|
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
||||||
|
fh.setFormatter(formatter)
|
||||||
|
# add the handlers to logger
|
||||||
|
logger.addHandler(fh)
|
||||||
|
|
||||||
|
# calculate energy consumption in hourly period
|
||||||
|
Process(target=meter.calculate_hourly, args=(logger,)).start()
|
||||||
|
Process(target=virtualmeter.calculate_hourly, args=(logger,)).start()
|
||||||
|
Process(target=offlinemeter.calculate_hourly, args=(logger,)).start()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
main()
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,430 @@
|
||||||
|
import time
|
||||||
|
from datetime import datetime, timedelta, timezone
|
||||||
|
import mysql.connector
|
||||||
|
from multiprocessing import Pool
|
||||||
|
import random
|
||||||
|
from decimal import Decimal
|
||||||
|
import config
|
||||||
|
|
||||||
|
########################################################################################################################
|
||||||
|
# PROCEDURES:
|
||||||
|
# Step 1: Query all meters and associated energy value points
|
||||||
|
# Step 2: Create multiprocessing pool to call worker in parallel
|
||||||
|
########################################################################################################################
|
||||||
|
|
||||||
|
|
||||||
|
def calculate_hourly(logger):
|
||||||
|
|
||||||
|
while True:
|
||||||
|
################################################################################################################
|
||||||
|
# Step 1: Query all meters and associated energy value points
|
||||||
|
################################################################################################################
|
||||||
|
cnx_system_db = None
|
||||||
|
cursor_system_db = None
|
||||||
|
try:
|
||||||
|
cnx_system_db = mysql.connector.connect(**config.myems_system_db)
|
||||||
|
cursor_system_db = cnx_system_db.cursor()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error in step 1.1 of meter.calculate_hourly process " + str(e))
|
||||||
|
if cursor_system_db:
|
||||||
|
cursor_system_db.close()
|
||||||
|
if cnx_system_db:
|
||||||
|
cnx_system_db.close()
|
||||||
|
# sleep several minutes and continue the outer loop to reconnect the database
|
||||||
|
time.sleep(60)
|
||||||
|
continue
|
||||||
|
|
||||||
|
print("Connected to the MyEMS System Database")
|
||||||
|
|
||||||
|
try:
|
||||||
|
cursor_system_db.execute(" SELECT m.id, m.name, m.hourly_low_limit, m.hourly_high_limit, "
|
||||||
|
" p.id as point_id, p.units "
|
||||||
|
" FROM tbl_meters m, tbl_meters_points mp, tbl_points p "
|
||||||
|
" WHERE m.id = mp.meter_id "
|
||||||
|
" AND mp.point_id = p.id "
|
||||||
|
" AND p.object_type = 'ENERGY_VALUE'")
|
||||||
|
rows_meters = cursor_system_db.fetchall()
|
||||||
|
|
||||||
|
if rows_meters is None or len(rows_meters) == 0:
|
||||||
|
# sleep several minutes and continue the outer loop to reconnect the database
|
||||||
|
time.sleep(60)
|
||||||
|
continue
|
||||||
|
|
||||||
|
meter_list = list()
|
||||||
|
for row in rows_meters:
|
||||||
|
meta_result = {"id": row[0],
|
||||||
|
"name": row[1],
|
||||||
|
"hourly_low_limit": row[2],
|
||||||
|
"hourly_high_limit": row[3],
|
||||||
|
"point_id": row[4],
|
||||||
|
"units": row[5]}
|
||||||
|
|
||||||
|
meter_list.append(meta_result)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error in step 1.2 meter.calculate_hourly " + str(e))
|
||||||
|
# sleep several minutes and continue the outer loop to reconnect the database
|
||||||
|
time.sleep(60)
|
||||||
|
continue
|
||||||
|
finally:
|
||||||
|
if cursor_system_db:
|
||||||
|
cursor_system_db.close()
|
||||||
|
if cnx_system_db:
|
||||||
|
cnx_system_db.close()
|
||||||
|
|
||||||
|
# shuffle the meter list for randomly calculating the meter hourly value
|
||||||
|
random.shuffle(meter_list)
|
||||||
|
|
||||||
|
print("Got all meters in MyEMS System Database")
|
||||||
|
|
||||||
|
################################################################################################################
|
||||||
|
# Step 2: Create multiprocessing pool to call worker in parallel
|
||||||
|
################################################################################################################
|
||||||
|
p = Pool(processes=config.pool_size)
|
||||||
|
error_list = p.map(worker, meter_list)
|
||||||
|
p.close()
|
||||||
|
p.join()
|
||||||
|
|
||||||
|
for error in error_list:
|
||||||
|
if error is not None and len(error) > 0:
|
||||||
|
logger.error(error)
|
||||||
|
|
||||||
|
print("go to sleep ...")
|
||||||
|
time.sleep(60)
|
||||||
|
print("wake from sleep, and continue to work...")
|
||||||
|
# end of outer while
|
||||||
|
|
||||||
|
|
||||||
|
########################################################################################################################
|
||||||
|
# PROCEDURES:
|
||||||
|
# Step 1: Determine the start datetime and end datetime
|
||||||
|
# Step 2: Get raw data from historical database between start_datetime_utc and end datetime
|
||||||
|
# Step 3: Normalize energy values by minutes_to_count
|
||||||
|
# Step 4: Insert into energy database
|
||||||
|
#
|
||||||
|
# NOTE: returns None or the error string because that the logger object cannot be passed in as parameter
|
||||||
|
########################################################################################################################
|
||||||
|
|
||||||
|
def worker(meter):
|
||||||
|
print("Start to process meter: " + "'" + meter['name'] + "'")
|
||||||
|
####################################################################################################################
|
||||||
|
# Step 1: Determine the start datetime and end datetime
|
||||||
|
####################################################################################################################
|
||||||
|
cnx_energy_db = None
|
||||||
|
cursor_energy_db = None
|
||||||
|
try:
|
||||||
|
cnx_energy_db = mysql.connector.connect(**config.myems_energy_db)
|
||||||
|
cursor_energy_db = cnx_energy_db.cursor()
|
||||||
|
except Exception as e:
|
||||||
|
error_string = "Error in step 1.1 of meter.worker " + str(e) + " for '" + meter['name'] + "'"
|
||||||
|
if cursor_energy_db:
|
||||||
|
cursor_energy_db.close()
|
||||||
|
if cnx_energy_db:
|
||||||
|
cnx_energy_db.close()
|
||||||
|
print(error_string)
|
||||||
|
return error_string
|
||||||
|
|
||||||
|
# get the initial start datetime from config file in case there is no energy data
|
||||||
|
start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S')
|
||||||
|
start_datetime_utc = start_datetime_utc.replace(tzinfo=timezone.utc)
|
||||||
|
start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0)
|
||||||
|
|
||||||
|
try:
|
||||||
|
query = (" SELECT MAX(start_datetime_utc) "
|
||||||
|
" FROM tbl_meter_hourly "
|
||||||
|
" WHERE meter_id = %s ")
|
||||||
|
cursor_energy_db.execute(query, (meter['id'],))
|
||||||
|
row_datetime = cursor_energy_db.fetchone()
|
||||||
|
except Exception as e:
|
||||||
|
error_string = "Error in step 1.3 of meter.worker " + str(e) + " for '" + meter['name'] + "'"
|
||||||
|
if cursor_energy_db:
|
||||||
|
cursor_energy_db.close()
|
||||||
|
if cnx_energy_db:
|
||||||
|
cnx_energy_db.close()
|
||||||
|
print(error_string)
|
||||||
|
return error_string
|
||||||
|
|
||||||
|
if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime):
|
||||||
|
start_datetime_utc = row_datetime[0].replace(tzinfo=timezone.utc)
|
||||||
|
# replace second and microsecond with 0
|
||||||
|
# NOTE: DO NOT replace minute in case of calculating in half hourly
|
||||||
|
start_datetime_utc = start_datetime_utc.replace(second=0, microsecond=0)
|
||||||
|
# start from the next time slot
|
||||||
|
start_datetime_utc += timedelta(minutes=config.minutes_to_count)
|
||||||
|
|
||||||
|
end_datetime_utc = datetime.utcnow().replace(tzinfo=timezone.utc)
|
||||||
|
# we should allow myems-cleaning service to take at most [minutes_to_clean] minutes to clean the data
|
||||||
|
end_datetime_utc -= timedelta(minutes=config.minutes_to_clean)
|
||||||
|
|
||||||
|
time_difference = end_datetime_utc - start_datetime_utc
|
||||||
|
time_difference_in_minutes = time_difference / timedelta(minutes=1)
|
||||||
|
if time_difference_in_minutes < config.minutes_to_count:
|
||||||
|
error_string = "it's too early to calculate" + " for '" + meter['name'] + "'"
|
||||||
|
print(error_string)
|
||||||
|
return error_string
|
||||||
|
|
||||||
|
# trim end_datetime_utc
|
||||||
|
trimmed_end_datetime_utc = start_datetime_utc + timedelta(minutes=config.minutes_to_count)
|
||||||
|
while trimmed_end_datetime_utc <= end_datetime_utc:
|
||||||
|
trimmed_end_datetime_utc += timedelta(minutes=config.minutes_to_count)
|
||||||
|
|
||||||
|
end_datetime_utc = trimmed_end_datetime_utc - timedelta(minutes=config.minutes_to_count)
|
||||||
|
|
||||||
|
if end_datetime_utc <= start_datetime_utc:
|
||||||
|
error_string = "it's too early to calculate" + " for '" + meter['name'] + "'"
|
||||||
|
print(error_string)
|
||||||
|
return error_string
|
||||||
|
|
||||||
|
print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19]
|
||||||
|
+ "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19])
|
||||||
|
|
||||||
|
####################################################################################################################
|
||||||
|
# Step 2: Get raw data from historical database between start_datetime_utc and end_datetime_utc
|
||||||
|
####################################################################################################################
|
||||||
|
|
||||||
|
cnx_historical_db = None
|
||||||
|
cursor_historical_db = None
|
||||||
|
try:
|
||||||
|
cnx_historical_db = mysql.connector.connect(**config.myems_historical_db)
|
||||||
|
cursor_historical_db = cnx_historical_db.cursor()
|
||||||
|
except Exception as e:
|
||||||
|
error_string = "Error in step 1.2 of meter.worker " + str(e) + " for '" + meter['name'] + "'"
|
||||||
|
if cursor_historical_db:
|
||||||
|
cursor_historical_db.close()
|
||||||
|
if cnx_historical_db:
|
||||||
|
cnx_historical_db.close()
|
||||||
|
|
||||||
|
if cursor_energy_db:
|
||||||
|
cursor_energy_db.close()
|
||||||
|
if cnx_energy_db:
|
||||||
|
cnx_energy_db.close()
|
||||||
|
|
||||||
|
print(error_string)
|
||||||
|
return error_string
|
||||||
|
|
||||||
|
# query latest record before start_datetime_utc
|
||||||
|
energy_value_just_before_start = dict()
|
||||||
|
try:
|
||||||
|
query = (" SELECT utc_date_time, actual_value "
|
||||||
|
" FROM tbl_energy_value "
|
||||||
|
" WHERE point_id = %s AND utc_date_time < %s AND is_bad = FALSE "
|
||||||
|
" ORDER BY utc_date_time DESC "
|
||||||
|
" LIMIT 1 ")
|
||||||
|
cursor_historical_db.execute(query, (meter['point_id'], start_datetime_utc,))
|
||||||
|
row_energy_value_before_start = cursor_historical_db.fetchone()
|
||||||
|
|
||||||
|
if row_energy_value_before_start is not None and len(row_energy_value_before_start) > 0:
|
||||||
|
energy_value_just_before_start = {"utc_date_time": row_energy_value_before_start[0],
|
||||||
|
"actual_value": row_energy_value_before_start[1]}
|
||||||
|
except Exception as e:
|
||||||
|
error_string = "Error in step 2.2 of meter.worker " + str(e) + " for '" + meter['name'] + "'"
|
||||||
|
if cursor_historical_db:
|
||||||
|
cursor_historical_db.close()
|
||||||
|
if cnx_historical_db:
|
||||||
|
cnx_historical_db.close()
|
||||||
|
|
||||||
|
if cursor_energy_db:
|
||||||
|
cursor_energy_db.close()
|
||||||
|
if cnx_energy_db:
|
||||||
|
cnx_energy_db.close()
|
||||||
|
|
||||||
|
print(error_string)
|
||||||
|
return error_string
|
||||||
|
|
||||||
|
# query energy values to be normalized
|
||||||
|
try:
|
||||||
|
query = (" SELECT utc_date_time, actual_value "
|
||||||
|
" FROM tbl_energy_value "
|
||||||
|
" WHERE point_id = %s AND utc_date_time >= %s AND utc_date_time < %s AND is_bad = FALSE "
|
||||||
|
" ORDER BY utc_date_time ")
|
||||||
|
cursor_historical_db.execute(query, (meter['point_id'], start_datetime_utc, end_datetime_utc))
|
||||||
|
rows_energy_values = cursor_historical_db.fetchall()
|
||||||
|
except Exception as e:
|
||||||
|
error_string = "Error in step 2.3 of meter.worker " + str(e) + " for '" + meter['name'] + "'"
|
||||||
|
|
||||||
|
if cursor_energy_db:
|
||||||
|
cursor_energy_db.close()
|
||||||
|
if cnx_energy_db:
|
||||||
|
cnx_energy_db.close()
|
||||||
|
|
||||||
|
print(error_string)
|
||||||
|
return error_string
|
||||||
|
finally:
|
||||||
|
if cursor_historical_db:
|
||||||
|
cursor_historical_db.close()
|
||||||
|
if cnx_historical_db:
|
||||||
|
cnx_historical_db.close()
|
||||||
|
|
||||||
|
####################################################################################################################
|
||||||
|
# Step 3: Normalize energy values by minutes_to_count
|
||||||
|
####################################################################################################################
|
||||||
|
|
||||||
|
####################################################################################################################
|
||||||
|
# special test case 1 (disconnected)
|
||||||
|
# id point_id utc_date_time actual_value
|
||||||
|
# '878152', '3315', '2016-12-05 23:58:46', '38312088'
|
||||||
|
# '878183', '3315', '2016-12-05 23:59:48', '38312088'
|
||||||
|
# '878205', '3315', '2016-12-06 06:14:49', '38315900'
|
||||||
|
# '878281', '3315', '2016-12-06 06:15:50', '38315928'
|
||||||
|
# '878357', '3315', '2016-12-06 06:16:52', '38315928'
|
||||||
|
####################################################################################################################
|
||||||
|
|
||||||
|
####################################################################################################################
|
||||||
|
# special test case 2 (a new added used meter)
|
||||||
|
# id, point_id, utc_date_time, actual_value
|
||||||
|
# '19070111', '1734', '2017-03-27 02:36:07', '56842220.77297248'
|
||||||
|
# '19069943', '1734', '2017-03-27 02:35:04', '56842208.420127675'
|
||||||
|
# '19069775', '1734', '2017-03-27 02:34:01', '56842195.95270827'
|
||||||
|
# '19069608', '1734', '2017-03-27 02:32:58', '56842183.48610827'
|
||||||
|
# '19069439', '1734', '2017-03-27 02:31:53', '56842170.812365524'
|
||||||
|
# '19069270', '1734', '2017-03-27 02:30:48', '56842157.90797222'
|
||||||
|
# null, null, null, , null
|
||||||
|
|
||||||
|
####################################################################################################################
|
||||||
|
|
||||||
|
####################################################################################################################
|
||||||
|
# special test case 3 (hi_limit exceeded)
|
||||||
|
# id point_id utc_date_time actual_value
|
||||||
|
# '3230282', '3336', '2016-12-24 08:26:14', '999984.0625'
|
||||||
|
# '3230401', '3336', '2016-12-24 08:27:15', '999984.0625'
|
||||||
|
# '3230519', '3336', '2016-12-24 08:28:17', '999984.0625'
|
||||||
|
# '3230638', '3336', '2016-12-24 08:29:18', '20'
|
||||||
|
# '3230758', '3336', '2016-12-24 08:30:20', '20'
|
||||||
|
# '3230878', '3336', '2016-12-24 08:31:21', '20'
|
||||||
|
####################################################################################################################
|
||||||
|
|
||||||
|
####################################################################################################################
|
||||||
|
# test case 4 (recovered from bad zeroes)
|
||||||
|
# id point_id utc_date_time actual_value is_bad
|
||||||
|
# 300366736 1003344 2019-03-14 02:03:20 1103860.625
|
||||||
|
# 300366195 1003344 2019-03-14 02:02:19 1103845
|
||||||
|
# 300365654 1003344 2019-03-14 02:01:19 1103825.5
|
||||||
|
# 300365106 1003344 2019-03-14 02:00:18 1103804.25
|
||||||
|
# 300364562 1003344 2019-03-14 01:59:17 1103785.625
|
||||||
|
# 300364021 1003344 2019-03-14 01:58:17 1103770.875
|
||||||
|
# 300363478 1003344 2019-03-14 01:57:16 1103755.125
|
||||||
|
# 300362936 1003344 2019-03-14 01:56:16 1103739.375
|
||||||
|
# 300362393 1003344 2019-03-14 01:55:15 1103720.625
|
||||||
|
# 300361851 1003344 2019-03-14 01:54:15 1103698.125
|
||||||
|
# 300361305 1003344 2019-03-14 01:53:14 1103674.75
|
||||||
|
# 300360764 1003344 2019-03-14 01:52:14 1103649
|
||||||
|
# 300360221 1003344 2019-03-14 01:51:13 1103628.25
|
||||||
|
# 300359676 1003344 2019-03-14 01:50:13 1103608.625
|
||||||
|
# 300359133 1003344 2019-03-14 01:49:12 1103586.75
|
||||||
|
# 300358592 1003344 2019-03-14 01:48:12 1103564
|
||||||
|
# 300358050 1003344 2019-03-14 01:47:11 1103542
|
||||||
|
# 300357509 1003344 2019-03-14 01:46:11 1103520.625
|
||||||
|
# 300356966 1003344 2019-03-14 01:45:10 1103499.375
|
||||||
|
# 300356509 1003344 2019-03-14 01:44:10 1103478.25
|
||||||
|
# 300355964 1003344 2019-03-14 01:43:09 1103456.25
|
||||||
|
# 300355419 1003344 2019-03-14 01:42:09 1103435.5
|
||||||
|
# 300354878 1003344 2019-03-14 01:41:08 1103414.625
|
||||||
|
# 300354335 1003344 2019-03-14 01:40:08 1103391.875
|
||||||
|
# 300353793 1003344 2019-03-14 01:39:07 1103373
|
||||||
|
# 300353248 1003344 2019-03-14 01:38:07 1103349
|
||||||
|
# 300352705 1003344 2019-03-14 01:37:06 1103325.75
|
||||||
|
# 300352163 1003344 2019-03-14 01:36:06 0 1
|
||||||
|
# 300351621 1003344 2019-03-14 01:35:05 0 1
|
||||||
|
# 300351080 1003344 2019-03-14 01:34:05 0 1
|
||||||
|
# 300350532 1003344 2019-03-14 01:33:04 0 1
|
||||||
|
# 300349988 1003344 2019-03-14 01:32:04 0 1
|
||||||
|
# 300349446 1003344 2019-03-14 01:31:03 0 1
|
||||||
|
# 300348903 1003344 2019-03-14 01:30:02 0 1
|
||||||
|
# 300348359 1003344 2019-03-14 01:29:02 0 1
|
||||||
|
# 300347819 1003344 2019-03-14 01:28:01 0 1
|
||||||
|
# 300347277 1003344 2019-03-14 01:27:01 0 1
|
||||||
|
# 300346733 1003344 2019-03-14 01:26:00 0 1
|
||||||
|
# 300346191 1003344 2019-03-14 01:25:00 0 1
|
||||||
|
####################################################################################################################
|
||||||
|
|
||||||
|
normalized_values = list()
|
||||||
|
if rows_energy_values is None or len(rows_energy_values) == 0:
|
||||||
|
# NOTE: there isn't any value to be normalized
|
||||||
|
# that means the meter is offline or all values are bad
|
||||||
|
current_datetime_utc = start_datetime_utc
|
||||||
|
while current_datetime_utc < end_datetime_utc:
|
||||||
|
normalized_values.append({'start_datetime_utc': current_datetime_utc, 'actual_value': Decimal(0.0)})
|
||||||
|
current_datetime_utc += timedelta(minutes=config.minutes_to_count)
|
||||||
|
else:
|
||||||
|
maximum = Decimal(0.0)
|
||||||
|
if energy_value_just_before_start is not None and \
|
||||||
|
len(energy_value_just_before_start) > 0 and \
|
||||||
|
energy_value_just_before_start['actual_value'] > Decimal(0.0):
|
||||||
|
maximum = energy_value_just_before_start['actual_value']
|
||||||
|
|
||||||
|
current_datetime_utc = start_datetime_utc
|
||||||
|
while current_datetime_utc < end_datetime_utc:
|
||||||
|
initial_maximum = maximum
|
||||||
|
# get all energy values in current time slot
|
||||||
|
current_energy_values = list()
|
||||||
|
while len(rows_energy_values) > 0:
|
||||||
|
row_energy_value = rows_energy_values.pop(0)
|
||||||
|
energy_value_datetime = row_energy_value[0].replace(tzinfo=timezone.utc)
|
||||||
|
if energy_value_datetime < current_datetime_utc + timedelta(minutes=config.minutes_to_count):
|
||||||
|
current_energy_values.append(row_energy_value)
|
||||||
|
else:
|
||||||
|
rows_energy_values.insert(0, row_energy_value)
|
||||||
|
break
|
||||||
|
|
||||||
|
# get the energy increment one by one in current time slot
|
||||||
|
increment = Decimal(0.0)
|
||||||
|
# maximum should be equal to the maximum value of last time here
|
||||||
|
for index in range(len(current_energy_values)):
|
||||||
|
current_energy_value = current_energy_values[index]
|
||||||
|
if maximum < current_energy_value[1]:
|
||||||
|
increment += current_energy_value[1] - maximum
|
||||||
|
maximum = current_energy_value[1]
|
||||||
|
|
||||||
|
# omit huge initial value for a new meter
|
||||||
|
# or omit huge value for a recovered meter with zero values during failure
|
||||||
|
# NOTE: this method may cause the lose of energy consumption in this time slot
|
||||||
|
if initial_maximum <= Decimal(0.1):
|
||||||
|
increment = Decimal(0.0)
|
||||||
|
|
||||||
|
# check with hourly low limit
|
||||||
|
if increment < meter['hourly_low_limit']:
|
||||||
|
increment = Decimal(0.0)
|
||||||
|
|
||||||
|
# check with hourly high limit
|
||||||
|
# NOTE: this method may cause the lose of energy consumption in this time slot
|
||||||
|
if increment > meter['hourly_high_limit']:
|
||||||
|
increment = Decimal(0.0)
|
||||||
|
|
||||||
|
meta_data = {'start_datetime_utc': current_datetime_utc,
|
||||||
|
'actual_value': increment}
|
||||||
|
# append mete_data
|
||||||
|
normalized_values.append(meta_data)
|
||||||
|
current_datetime_utc += timedelta(minutes=config.minutes_to_count)
|
||||||
|
|
||||||
|
####################################################################################################################
|
||||||
|
# Step 4: Insert into energy database
|
||||||
|
####################################################################################################################
|
||||||
|
if len(normalized_values) > 0:
|
||||||
|
try:
|
||||||
|
add_values = (" INSERT INTO tbl_meter_hourly (meter_id, start_datetime_utc, actual_value) "
|
||||||
|
" VALUES ")
|
||||||
|
|
||||||
|
for meta_data in normalized_values:
|
||||||
|
add_values += " (" + str(meter['id']) + ","
|
||||||
|
add_values += "'" + meta_data['start_datetime_utc'].isoformat()[0:19] + "',"
|
||||||
|
add_values += str(meta_data['actual_value']) + "), "
|
||||||
|
# trim ", " at the end of string and then execute
|
||||||
|
cursor_energy_db.execute(add_values[:-2])
|
||||||
|
cnx_energy_db.commit()
|
||||||
|
except Exception as e:
|
||||||
|
error_string = "Error in step 4.1 of meter.worker " + str(e) + " for '" + meter['name'] + "'"
|
||||||
|
if cursor_energy_db:
|
||||||
|
cursor_energy_db.close()
|
||||||
|
if cnx_energy_db:
|
||||||
|
cnx_energy_db.close()
|
||||||
|
|
||||||
|
print(error_string)
|
||||||
|
return error_string
|
||||||
|
|
||||||
|
if cursor_energy_db:
|
||||||
|
cursor_energy_db.close()
|
||||||
|
if cnx_energy_db:
|
||||||
|
cnx_energy_db.close()
|
||||||
|
|
||||||
|
print("End of processing meter: " + "'" + meter['name'] + "'")
|
||||||
|
return None
|
|
@ -0,0 +1,15 @@
|
||||||
|
[Unit]
|
||||||
|
Description=myems-normalization daemon
|
||||||
|
After=network.target
|
||||||
|
|
||||||
|
[Service]
|
||||||
|
User=root
|
||||||
|
Group=root
|
||||||
|
ExecStart=/usr/bin/python3 /myems-normalization/main.py
|
||||||
|
ExecReload=/bin/kill -s HUP $MAINPID
|
||||||
|
ExecStop=/bin/kill -s TERM $MAINPID
|
||||||
|
PrivateTmp=true
|
||||||
|
Restart=always
|
||||||
|
|
||||||
|
[Install]
|
||||||
|
WantedBy=multi-user.target
|
Binary file not shown.
|
@ -0,0 +1,302 @@
|
||||||
|
import time
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
import mysql.connector
|
||||||
|
from openpyxl import load_workbook
|
||||||
|
import config
|
||||||
|
|
||||||
|
################################################################################################################
|
||||||
|
# PROCEDURES:
|
||||||
|
# STEP 1: get all 'new' offline meter files
|
||||||
|
# STEP 2: for each new files, iterate all rows and read cell's value and store data to energy data list
|
||||||
|
# STEP 3: insert or update energy data to table offline meter hourly in energy database
|
||||||
|
# STEP 4: update file status to 'done' or 'error'
|
||||||
|
################################################################################################################
|
||||||
|
|
||||||
|
|
||||||
|
def calculate_hourly(logger):
|
||||||
|
while True:
|
||||||
|
# outer loop to reconnect server if there is a connection error
|
||||||
|
################################################################################################################
|
||||||
|
# STEP 1: get all 'new' offline meter files
|
||||||
|
################################################################################################################
|
||||||
|
cnx = None
|
||||||
|
cursor = None
|
||||||
|
try:
|
||||||
|
cnx = mysql.connector.connect(**config.myems_historical_db)
|
||||||
|
cursor = cnx.cursor()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error in step 1.1 of offline meter.calculate_hourly " + str(e))
|
||||||
|
if cursor:
|
||||||
|
cursor.close()
|
||||||
|
if cnx:
|
||||||
|
cnx.close()
|
||||||
|
# sleep several minutes and continue the outer loop to reconnect the database
|
||||||
|
print("Could not connect the MyEMS Historical Database, and go to sleep 60 seconds...")
|
||||||
|
time.sleep(60)
|
||||||
|
continue
|
||||||
|
|
||||||
|
print("Connected to MyEMS Historical Database")
|
||||||
|
|
||||||
|
print("Getting all new offline meter files")
|
||||||
|
try:
|
||||||
|
query = (" SELECT id, file_name, file_object "
|
||||||
|
" FROM tbl_offline_meter_files "
|
||||||
|
" WHERE status = 'new' "
|
||||||
|
" ORDER BY id ")
|
||||||
|
|
||||||
|
cursor.execute(query, )
|
||||||
|
rows_files = cursor.fetchall()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error in step 1.2 of offline meter.calculate_hourly " + str(e))
|
||||||
|
time.sleep(60)
|
||||||
|
continue
|
||||||
|
finally:
|
||||||
|
if cursor:
|
||||||
|
cursor.close()
|
||||||
|
if cnx:
|
||||||
|
cnx.close()
|
||||||
|
|
||||||
|
excel_file_list = list()
|
||||||
|
if rows_files is not None and len(rows_files) > 0:
|
||||||
|
for row_file in rows_files:
|
||||||
|
excel_file_list.append({"id": row_file[0],
|
||||||
|
"name": row_file[1],
|
||||||
|
"file_object": row_file[2]})
|
||||||
|
else:
|
||||||
|
print("there isn't any new files found, and go to sleep 60 seconds...")
|
||||||
|
time.sleep(60)
|
||||||
|
continue
|
||||||
|
|
||||||
|
################################################################################################################
|
||||||
|
# STEP 2: for each new files, dump file object to local file and then load workbook from the local file
|
||||||
|
################################################################################################################
|
||||||
|
for excel_file in excel_file_list:
|
||||||
|
print("read data from each offline meter file" + excel_file['name'])
|
||||||
|
is_valid_file = True
|
||||||
|
fw = None
|
||||||
|
try:
|
||||||
|
fw = open("myems-normalization.blob", 'wb')
|
||||||
|
fw.write(excel_file['file_object'])
|
||||||
|
fw.close()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error in step 2.1 of offline meter.calculate_hourly " + str(e))
|
||||||
|
if fw:
|
||||||
|
fw.close()
|
||||||
|
# mark as invalid file
|
||||||
|
is_valid_file = False
|
||||||
|
|
||||||
|
fr = None
|
||||||
|
wb = None
|
||||||
|
try:
|
||||||
|
fr = open("myems-normalization.blob", 'rb')
|
||||||
|
wb = load_workbook(fr, data_only=True)
|
||||||
|
fr.close()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error in step 2.2 of offline meter.calculate_hourly " + str(e))
|
||||||
|
if fr:
|
||||||
|
fr.close()
|
||||||
|
# mark as invalid file
|
||||||
|
is_valid_file = False
|
||||||
|
|
||||||
|
energy_data_list = list()
|
||||||
|
# grab the active worksheet
|
||||||
|
|
||||||
|
if is_valid_file:
|
||||||
|
ws = wb.active
|
||||||
|
|
||||||
|
# get timezone offset in minutes, this value will be returned to client
|
||||||
|
timezone_offset = int(config.utc_offset[1:3]) * 60 + int(config.utc_offset[4:6])
|
||||||
|
if config.utc_offset[0] == '-':
|
||||||
|
timezone_offset = -timezone_offset
|
||||||
|
|
||||||
|
for row in ws.iter_rows(min_row=3, max_row=1024, min_col=1, max_col=34):
|
||||||
|
offline_meter_data = dict()
|
||||||
|
offline_meter_data['offline_meter_id'] = None
|
||||||
|
offline_meter_data['offline_meter_name'] = None
|
||||||
|
offline_meter_data['data'] = dict()
|
||||||
|
col_num = 0
|
||||||
|
|
||||||
|
for cell in row:
|
||||||
|
col_num += 1
|
||||||
|
print(cell.value)
|
||||||
|
if col_num == 1:
|
||||||
|
# get offline meter ID
|
||||||
|
if cell.value is not None:
|
||||||
|
offline_meter_data['offline_meter_id'] = cell.value
|
||||||
|
else:
|
||||||
|
break
|
||||||
|
elif col_num == 2:
|
||||||
|
# get offline meter name
|
||||||
|
if cell.value is None:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
offline_meter_data['offline_meter_name'] = cell.value
|
||||||
|
elif col_num > 3:
|
||||||
|
# get date of the cell
|
||||||
|
try:
|
||||||
|
offline_datetime = datetime(year=ws['A2'].value,
|
||||||
|
month=ws['B2'].value,
|
||||||
|
day=col_num - 3)
|
||||||
|
except ValueError:
|
||||||
|
# invalid date and go to next cell in this row until reach max_col
|
||||||
|
continue
|
||||||
|
|
||||||
|
offline_datetime_utc = offline_datetime - timedelta(minutes=timezone_offset)
|
||||||
|
|
||||||
|
if cell.value is None:
|
||||||
|
# if the cell is empty then stop at that day
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
offline_meter_data['data'][offline_datetime_utc] = cell.value
|
||||||
|
|
||||||
|
if len(offline_meter_data['data']) > 0:
|
||||||
|
print("offline_meter_data:" + str(offline_meter_data))
|
||||||
|
energy_data_list.append(offline_meter_data)
|
||||||
|
|
||||||
|
############################################################################################################
|
||||||
|
# STEP 3: insert or update energy data to table offline meter hourly in energy database
|
||||||
|
############################################################################################################
|
||||||
|
print("to valid offline meter id in excel file...")
|
||||||
|
if len(energy_data_list) == 0:
|
||||||
|
print("Could not find any offline meters in the file...")
|
||||||
|
print("and go to process the next file...")
|
||||||
|
is_valid_file = False
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
cnx = mysql.connector.connect(**config.myems_system_db)
|
||||||
|
cursor = cnx.cursor()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error in step 3.1 of offlinemeter.calculate_hourly " + str(e))
|
||||||
|
if cursor:
|
||||||
|
cursor.close()
|
||||||
|
if cnx:
|
||||||
|
cnx.close()
|
||||||
|
time.sleep(60)
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
cursor.execute(" SELECT id, name "
|
||||||
|
" FROM tbl_offline_meters ")
|
||||||
|
rows_offline_meters = cursor.fetchall()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error in step 3.2 of offlinemeter.calculate_hourly " + str(e))
|
||||||
|
time.sleep(60)
|
||||||
|
continue
|
||||||
|
finally:
|
||||||
|
if cursor:
|
||||||
|
cursor.close()
|
||||||
|
if cnx:
|
||||||
|
cnx.close()
|
||||||
|
|
||||||
|
if rows_offline_meters is None or len(rows_offline_meters) == 0:
|
||||||
|
print("Could not find any offline meters in the MyEMS System Database...")
|
||||||
|
time.sleep(60)
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
offline_meter_id_set = set()
|
||||||
|
for row_offline_meter in rows_offline_meters:
|
||||||
|
# valid offline meter id in excel file
|
||||||
|
offline_meter_id_set.add(row_offline_meter[0])
|
||||||
|
|
||||||
|
for energy_data_item in energy_data_list:
|
||||||
|
if energy_data_item['offline_meter_id'] not in offline_meter_id_set:
|
||||||
|
is_valid_file = False
|
||||||
|
break
|
||||||
|
|
||||||
|
if is_valid_file:
|
||||||
|
####################################################################################################
|
||||||
|
# delete possibly exists offline meter hourly data in myems energy database,
|
||||||
|
# and then insert new offline meter hourly data
|
||||||
|
####################################################################################################
|
||||||
|
try:
|
||||||
|
cnx = mysql.connector.connect(**config.myems_energy_db)
|
||||||
|
cursor = cnx.cursor()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error in step 3.2 of offlinemeter.calculate_hourly " + str(e))
|
||||||
|
if cursor:
|
||||||
|
cursor.close()
|
||||||
|
if cnx:
|
||||||
|
cnx.close()
|
||||||
|
time.sleep(60)
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
for energy_data_item in energy_data_list:
|
||||||
|
offline_meter_id = energy_data_item['offline_meter_id']
|
||||||
|
print(energy_data_item['data'].items())
|
||||||
|
for k, v in energy_data_item['data'].items():
|
||||||
|
start_datetime_utc = k
|
||||||
|
end_datetime_utc = start_datetime_utc + timedelta(hours=24)
|
||||||
|
actual_value = v / (24 * 60 / config.minutes_to_count)
|
||||||
|
cursor.execute(" DELETE FROM tbl_offline_meter_hourly "
|
||||||
|
" WHERE offline_meter_id = %s "
|
||||||
|
" AND start_datetime_utc >= %s "
|
||||||
|
" AND start_datetime_utc < %s ",
|
||||||
|
(offline_meter_id,
|
||||||
|
start_datetime_utc.isoformat()[0:19],
|
||||||
|
end_datetime_utc.isoformat()[0:19]))
|
||||||
|
cnx.commit()
|
||||||
|
# todo: check with hourly low limit and hourly high limit
|
||||||
|
add_values = (" INSERT INTO tbl_offline_meter_hourly "
|
||||||
|
" (offline_meter_id, start_datetime_utc, actual_value) "
|
||||||
|
" VALUES ")
|
||||||
|
|
||||||
|
while start_datetime_utc < end_datetime_utc:
|
||||||
|
add_values += " (" + str(offline_meter_id) + ","
|
||||||
|
add_values += "'" + start_datetime_utc.isoformat()[0:19] + "',"
|
||||||
|
add_values += str(actual_value) + "), "
|
||||||
|
start_datetime_utc += timedelta(minutes=config.minutes_to_count)
|
||||||
|
|
||||||
|
print("add_values:" + add_values)
|
||||||
|
# trim ", " at the end of string and then execute
|
||||||
|
cursor.execute(add_values[:-2])
|
||||||
|
cnx.commit()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error in step 3.3 of offlinemeter.calculate_hourly " + str(e))
|
||||||
|
time.sleep(60)
|
||||||
|
continue
|
||||||
|
finally:
|
||||||
|
if cursor:
|
||||||
|
cursor.close()
|
||||||
|
if cnx:
|
||||||
|
cnx.close()
|
||||||
|
|
||||||
|
############################################################################################################
|
||||||
|
# STEP 4: update file status to 'done' or 'error'
|
||||||
|
############################################################################################################
|
||||||
|
print("to update offline meter file status to done...")
|
||||||
|
try:
|
||||||
|
cnx = mysql.connector.connect(**config.myems_historical_db)
|
||||||
|
cursor = cnx.cursor()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error in step 4.1 of offlinemeter.calculate_hourly " + str(e))
|
||||||
|
if cursor:
|
||||||
|
cursor.close()
|
||||||
|
if cnx:
|
||||||
|
cnx.close()
|
||||||
|
time.sleep(60)
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
update_row = (" UPDATE tbl_offline_meter_files "
|
||||||
|
" SET status = %s "
|
||||||
|
" WHERE id = %s ")
|
||||||
|
cursor.execute(update_row, ('done' if is_valid_file else 'error', excel_file['id'],))
|
||||||
|
cnx.commit()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error in step 4.2 of offlinemeter.calculate_hourly " + str(e))
|
||||||
|
time.sleep(60)
|
||||||
|
continue
|
||||||
|
finally:
|
||||||
|
if cursor:
|
||||||
|
cursor.close()
|
||||||
|
if cnx:
|
||||||
|
cnx.close()
|
||||||
|
|
||||||
|
# end of for excel_file in excel_file_list
|
||||||
|
|
||||||
|
print("go to sleep ...")
|
||||||
|
time.sleep(300)
|
||||||
|
print("wake from sleep, and go to work...")
|
||||||
|
# end of outer while
|
||||||
|
|
|
@ -0,0 +1,475 @@
|
||||||
|
import time
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
import mysql.connector
|
||||||
|
from sympy import sympify
|
||||||
|
from multiprocessing import Pool
|
||||||
|
import random
|
||||||
|
import config
|
||||||
|
|
||||||
|
|
||||||
|
########################################################################################################################
|
||||||
|
# PROCEDURES:
|
||||||
|
# Step 1: Query all virtual meters
|
||||||
|
# Step 2: Create multiprocessing pool to call worker in parallel
|
||||||
|
########################################################################################################################
|
||||||
|
|
||||||
|
def calculate_hourly(logger):
|
||||||
|
|
||||||
|
while True:
|
||||||
|
# outer loop to reconnect server if there is a connection error
|
||||||
|
cnx_system_db = None
|
||||||
|
cursor_system_db = None
|
||||||
|
try:
|
||||||
|
cnx_system_db = mysql.connector.connect(**config.myems_system_db)
|
||||||
|
cursor_system_db = cnx_system_db.cursor()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error in step 0 of virtual_meter.calculate_hourly " + str(e))
|
||||||
|
if cursor_system_db:
|
||||||
|
cursor_system_db.close()
|
||||||
|
if cnx_system_db:
|
||||||
|
cnx_system_db.close()
|
||||||
|
# sleep and continue the outer loop to reconnect the database
|
||||||
|
time.sleep(60)
|
||||||
|
continue
|
||||||
|
|
||||||
|
print("Connected to MyEMS System Database")
|
||||||
|
|
||||||
|
virtual_meter_list = list()
|
||||||
|
try:
|
||||||
|
cursor_system_db.execute(" SELECT m.id, m.name, e.equation, e.id as expression_id "
|
||||||
|
" FROM tbl_virtual_meters m, tbl_expressions e "
|
||||||
|
" WHERE m.id = e.virtual_meter_id "
|
||||||
|
" ORDER BY m.id ")
|
||||||
|
rows_virtual_meters = cursor_system_db.fetchall()
|
||||||
|
|
||||||
|
if rows_virtual_meters is None or len(rows_virtual_meters) == 0:
|
||||||
|
# sleep several minutes and continue the outer loop to reconnect the database
|
||||||
|
time.sleep(60)
|
||||||
|
continue
|
||||||
|
|
||||||
|
for row in rows_virtual_meters:
|
||||||
|
meta_result = {"id": row[0], "name": row[1], "equation": row[2], "expression_id": row[3]}
|
||||||
|
virtual_meter_list.append(meta_result)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error("Error in step 1 of virtual meter calculate hourly " + str(e))
|
||||||
|
# sleep and continue the outer loop to reconnect the database
|
||||||
|
time.sleep(60)
|
||||||
|
continue
|
||||||
|
finally:
|
||||||
|
if cursor_system_db:
|
||||||
|
cursor_system_db.close()
|
||||||
|
if cnx_system_db:
|
||||||
|
cnx_system_db.close()
|
||||||
|
|
||||||
|
# shuffle the virtual meter list for randomly calculating the meter hourly value
|
||||||
|
random.shuffle(virtual_meter_list)
|
||||||
|
|
||||||
|
print("Got all virtual meters in MyEMS System Database")
|
||||||
|
################################################################################################################
|
||||||
|
# Step 2: Create multiprocessing pool to call worker in parallel
|
||||||
|
################################################################################################################
|
||||||
|
p = Pool(processes=config.pool_size)
|
||||||
|
error_list = p.map(worker, virtual_meter_list)
|
||||||
|
p.close()
|
||||||
|
p.join()
|
||||||
|
|
||||||
|
for error in error_list:
|
||||||
|
if error is not None and len(error) > 0:
|
||||||
|
logger.error(error)
|
||||||
|
|
||||||
|
print("go to sleep ...")
|
||||||
|
time.sleep(60)
|
||||||
|
print("wake from sleep, and continue to work...")
|
||||||
|
|
||||||
|
|
||||||
|
########################################################################################################################
|
||||||
|
# Step 1: get start datetime and end datetime
|
||||||
|
# Step 2: parse the expression and get all meters, virtual meters, offline meters associated with the expression
|
||||||
|
# Step 3: query energy consumption values from table meter hourly, virtual meter hourly and offline meter hourly
|
||||||
|
# Step 4: evaluate the equation with variables values from previous step and save to table virtual meter hourly
|
||||||
|
# returns the error string for logging or returns None
|
||||||
|
########################################################################################################################
|
||||||
|
|
||||||
|
def worker(virtual_meter):
|
||||||
|
cnx_energy_db = None
|
||||||
|
cursor_energy_db = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
cnx_energy_db = mysql.connector.connect(**config.myems_energy_db)
|
||||||
|
cursor_energy_db = cnx_energy_db.cursor()
|
||||||
|
except Exception as e:
|
||||||
|
if cursor_energy_db:
|
||||||
|
cursor_energy_db.close()
|
||||||
|
if cnx_energy_db:
|
||||||
|
cnx_energy_db.close()
|
||||||
|
return "Error in step 1.1 of virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'"
|
||||||
|
|
||||||
|
print("Start to process virtual meter: " + "'" + virtual_meter['name']+"'")
|
||||||
|
|
||||||
|
####################################################################################################################
|
||||||
|
# step 1: get start datetime and end datetime
|
||||||
|
# get latest timestamp from energy database in tbl_virtual_meter_hourly
|
||||||
|
####################################################################################################################
|
||||||
|
|
||||||
|
try:
|
||||||
|
query = (" SELECT MAX(start_datetime_utc) "
|
||||||
|
" FROM tbl_virtual_meter_hourly "
|
||||||
|
" WHERE virtual_meter_id = %s ")
|
||||||
|
cursor_energy_db.execute(query, (virtual_meter['id'],))
|
||||||
|
row_datetime = cursor_energy_db.fetchone()
|
||||||
|
except Exception as e:
|
||||||
|
if cursor_energy_db:
|
||||||
|
cursor_energy_db.close()
|
||||||
|
if cnx_energy_db:
|
||||||
|
cnx_energy_db.close()
|
||||||
|
return "Error in step 1.2 of virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'"
|
||||||
|
|
||||||
|
start_datetime_utc = datetime.strptime(config.start_datetime_utc, '%Y-%m-%d %H:%M:%S')
|
||||||
|
start_datetime_utc = start_datetime_utc.replace(minute=0, second=0, microsecond=0, tzinfo=None)
|
||||||
|
|
||||||
|
if row_datetime is not None and len(row_datetime) > 0 and isinstance(row_datetime[0], datetime):
|
||||||
|
# replace second and microsecond with 0
|
||||||
|
# note: do not replace minute in case of calculating in half hourly
|
||||||
|
start_datetime_utc = row_datetime[0].replace(second=0, microsecond=0, tzinfo=None)
|
||||||
|
# start from the next time slot
|
||||||
|
start_datetime_utc += timedelta(minutes=config.minutes_to_count)
|
||||||
|
|
||||||
|
end_datetime_utc = datetime.utcnow().replace()
|
||||||
|
end_datetime_utc = end_datetime_utc.replace(second=0, microsecond=0, tzinfo=None)
|
||||||
|
|
||||||
|
time_difference = end_datetime_utc - start_datetime_utc
|
||||||
|
time_difference_in_minutes = time_difference / timedelta(minutes=1)
|
||||||
|
if time_difference_in_minutes < config.minutes_to_count:
|
||||||
|
if cursor_energy_db:
|
||||||
|
cursor_energy_db.close()
|
||||||
|
if cnx_energy_db:
|
||||||
|
cnx_energy_db.close()
|
||||||
|
return "it's too early to calculate" + " for '" + virtual_meter['name'] + "'"
|
||||||
|
|
||||||
|
# trim end_datetime_utc
|
||||||
|
trimmed_end_datetime_utc = start_datetime_utc + timedelta(minutes=config.minutes_to_count)
|
||||||
|
while trimmed_end_datetime_utc <= end_datetime_utc:
|
||||||
|
trimmed_end_datetime_utc += timedelta(minutes=config.minutes_to_count)
|
||||||
|
|
||||||
|
end_datetime_utc = trimmed_end_datetime_utc - timedelta(minutes=config.minutes_to_count)
|
||||||
|
|
||||||
|
if end_datetime_utc <= start_datetime_utc:
|
||||||
|
if cursor_energy_db:
|
||||||
|
cursor_energy_db.close()
|
||||||
|
if cnx_energy_db:
|
||||||
|
cnx_energy_db.close()
|
||||||
|
return "it's too early to calculate" + " for '" + virtual_meter['name'] + "'"
|
||||||
|
|
||||||
|
print("start_datetime_utc: " + start_datetime_utc.isoformat()[0:19]
|
||||||
|
+ "end_datetime_utc: " + end_datetime_utc.isoformat()[0:19])
|
||||||
|
|
||||||
|
############################################################################################################
|
||||||
|
# Step 2: parse the expression and get all meters, virtual meters, and
|
||||||
|
# offline meters associated with the expression
|
||||||
|
############################################################################################################
|
||||||
|
cnx_factory_db = None
|
||||||
|
cursor_factory_db = None
|
||||||
|
try:
|
||||||
|
cnx_factory_db = mysql.connector.connect(**config.myems_system_db)
|
||||||
|
cursor_factory_db = cnx_factory_db.cursor()
|
||||||
|
except Exception as e:
|
||||||
|
if cursor_factory_db:
|
||||||
|
cursor_factory_db.close()
|
||||||
|
if cnx_factory_db:
|
||||||
|
cnx_factory_db.close()
|
||||||
|
if cursor_energy_db:
|
||||||
|
cursor_energy_db.close()
|
||||||
|
if cnx_energy_db:
|
||||||
|
cnx_energy_db.close()
|
||||||
|
return "Error in step 2.1 of virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'"
|
||||||
|
|
||||||
|
meter_list_in_expression = list()
|
||||||
|
virtual_meter_list_in_expression = list()
|
||||||
|
offline_meter_list_in_expression = list()
|
||||||
|
try:
|
||||||
|
########################################################################################################
|
||||||
|
# get all meters associated with the expression
|
||||||
|
########################################################################################################
|
||||||
|
|
||||||
|
cursor_factory_db.execute(" SELECT m.id as meter_id, v.name as variable_name "
|
||||||
|
" FROM tbl_meters m, tbl_variables v "
|
||||||
|
" WHERE m.id = v.meter_id "
|
||||||
|
" AND v.meter_type = 'meter' "
|
||||||
|
" AND v.expression_id = %s ",
|
||||||
|
(virtual_meter['expression_id'], ))
|
||||||
|
rows = cursor_factory_db.fetchall()
|
||||||
|
if rows is not None and len(rows) > 0:
|
||||||
|
for row in rows:
|
||||||
|
meter_list_in_expression.append({"meter_id": row[0], "variable_name": row[1].lower()})
|
||||||
|
|
||||||
|
########################################################################################################
|
||||||
|
# get all virtual meters associated with the expression
|
||||||
|
########################################################################################################
|
||||||
|
|
||||||
|
cursor_factory_db.execute(" SELECT m.id as virtual_meter_id, v.name as variable_name "
|
||||||
|
" FROM tbl_virtual_meters m, tbl_variables v "
|
||||||
|
" WHERE m.id = v.meter_id "
|
||||||
|
" AND v.meter_type = 'virtual_meter' "
|
||||||
|
" AND v.expression_id = %s ",
|
||||||
|
(virtual_meter['expression_id'],))
|
||||||
|
rows = cursor_factory_db.fetchall()
|
||||||
|
if rows is not None and len(rows) > 0:
|
||||||
|
for row in rows:
|
||||||
|
virtual_meter_list_in_expression.append({"virtual_meter_id": row[0],
|
||||||
|
"variable_name": row[1].lower()})
|
||||||
|
|
||||||
|
########################################################################################################
|
||||||
|
# get all offline meters associated with the expression
|
||||||
|
########################################################################################################
|
||||||
|
|
||||||
|
cursor_factory_db.execute(" SELECT m.id as offline_meter_id, v.name as variable_name "
|
||||||
|
" FROM tbl_offline_meters m, tbl_variables v "
|
||||||
|
" WHERE m.id = v.meter_id "
|
||||||
|
" AND v.meter_type = 'offline_meter' "
|
||||||
|
" AND v.expression_id = %s ",
|
||||||
|
(virtual_meter['expression_id'],))
|
||||||
|
rows = cursor_factory_db.fetchall()
|
||||||
|
if rows is not None and len(rows) > 0:
|
||||||
|
for row in rows:
|
||||||
|
offline_meter_list_in_expression.append({"offline_meter_id": row[0],
|
||||||
|
"variable_name": row[1].lower()})
|
||||||
|
except Exception as e:
|
||||||
|
if cursor_energy_db:
|
||||||
|
cursor_energy_db.close()
|
||||||
|
if cnx_energy_db:
|
||||||
|
cnx_energy_db.close()
|
||||||
|
return "Error in step 2.2 of virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'"
|
||||||
|
finally:
|
||||||
|
if cursor_factory_db:
|
||||||
|
cursor_factory_db.close()
|
||||||
|
if cnx_factory_db:
|
||||||
|
cnx_factory_db.close()
|
||||||
|
|
||||||
|
############################################################################################################
|
||||||
|
# Step 3: query energy consumption values from table meter hourly, virtual meter hourly
|
||||||
|
# and offline meter hourly
|
||||||
|
############################################################################################################
|
||||||
|
|
||||||
|
print("getting energy consumption values from myems_energy_db.tbl_meter_hourly...")
|
||||||
|
energy_meter_hourly = dict()
|
||||||
|
if meter_list_in_expression is not None and len(meter_list_in_expression) > 0:
|
||||||
|
try:
|
||||||
|
for meter_in_expression in meter_list_in_expression:
|
||||||
|
meter_id = str(meter_in_expression['meter_id'])
|
||||||
|
query = (" SELECT start_datetime_utc, actual_value "
|
||||||
|
" FROM tbl_meter_hourly "
|
||||||
|
" WHERE meter_id = %s AND start_datetime_utc >= %s AND start_datetime_utc < %s "
|
||||||
|
" ORDER BY start_datetime_utc ")
|
||||||
|
cursor_energy_db.execute(query, (meter_id, start_datetime_utc, end_datetime_utc, ))
|
||||||
|
rows_energy_values = cursor_energy_db.fetchall()
|
||||||
|
if rows_energy_values is None or len(rows_energy_values) == 0:
|
||||||
|
energy_meter_hourly[meter_id] = None
|
||||||
|
else:
|
||||||
|
energy_meter_hourly[meter_id] = dict()
|
||||||
|
for row_energy_value in rows_energy_values:
|
||||||
|
energy_meter_hourly[meter_id][row_energy_value[0]] = row_energy_value[1]
|
||||||
|
except Exception as e:
|
||||||
|
if cursor_energy_db:
|
||||||
|
cursor_energy_db.close()
|
||||||
|
if cnx_energy_db:
|
||||||
|
cnx_energy_db.close()
|
||||||
|
return "Error in step 3.2 virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'"
|
||||||
|
|
||||||
|
print("getting energy consumption values from myems_energy_db.tbl_virtual_meter_hourly...")
|
||||||
|
energy_virtual_meter_hourly = dict()
|
||||||
|
if virtual_meter_list_in_expression is not None and len(virtual_meter_list_in_expression) > 0:
|
||||||
|
try:
|
||||||
|
for virtual_meter_in_expression in virtual_meter_list_in_expression:
|
||||||
|
virtual_meter_id = str(virtual_meter_in_expression['virtual_meter_id'])
|
||||||
|
query = (" SELECT start_datetime_utc, actual_value "
|
||||||
|
" FROM tbl_virtual_meter_hourly "
|
||||||
|
" WHERE virtual_meter_id = %s "
|
||||||
|
" AND start_datetime_utc >= %s AND start_datetime_utc < %s "
|
||||||
|
" ORDER BY start_datetime_utc ")
|
||||||
|
cursor_energy_db.execute(query, (virtual_meter_id, start_datetime_utc, end_datetime_utc,))
|
||||||
|
rows_energy_values = cursor_energy_db.fetchall()
|
||||||
|
if rows_energy_values is None or len(rows_energy_values) == 0:
|
||||||
|
energy_virtual_meter_hourly[virtual_meter_id] = None
|
||||||
|
else:
|
||||||
|
energy_virtual_meter_hourly[virtual_meter_id] = dict()
|
||||||
|
for row_energy_value in rows_energy_values:
|
||||||
|
energy_virtual_meter_hourly[virtual_meter_id][row_energy_value[0]] = row_energy_value[1]
|
||||||
|
except Exception as e:
|
||||||
|
if cursor_energy_db:
|
||||||
|
cursor_energy_db.close()
|
||||||
|
if cnx_energy_db:
|
||||||
|
cnx_energy_db.close()
|
||||||
|
return "Error in step 3.3 virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'"
|
||||||
|
|
||||||
|
print("getting energy consumption values from myems_energy_db.tbl_offline_meter_hourly...")
|
||||||
|
energy_offline_meter_hourly = dict()
|
||||||
|
if offline_meter_list_in_expression is not None and len(offline_meter_list_in_expression) > 0:
|
||||||
|
try:
|
||||||
|
for offline_meter_in_expression in offline_meter_list_in_expression:
|
||||||
|
offline_meter_id = str(offline_meter_in_expression['offline_meter_id'])
|
||||||
|
query = (" SELECT start_datetime_utc, actual_value "
|
||||||
|
" FROM tbl_offline_meter_hourly "
|
||||||
|
" WHERE offline_meter_id = %s "
|
||||||
|
" AND start_datetime_utc >= %s AND start_datetime_utc < %s "
|
||||||
|
" ORDER BY start_datetime_utc ")
|
||||||
|
cursor_energy_db.execute(query, (offline_meter_id, start_datetime_utc, end_datetime_utc,))
|
||||||
|
rows_energy_values = cursor_energy_db.fetchall()
|
||||||
|
if rows_energy_values is None or len(rows_energy_values) == 0:
|
||||||
|
energy_offline_meter_hourly[offline_meter_id] = None
|
||||||
|
else:
|
||||||
|
energy_offline_meter_hourly[offline_meter_id] = dict()
|
||||||
|
for row_energy_value in rows_energy_values:
|
||||||
|
energy_offline_meter_hourly[offline_meter_id][row_energy_value[0]] = row_energy_value[1]
|
||||||
|
except Exception as e:
|
||||||
|
if cursor_energy_db:
|
||||||
|
cursor_energy_db.close()
|
||||||
|
if cnx_energy_db:
|
||||||
|
cnx_energy_db.close()
|
||||||
|
return "Error in step 3.4 virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'"
|
||||||
|
|
||||||
|
############################################################################################################
|
||||||
|
# Step 4: evaluate the equation with variables values from previous step
|
||||||
|
# and save to table virtual meter hourly
|
||||||
|
############################################################################################################
|
||||||
|
|
||||||
|
print("getting common time slot of energy values for all meters...")
|
||||||
|
common_start_datetime_utc = start_datetime_utc
|
||||||
|
common_end_datetime_utc = end_datetime_utc
|
||||||
|
if energy_meter_hourly is not None and len(energy_meter_hourly) > 0:
|
||||||
|
for meter_id, energy_hourly in energy_meter_hourly.items():
|
||||||
|
if energy_hourly is None or len(energy_hourly) == 0:
|
||||||
|
common_start_datetime_utc = None
|
||||||
|
common_end_datetime_utc = None
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
if common_start_datetime_utc < min(energy_hourly.keys()):
|
||||||
|
common_start_datetime_utc = min(energy_hourly.keys())
|
||||||
|
if common_end_datetime_utc > max(energy_hourly.keys()):
|
||||||
|
common_end_datetime_utc = max(energy_hourly.keys())
|
||||||
|
|
||||||
|
print("getting common time slot of energy values for all virtual meters...")
|
||||||
|
if common_start_datetime_utc is not None and common_start_datetime_utc is not None:
|
||||||
|
if energy_virtual_meter_hourly is not None and len(energy_virtual_meter_hourly) > 0:
|
||||||
|
for meter_id, energy_hourly in energy_virtual_meter_hourly.items():
|
||||||
|
if energy_hourly is None or len(energy_hourly) == 0:
|
||||||
|
common_start_datetime_utc = None
|
||||||
|
common_end_datetime_utc = None
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
if common_start_datetime_utc < min(energy_hourly.keys()):
|
||||||
|
common_start_datetime_utc = min(energy_hourly.keys())
|
||||||
|
if common_end_datetime_utc > max(energy_hourly.keys()):
|
||||||
|
common_end_datetime_utc = max(energy_hourly.keys())
|
||||||
|
|
||||||
|
print("getting common time slot of energy values for all offline meters...")
|
||||||
|
if common_start_datetime_utc is not None and common_start_datetime_utc is not None:
|
||||||
|
if energy_offline_meter_hourly is not None and len(energy_offline_meter_hourly) > 0:
|
||||||
|
for meter_id, energy_hourly in energy_offline_meter_hourly.items():
|
||||||
|
if energy_hourly is None or len(energy_hourly) == 0:
|
||||||
|
common_start_datetime_utc = None
|
||||||
|
common_end_datetime_utc = None
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
if common_start_datetime_utc < min(energy_hourly.keys()):
|
||||||
|
common_start_datetime_utc = min(energy_hourly.keys())
|
||||||
|
if common_end_datetime_utc > max(energy_hourly.keys()):
|
||||||
|
common_end_datetime_utc = max(energy_hourly.keys())
|
||||||
|
|
||||||
|
print("evaluating the equation with SymPy...")
|
||||||
|
normalized_values = list()
|
||||||
|
|
||||||
|
############################################################################################################
|
||||||
|
# Converting Strings to SymPy Expressions
|
||||||
|
# The sympify function(that’s sympify, not to be confused with simplify) can be used to
|
||||||
|
# convert strings into SymPy expressions.
|
||||||
|
############################################################################################################
|
||||||
|
try:
|
||||||
|
expr = sympify(virtual_meter['equation'].lower())
|
||||||
|
print("the expression to be evaluated: " + str(expr))
|
||||||
|
current_datetime_utc = common_start_datetime_utc
|
||||||
|
print("common_start_datetime_utc: " + str(common_start_datetime_utc))
|
||||||
|
print("common_end_datetime_utc: " + str(common_end_datetime_utc))
|
||||||
|
while common_start_datetime_utc is not None \
|
||||||
|
and common_end_datetime_utc is not None \
|
||||||
|
and current_datetime_utc <= common_end_datetime_utc:
|
||||||
|
meta_data = dict()
|
||||||
|
meta_data['start_datetime_utc'] = current_datetime_utc
|
||||||
|
|
||||||
|
####################################################################################################
|
||||||
|
# create a dictionary of Symbol: point pairs
|
||||||
|
####################################################################################################
|
||||||
|
|
||||||
|
subs = dict()
|
||||||
|
|
||||||
|
####################################################################################################
|
||||||
|
# Evaluating the expression at current_datetime_utc
|
||||||
|
####################################################################################################
|
||||||
|
|
||||||
|
if meter_list_in_expression is not None and len(meter_list_in_expression) > 0:
|
||||||
|
for meter_in_expression in meter_list_in_expression:
|
||||||
|
meter_id = str(meter_in_expression['meter_id'])
|
||||||
|
actual_value = energy_meter_hourly[meter_id].get(current_datetime_utc, 0.0)
|
||||||
|
subs[meter_in_expression['variable_name']] = actual_value
|
||||||
|
|
||||||
|
if virtual_meter_list_in_expression is not None and len(virtual_meter_list_in_expression) > 0:
|
||||||
|
for virtual_meter_in_expression in virtual_meter_list_in_expression:
|
||||||
|
virtual_meter_id = str(virtual_meter_in_expression['virtual_meter_id'])
|
||||||
|
actual_value = energy_virtual_meter_hourly[virtual_meter_id].get(current_datetime_utc, 0.0)
|
||||||
|
subs[virtual_meter_in_expression['variable_name']] = actual_value
|
||||||
|
|
||||||
|
if offline_meter_list_in_expression is not None and len(offline_meter_list_in_expression) > 0:
|
||||||
|
for offline_meter_in_expression in offline_meter_list_in_expression:
|
||||||
|
offline_meter_id = str(offline_meter_in_expression['offline_meter_id'])
|
||||||
|
actual_value = energy_offline_meter_hourly[offline_meter_id].get(current_datetime_utc, 0.0)
|
||||||
|
subs[offline_meter_in_expression['variable_name']] = actual_value
|
||||||
|
|
||||||
|
####################################################################################################
|
||||||
|
# To numerically evaluate an expression with a Symbol at a point,
|
||||||
|
# we might use subs followed by evalf,
|
||||||
|
# but it is more efficient and numerically stable to pass the substitution to evalf
|
||||||
|
# using the subs flag, which takes a dictionary of Symbol: point pairs.
|
||||||
|
####################################################################################################
|
||||||
|
|
||||||
|
meta_data['actual_value'] = expr.evalf(subs=subs)
|
||||||
|
|
||||||
|
normalized_values.append(meta_data)
|
||||||
|
|
||||||
|
current_datetime_utc += timedelta(minutes=config.minutes_to_count)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
if cursor_energy_db:
|
||||||
|
cursor_energy_db.close()
|
||||||
|
if cnx_energy_db:
|
||||||
|
cnx_energy_db.close()
|
||||||
|
return "Error in step 4.1 virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'"
|
||||||
|
|
||||||
|
print("saving energy values to table virtual meter hourly...")
|
||||||
|
|
||||||
|
if len(normalized_values) > 0:
|
||||||
|
try:
|
||||||
|
add_values = (" INSERT INTO tbl_virtual_meter_hourly "
|
||||||
|
" (virtual_meter_id, start_datetime_utc, actual_value) "
|
||||||
|
" VALUES ")
|
||||||
|
|
||||||
|
for meta_data in normalized_values:
|
||||||
|
add_values += " (" + str(virtual_meter['id']) + ","
|
||||||
|
add_values += "'" + meta_data['start_datetime_utc'].isoformat()[0:19] + "',"
|
||||||
|
add_values += str(meta_data['actual_value']) + "), "
|
||||||
|
print("add_values:" + add_values)
|
||||||
|
# trim ", " at the end of string and then execute
|
||||||
|
cursor_energy_db.execute(add_values[:-2])
|
||||||
|
cnx_energy_db.commit()
|
||||||
|
except Exception as e:
|
||||||
|
if cursor_energy_db:
|
||||||
|
cursor_energy_db.close()
|
||||||
|
if cnx_energy_db:
|
||||||
|
cnx_energy_db.close()
|
||||||
|
return "Error in step 4.2 virtual meter worker " + str(e) + " for '" + virtual_meter['name'] + "'"
|
||||||
|
|
||||||
|
if cursor_energy_db:
|
||||||
|
cursor_energy_db.close()
|
||||||
|
if cnx_energy_db:
|
||||||
|
cnx_energy_db.close()
|
||||||
|
|
||||||
|
return None
|
Loading…
Reference in New Issue