diff --git a/README.md b/README.md index 7a7ba714..fd3002d0 100644 --- a/README.md +++ b/README.md @@ -40,7 +40,7 @@ MyEMS项目由下列组件构成: ### MyEMS BACnet/IP 数据采集服务 (Python) -[安装 myems-bacnet](./myems-bacnet/README.md) +[安装 myems-bacnet](../myems-bacnet/README.md) ### MyEMS Modbus TCP 数据采集服务 (Python) @@ -71,10 +71,8 @@ MyEMS项目由下列组件构成: | :--- | :----: | :----: | :----: | | 开源 | ✔️ | ❌ | | | 价格 | 免费 | 收费 | 标准组件授权费;定制组件开发费; | -| 更换品牌名称与标志LOGO | ✔️ | ✔️ | | +| 更换品牌名称与标志LOGO | ❌ | ✔️ | | | Modbus TCP 协议 | ✔️ | ✔️ | 采集数据 https://modbus.org/ | -| BACnet/IP 协议 | ✔️ | ✔️ | 采集数据 http://www.bacnet.org/ | -| MQTT 协议发布 | ✔️ | ✔️ | 发布最新采集到的数据 https://mqtt.org/ | | 数据点数量 | 无限制 |无限制 | 仅受硬件性能限制 | | 计量表数量 | 无限制 |无限制 | 仅受硬件性能限制 | | 空间数量 | 无限制 |无限制 | 仅受硬件性能限制 | @@ -83,11 +81,11 @@ MyEMS项目由下列组件构成: | 门店数量 | 无限制 |无限制 | 仅受硬件性能限制 | | 车间数量 | 无限制 |无限制 | 仅受硬件性能限制 | | 组合设备数量 | 无限制 |无限制 | 仅受硬件性能限制 | -| Docker容器化部署 | ✔️ | ✔️ | https://www.docker.com/ | -| Kubernetes部署 | ✔️ | ✔️ | https://kubernetes.io/ | +| Docker容器化部署 | ❌️ | ✔️ | https://www.docker.com/ | +| Kubernetes部署 | ❌ | ✔️ | https://kubernetes.io/ | | MySQL | ✔️ | ✔️ | http://mysql.com/ | | MariaDB | ✔️ | ✔️ | https://mariadb.org/ | -| SingleStore | ✔️ | ✔️ | https://www.singlestore.com/ | +| SingleStore | ❌️ | ✔️ | https://www.singlestore.com/ | | AWS 云部署 | ✔️ | ✔️ | https://aws.amazon.com/ | | AZure 云部署 | ✔️ | ✔️ | https://azure.microsoft.com/ | | 阿里云部署 | ✔️ | ✔️ | https://aliyun.com/ | @@ -157,14 +155,16 @@ MyEMS项目由下列组件构成: | REST API | ✔️ | ✔️ | 基于Python开发,提供系统配置、能源报告、Excel导出接口 | | Web UI | ✔️ | ✔️ | 基于React开发,用于能源数据分析 | | Admin UI | ✔️ | ✔️ | 基于Angular开发,用于系统配置管理 | +| BACnet/IP 协议 | ❌ | ✔️ | 采集数据 http://www.bacnet.org/ | | MQTT 协议订阅数据 | ❌ | ✔️ | 采集数据 https://mqtt.org/ | +| MQTT 协议发布 | ❌ | ✔️ | 发布最新采集到的数据 https://mqtt.org/ | | Modbus RTU 协议 | ❌ | ✔️ | 采集数据 https://modbus.org/ | | OPC UA 协议 | ❌ | ✔️ | 采集数据 https://opcfoundation.org/ | | OPC DA 协议 | ❌ | ✔️ | 采集数据 https://opcfoundation.org/ | | Siemens S7 协议 | ❌ | ✔️ | 采集数据 https://siemens.com/ | | IEC 104 协议 | ❌ | ✔️ | 采集数据 IEC 60870-5-104 https://en.wikipedia.org/wiki/IEC_60870-5 | -| Johnson Controls Metasys API | ✔️ | ✔️ | 采集数据 https://www.johnsoncontrols.com/ | -| Honeywell EBI | ✔️ | ✔️ | 采集数据 https://www.honeywell.com/ | +| Johnson Controls Metasys API | ❌ | ✔️ | 采集数据 https://www.johnsoncontrols.com/ | +| Honeywell EBI | ❌ | ✔️ | 采集数据 https://www.honeywell.com/ | | SIEMENS Desigo CC | ❌ | ✔️ | 采集数据 https://siemens.com/ | | QWeather API | ❌ | ✔️ | 采集数据 https://www.qweather.com/ | | FDD 能效故障诊断系统 | ❌ | ✔️ | 需要企业版组件许可或定制开发 | @@ -183,10 +183,10 @@ MyEMS项目由下列组件构成: | LoRa无线数传电台模块(数据采集和远程控制)| ❌ | ✔️ | MyEMS认证LoRa硬件设备 | | 重点用能单位能耗在线监测系统上传省级平台通信协议| ❌ | ✔️ | 需要企业版组件许可或定制开发 | | 第三方系统集成服务 | ❌ | ✔️ | 需要企业版组件许可或定制开发 | -| 线上软件使用培训 | ✔️ | ✔️ | 免费 | +| 线上软件使用培训 | ❌ | ✔️ | | | 线下软件使用培训 | ❌ | ✔️ | | -| 在线社区技术支持 | ✔️ | ✔️ | 免费 | -| 邮件技术支持 | ✔️ | ✔️ | 免费 | +| 在线社区技术支持 | ✔️ | ✔️ | | +| 邮件技术支持 | ❌️ | ✔️ | | | 电话技术支持服务 | ❌ | ✔️ | | | 微信技术支持服务 | ❌ | ✔️ | | | 远程桌面技术支持服务 | ❌ | ✔️ | | diff --git a/README_DE.md b/README_DE.md index 90081ff6..ca708a1f 100644 --- a/README_DE.md +++ b/README_DE.md @@ -41,7 +41,7 @@ Dieses Projekt besteht aus folgenden Komponenten: ### MyEMS BACnet/IP Acquisition Service (Python) -[Installieren myems-bacnet](./myems-bacnet/README.md) +[Installieren myems-bacnet](../myems-bacnet/README.md) ### MyEMS Modbus TCP Acquisition Service (Python) @@ -74,10 +74,8 @@ Dieses Projekt besteht aus folgenden Komponenten: | :--- | :----: | :----: | :----: | | Open Source | ✔️ | ❌ | | | Pricing | Free | Pay for Projects | | -| Change Name and Logo | ✔️ | ✔️ | | +| Change Name and Logo | ️❌ | ✔️ | | | Modbus TCP | ✔️ | ✔️ | | -| BACnet/IP | ✔️ | ✔️ | | -| MQTT Publisher | ✔️ | ✔️ | | | Data Points Number | Unbegrenzt | Unbegrenzt | Nur durch die Hardwareleistung begrenzt | | Meters Number | Unbegrenzt | Unbegrenzt | Nur durch die Hardwareleistung begrenzt | | Spaces Number | Unbegrenzt | Unbegrenzt | Nur durch die Hardwareleistung begrenzt | @@ -86,11 +84,11 @@ Dieses Projekt besteht aus folgenden Komponenten: | Stores Number | Unbegrenzt | Unbegrenzt | Nur durch die Hardwareleistung begrenzt | | Shopfloors Number | Unbegrenzt | Unbegrenzt | Nur durch die Hardwareleistung begrenzt | | Combined Equipments Number | Unbegrenzt | Unbegrenzt | Nur durch die Hardwareleistung begrenzt | -| Docker | ✔️ | ✔️ | | -| Kubernetes | ✔️ | ✔️ | | +| Docker | ❌ | ✔️ | | +| Kubernetes | ❌ | ✔️ | | | MySQL | ✔️ | ✔️ | | | MariaDB | ✔️ | ✔️ | | -| SingleStore | ✔️ | ✔️ | | +| SingleStore | ❌ | ✔️ | | | AWS Cloud | ✔️ | ✔️ | | | AZure Cloud | ✔️ | ✔️ | | | Alibaba Cloud | ✔️ | ✔️ | | @@ -160,14 +158,16 @@ Dieses Projekt besteht aus folgenden Komponenten: | REST API | ✔️ | ✔️ | | | Web UI | ✔️ | ✔️ | | | Admin UI | ✔️ | ✔️ | | +| BACnet/IP | ❌️ | ✔️ | | | MQTT Subscriber | ❌ | ✔️ | Erfordert eine Standardkomponentenlizenz | +| MQTT Publisher | ❌ | ✔️ | Erfordert eine Standardkomponentenlizenz | | Modbus RTU | ❌ | ✔️ | Erfordert eine Standardkomponentenlizenz | | OPC UA | ❌ | ✔️ | Erfordert eine Standardkomponentenlizenz | | OPC DA | ❌ | ✔️ | Erfordert eine Standardkomponentenlizenz | | Siemens S7 | ❌ | ✔️ | Erfordert eine Standardkomponentenlizenz | | IEC 104 | ❌ | ✔️ | Erfordert eine Standardkomponentenlizenz | | Johnson Controls Metasys API | ❌ | ✔️ | Erfordert eine Standardkomponentenlizenz | -| Honeywell EBI | ✔️ | ✔️ | | +| Honeywell EBI | ❌️ | ✔️ | | | SIEMENS Desigo CC | ❌ | ✔️ | Erfordert eine Standardkomponentenlizenz | | QWeather API | ❌ | ✔️ | Erfordert eine Standardkomponentenlizenz | | FDD Rule Engine | ❌ | ✔️ | Erfordert eine Standardkomponentenlizenz | @@ -186,10 +186,10 @@ Dieses Projekt besteht aus folgenden Komponenten: | LoRa Radio Module (Data Acquisition and Remote Control) | ❌| ✔️ | MyEMS-zertifiziertes LoRa-Hardwaregerät | | Protocol for Uploading to Provincial Platform of On-line monitoring system for Key Energy-Consuming Unit | ❌ | ✔️ | | | 3rd Party Systems Integration Service | ❌ | ✔️ | Kundenspezifische Entwicklung | -| Online software training | ✔️ | ✔️ | Kostenlos | +| Online software training | ❌ | ✔️ | | | Face to face software training | ❌ | ✔️ | | -| Online Community Customer Support| ✔️ | ✔️ | Kostenlos | -| Email Customer Support | ✔️ | ✔️ | Kostenlos | +| Online Community Customer Support| ✔️ | ✔️ | | +| Email Customer Support | ❌ | ✔️ | | | Telephone Customer Support | ❌ | ✔️ | | | WeChat Customer Support | ❌ | ✔️ | | | Remote Desktop Customer Support | ❌ | ✔️ | | diff --git a/README_EN.md b/README_EN.md index a906974e..ffcfd262 100644 --- a/README_EN.md +++ b/README_EN.md @@ -41,7 +41,7 @@ This project is compose of following components: ### MyEMS BACnet/IP Acquisition Service (Python) -[Install myems-bacnet](./myems-bacnet/README.md) +[Install myems-bacnet](../myems-bacnet/README.md) ### MyEMS Modbus TCP Acquisition Service (Python) @@ -74,10 +74,8 @@ This project is compose of following components: | :--- | :----: | :----: | :----: | | Open Source | ✔️ | ❌ | | | Pricing | Free | Pay for Projects | | -| Change Name and Logo | ✔️ | ✔️ | | +| Change Name and Logo | ❌️ | ✔️ | | | Modbus TCP | ✔️ | ✔️ | | -| BACnet/IP | ✔️ | ✔️ | | -| MQTT Publisher | ✔️ | ✔️ | | | Data Points Number | Unlimited | Unlimited | Limited only by hardware performance | | Meters Number | Unlimited | Unlimited | Limited only by hardware performance | | Spaces Number | Unlimited | Unlimited | Limited only by hardware performance | @@ -86,11 +84,11 @@ This project is compose of following components: | Stores Number | Unlimited | Unlimited | Limited only by hardware performance | | Shopfloors Number | Unlimited | Unlimited | Limited only by hardware performance | | Combined Equipments Number | Unlimited | Unlimited | Limited only by hardware performance | -| Docker | ✔️ | ✔️ | | -| Kubernetes | ✔️ | ✔️ | | +| Docker | ❌ | ✔️ | | +| Kubernetes | ❌ | ✔️ | | | MySQL | ✔️ | ✔️ | | | MariaDB | ✔️ | ✔️ | | -| SingleStore | ✔️ | ✔️ | | +| SingleStore | ❌ | ✔️ | | | AWS Cloud | ✔️ | ✔️ | | | AZure Cloud | ✔️ | ✔️ | | | Alibaba Cloud | ✔️ | ✔️ | | @@ -160,14 +158,16 @@ This project is compose of following components: | REST API | ✔️ | ✔️ | | | Web UI | ✔️ | ✔️ | | | Admin UI | ✔️ | ✔️ | | +| BACnet/IP | ❌ | ✔️ | | | MQTT Subscriber | ❌ | ✔️ | Requires standard component license | +| MQTT Publisher | ❌️ | ✔️ | Requires standard component license | | Modbus RTU | ❌ | ✔️ | Requires standard component license | | OPC UA | ❌ | ✔️ | Requires standard component license | | OPC DA | ❌ | ✔️ | Requires standard component license | | Siemens S7 | ❌ | ✔️ | Requires standard component license | | IEC 104 | ❌ | ✔️ | Requires standard component license | | Johnson Controls Metasys API | ❌ | ✔️ | Requires standard component license | -| Honeywell EBI | ✔️ | ✔️ | | +| Honeywell EBI | ❌ | ✔️ | | | SIEMENS Desigo CC | ❌ | ✔️ | Requires standard component license | | QWeather API | ❌ | ✔️ | Requires standard component license | | FDD Rule Engine | ❌ | ✔️ | Requires standard component license or custom development | @@ -186,10 +186,10 @@ This project is compose of following components: | LoRa Radio Module (Data Acquisition and Remote Control)| ❌ | ✔️ | MyEMS certified LoRa hardware device | | Protocol for Uploading to Provincial Platform of On-line monitoring system for Key Energy-Consuming Unit | ❌ | ✔️ | Requires standard component license or custom development | | 3rd Party Systems Integration Service | ❌ | ✔️ | Custom development | -| Online software training | ✔️ | ✔️ | Free | +| Online software training | ❌ | ✔️ | | | Face to face software training | ❌ | ✔️ | | -| Online Community Customer Support| ✔️ | ✔️ | Free | -| Email Customer Support | ✔️ | ✔️ | Free | +| Online Community Customer Support| ✔️ | ✔️ | | +| Email Customer Support | ❌ | ✔️ | | | Telephone Customer Support | ❌ | ✔️ | | | WeChat Customer Support | ❌ | ✔️ | | | Remote Desktop Customer Support | ❌ | ✔️ | | diff --git a/myems-bacnet/.gitignore b/myems-bacnet/.gitignore deleted file mode 100644 index f8ea377e..00000000 --- a/myems-bacnet/.gitignore +++ /dev/null @@ -1,248 +0,0 @@ - -# Created by https://www.toptal.com/developers/gitignore/api/python,pycharm -# Edit at https://www.toptal.com/developers/gitignore?templates=python,pycharm - -### PyCharm ### -# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider -# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 - -# User-specific stuff -.idea/**/workspace.xml -.idea/**/tasks.xml -.idea/**/usage.statistics.xml -.idea/**/dictionaries -.idea/**/shelf - -# Generated files -.idea/**/contentModel.xml - -# Sensitive or high-churn files -.idea/**/dataSources/ -.idea/**/dataSources.ids -.idea/**/dataSources.local.xml -.idea/**/sqlDataSources.xml -.idea/**/dynamic.xml -.idea/**/uiDesigner.xml -.idea/**/dbnavigator.xml - -# Gradle -.idea/**/gradle.xml -.idea/**/libraries - -# Gradle and Maven with auto-import -# When using Gradle or Maven with auto-import, you should exclude module files, -# since they will be recreated, and may cause churn. Uncomment if using -# auto-import. -# .idea/artifacts -# .idea/compiler.xml -# .idea/jarRepositories.xml -# .idea/modules.xml -# .idea/*.iml -# .idea/modules -# *.iml -# *.ipr - -# CMake -cmake-build-*/ - -# Mongo Explorer plugin -.idea/**/mongoSettings.xml - -# File-based project format -*.iws - -# IntelliJ -out/ - -# mpeltonen/sbt-idea plugin -.idea_modules/ - -# JIRA plugin -atlassian-ide-plugin.xml - -# Cursive Clojure plugin -.idea/replstate.xml - -# Crashlytics plugin (for Android Studio and IntelliJ) -com_crashlytics_export_strings.xml -crashlytics.properties -crashlytics-build.properties -fabric.properties - -# Editor-based Rest Client -.idea/httpRequests - -# Android studio 3.1+ serialized cache file -.idea/caches/build_file_checksums.ser - -### PyCharm Patch ### -# Comment Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-215987721 - -# *.iml -# modules.xml -# .idea/misc.xml -# *.ipr - -# Sonarlint plugin -# https://plugins.jetbrains.com/plugin/7973-sonarlint -.idea/**/sonarlint/ - -# SonarQube Plugin -# https://plugins.jetbrains.com/plugin/7238-sonarqube-community-plugin -.idea/**/sonarIssues.xml - -# Markdown Navigator plugin -# https://plugins.jetbrains.com/plugin/7896-markdown-navigator-enhanced -.idea/**/markdown-navigator.xml -.idea/**/markdown-navigator-enh.xml -.idea/**/markdown-navigator/ - -# Cache file creation bug -# See https://youtrack.jetbrains.com/issue/JBR-2257 -.idea/$CACHE_FILE$ - -# CodeStream plugin -# https://plugins.jetbrains.com/plugin/12206-codestream -.idea/codestream.xml - -### Python ### -# Byte-compiled / optimized / DLL files -__pycache__/ -*.py[cod] -*$py.class - -# C extensions -*.so - -# Distribution / packaging -.Python -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -pip-wheel-metadata/ -share/python-wheels/ -*.egg-info/ -.installed.cfg -*.egg -MANIFEST - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.nox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -*.py,cover -.hypothesis/ -.pytest_cache/ -pytestdebug.log - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py -db.sqlite3 -db.sqlite3-journal - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ -doc/_build/ - -# PyBuilder -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# IPython -profile_default/ -ipython_config.py - -# pyenv -.python-version - -# pipenv -# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. -# However, in case of collaboration, if having platform-specific dependencies or dependencies -# having no cross-platform support, pipenv may install dependencies that don't work, or not -# install all needed dependencies. -#Pipfile.lock - -# PEP 582; used by e.g. github.com/David-OConnor/pyflow -__pypackages__/ - -# Celery stuff -celerybeat-schedule -celerybeat.pid - -# SageMath parsed files -*.sage.py - -# Environments -.env -.venv -env/ -venv/ -ENV/ -env.bak/ -venv.bak/ -pythonenv* - -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site - -# mypy -.mypy_cache/ -.dmypy.json -dmypy.json - -# Pyre type checker -.pyre/ - -# pytype static type analyzer -.pytype/ - -# profiling data -.prof - -# End of https://www.toptal.com/developers/gitignore/api/python,pycharm \ No newline at end of file diff --git a/myems-bacnet/LICENSE b/myems-bacnet/LICENSE deleted file mode 100644 index b91c1ac4..00000000 --- a/myems-bacnet/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -MIT License - -Copyright (c) 2021 MyEMS - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/myems-bacnet/README.md b/myems-bacnet/README.md deleted file mode 100644 index e2d29c9f..00000000 --- a/myems-bacnet/README.md +++ /dev/null @@ -1,108 +0,0 @@ -# MyEMS BACnet Service - - -## Introduction - -This service is a component of MyEMS to acquire data from BACnet devices - -## Prerequisites -bacpypes - -mysql.connector - -Schedule - -## Installation - -Download and install MySQL Connector: -``` - $ cd ~/tools - $ wget https://dev.mysql.com/get/Downloads/Connector-Python/mysql-connector-python-8.0.20.tar.gz - $ tar xzf mysql-connector-python-8.0.20.tar.gz - $ cd ~/tools/mysql-connector-python-8.0.20 - $ sudo python3 setup.py install -``` - -Download and install Schedule -``` - $ cd ~/tools - $ git clone https://github.com/dbader/schedule.git - $ cd ~/tools/schedule - $ sudo python3 setup.py install -``` - -Download and install bacpypes library -``` - $ cd ~/tools - $ git clone https://github.com/pypa/setuptools_scm.git - $ git clone https://github.com/pytest-dev/pytest-runner.git - $ git clone https://github.com/JoelBender/bacpypes.git - $ cd ~/tools/setuptools_scm/ - $ sudo python3 setup.py install - $ cd ~/tools/pytest-runner/ - $ sudo python3 setup.py install - $ cd ~/tools/bacpypes - $ sudo python3 setup.py install - $ sudo ufw allow 47808 -``` - -Install myems-bacnet service -``` - $ cd ~ - $ git clone https://github.com/MyEMS/myems.git - $ cd myems - $ git checkout master (or the latest release tag) - $ sudo cp -R ~/myems/myems-bacnet /myems-bacnet -``` -Edit the config -``` - $ sudo nano /myems-bacnet/config.py -``` -Setup systemd service: -```bash - $ sudo cp myems-bacnet.service /lib/systemd/system/ -``` -Enable the service: -```bash - $ sudo systemctl enable myems-bacnet.service -``` -Start the service: -```bash - $ sudo systemctl start myems-bacnet.service -``` -Monitor the service: -```bash - $ sudo systemctl status myems-bacnet.service -``` -View the log: -```bash - $ cat /myems-bacnet.log -``` - -### Add Data Sources and Points in MyEMS Admin - -Data source protocol: -``` -bacnet-ip -``` - -Data source connection example: -``` -{"host": "192.168.0.3", "port": 47808} -``` - -Point address example: -``` -{"object_id":3002786,"object_type":"analogValue","property_array_index":null,"property_name":"presentValue"} -``` - - -## References - -[1]. http://myems.io - -[2]. http://bacnet.org - -[3]. https://github.com/JoelBender/bacpypes - - diff --git a/myems-bacnet/acquisition.py b/myems-bacnet/acquisition.py deleted file mode 100644 index 123a1ac4..00000000 --- a/myems-bacnet/acquisition.py +++ /dev/null @@ -1,529 +0,0 @@ -import json -import mysql.connector -import config -import time -from datetime import datetime -import math -from decimal import Decimal -from bacpypes.core import run, stop, deferred -from bacpypes.local.device import LocalDeviceObject -from bacpypes.pdu import Address, GlobalBroadcast - -from myems_application import MyEMSApplication - - -######################################################################################################################## -# Acquisition Procedures -# Step 1: Get data source list -# Step 2: Get point list -# Step 3: Read point values from BACnet -# Step 4: Bulk insert point values and update latest values in historical database -######################################################################################################################## - -def process(logger, ): - print("Creating a device object...") - try: - # make a device object - this_device = LocalDeviceObject(objectName=config.bacnet_device['object_name'], - objectIdentifier=config.bacnet_device['object_identifier'], - maxApduLengthAccepted=config.bacnet_device['max_apdu_length_accepted'], - segmentationSupported=config.bacnet_device['segmentation_supported'], - vendorIdentifier=config.bacnet_device['vendor_identifier']) - - except Exception as e: - logger.error("Failed to create BACnet device object: " + str(e)) - # ignore - pass - - print("Connecting to myems_system_db ...") - cnx_system_db = None - cursor_system_db = None - - while not cnx_system_db or not cnx_system_db.is_connected(): - try: - cnx_system_db = mysql.connector.connect(**config.myems_system_db) - cursor_system_db = cnx_system_db.cursor() - except Exception as e: - logger.error("Error to connect to myems_system_db in acquisition process " + str(e)) - if cursor_system_db: - cursor_system_db.close() - if cnx_system_db: - cnx_system_db.close() - print("Failed to connect to myems_system_db, sleep a minute and retry...") - time.sleep(60) - continue - - print("Connecting to myems_historical_db...") - cnx_historical_db = None - cursor_historical_db = None - - while not cnx_historical_db or not cnx_historical_db.is_connected(): - try: - cnx_historical_db = mysql.connector.connect(**config.myems_historical_db) - cursor_historical_db = cnx_historical_db.cursor() - except Exception as e: - logger.error("Error to connect to myems_historical_db in acquisition process " + str(e)) - if cursor_historical_db: - cursor_historical_db.close() - if cnx_historical_db: - cnx_historical_db.close() - print("Failed to connect to myems_historical_db, sleep a minute and retry...") - time.sleep(60) - continue - - while True: - # the outermost while loop - ################################################################################################################ - # Step 1: Get data source list - ################################################################################################################ - # check the connection to the database - if not cnx_system_db or not cnx_system_db.is_connected(): - try: - cnx_system_db = mysql.connector.connect(**config.myems_system_db) - cursor_system_db = cnx_system_db.cursor() - except Exception as e: - logger.error("Error in step 1.1 of acquisition process " + str(e)) - if cursor_system_db: - cursor_system_db.close() - if cnx_system_db: - cnx_system_db.close() - # sleep and retry - time.sleep(60) - continue - - # Get data sources by gateway and protocol - try: - query = (" SELECT ds.id, ds.name, ds.connection " - " FROM tbl_data_sources ds, tbl_gateways g " - " WHERE ds.protocol = 'bacnet-ip' AND ds.gateway_id = g.id AND g.id = %s AND g.token = %s " - " ORDER BY ds.id ") - cursor_system_db.execute(query, (config.gateway['id'], config.gateway['token'],)) - rows_data_source = cursor_system_db.fetchall() - except Exception as e: - logger.error("Error in step 1.2 of acquisition process " + str(e)) - if cursor_system_db: - cursor_system_db.close() - if cnx_system_db: - cnx_system_db.close() - # sleep and retry - time.sleep(60) - continue - - if rows_data_source is None or len(rows_data_source) == 0: - logger.error("Step 1.2: Data Source Not Found, Wait for minutes to retry.") - if cursor_system_db: - cursor_system_db.close() - if cnx_system_db: - cnx_system_db.close() - # sleep and retry - time.sleep(60) - continue - - point_dict = dict() - bacnet_point_list = list() - for row_data_source in rows_data_source: - # print("Data Source: ID=%s, Name=%s, Connection=%s ", - # row_data_source[0], row_data_source[1], row_data_source[2]) - - if row_data_source[2] is None or len(row_data_source[2]) == 0: - logger.error("Step 1.2: Data Source Connection Not Found. ID=%s, Name=%s, Connection=%s ", - row_data_source[0], row_data_source[1], row_data_source[2]) - # go to next row_data_source in for loop - continue - try: - server = json.loads(row_data_source[2]) - except Exception as e: - logger.error("Error in step 1.3 of acquisition process: \n" - "Invalid Data Source Connection in JSON " + str(e)) - # go to next row_data_source in for loop - continue - - if 'host' not in server.keys() or server['host'] is None or len(server['host']) == 0: - logger.error("Step 1.4: Data Source Connection Invalid. ID=%s, Name=%s, Connection=%s ", - row_data_source[0], row_data_source[1], row_data_source[2]) - # go to next row_data_source in for loop - continue - - data_source_id = row_data_source[0] - data_source_name = row_data_source[1] - data_source_host = server['host'] - - ############################################################################################################ - # Step 2: Get point list - ############################################################################################################ - try: - query = (" SELECT id, name, object_type, is_trend, ratio, address " - " FROM tbl_points " - " WHERE data_source_id = %s AND is_virtual = FALSE " - " ORDER BY id ") - cursor_system_db.execute(query, (data_source_id,)) - rows_points = cursor_system_db.fetchall() - except Exception as e: - logger.error("Error in step 2.1 of acquisition process " + str(e)) - if cursor_system_db: - cursor_system_db.close() - if cnx_system_db: - cnx_system_db.close() - # go to next row_data_source in for loop - continue - - if rows_points is None or len(rows_points) == 0: - # there is no points for this data source - logger.error("Error in step 2.1 of acquisition process: \n" - "Point Not Found in Data Source (ID = %s, Name = %s) ", data_source_id, data_source_name) - # go to next row_data_source in for loop - continue - - # There are points for this data source - for row_point in rows_points: - # print("Point in DataSource(ID=%s): ID=%s, Name=%s, ObjectType=%s, IsTrend = %s, Address=%s ", - # data_source_id, row_point[0], row_point[1], row_point[2], row_point[3], row_point[4]) - point_id = row_point[0] - point_name = row_point[1] - point_object_type = row_point[2] - is_trend = row_point[3] - ratio = row_point[4] - address = json.loads(row_point[5]) - # address example - # {"object_type":"analogValue", "object_id":3004860, "property_name":"presentValue", - # "property_array_index":null} - if 'object_type' not in address.keys() \ - or address['object_type'] is None \ - or len(address['object_type']) == 0 \ - or 'object_id' not in address.keys() \ - or address['object_id'] is None \ - or address['object_id'] < 0 \ - or 'property_name' not in address.keys() \ - or address['property_name'] is None \ - or len(address['property_name']) == 0 \ - or 'property_array_index' not in address.keys(): - logger.error("Point Address Invalid. ID=%s, Name=%s, Address=%s ", - row_point[0], row_point[1], row_point[5]) - # go to next row_point in for loop - continue - - bacnet_object_type = address['object_type'] - bacnet_object_id = address['object_id'] - bacnet_property_name = address['property_name'] - bacnet_property_array_index = address['property_array_index'] - - point_dict[point_id] = {'data_source_id': data_source_id, - 'point_name': point_name, - 'object_type': point_object_type, - 'is_trend': is_trend, - 'ratio': ratio, - } - bacnet_point_list.append((point_id, - data_source_host, - bacnet_object_type, - bacnet_object_id, - bacnet_property_name, - bacnet_property_array_index)) - # end of for row_point - # end of for row_data_source - - ################################################################################################################ - # Step 3: Read point values from BACnet - ################################################################################################################ - if point_dict is None or len(point_dict) == 0: - logger.error("Point Not Found in Step 2") - continue - - if bacnet_point_list is None or len(bacnet_point_list) == 0: - logger.error("BACnet Point Not Found in Step 2") - continue - - if not isinstance(this_device, LocalDeviceObject): - try: - # Create a device object - this_device = LocalDeviceObject(objectName=config.bacnet_device['object_name'], - objectIdentifier=config.bacnet_device['object_identifier'], - maxApduLengthAccepted=config.bacnet_device['max_apdu_length_accepted'], - segmentationSupported=config.bacnet_device['segmentation_supported'], - vendorIdentifier=config.bacnet_device['vendor_identifier']) - except Exception as e: - logger.error("Step 3.1 Create BACnet device " + str(e)) - continue - - try: - # make a BIPForeignApplication - this_application = MyEMSApplication(bacnet_point_list, - this_device, - config.bacnet_device['local_address'], - Address(config.bacnet_device['foreignBBMD']), - int(config.bacnet_device['foreignTTL'])) - - # fire off a request when the core has a chance - deferred(this_application.next_request) - - run() - - energy_value_list = list() - analog_value_list = list() - digital_value_list = list() - last_seen_data_source_set = set() - - # dump out the results - for request, response in zip(bacnet_point_list, this_application.response_values): - # print("request=%s, response=%s ", request, response,) - point_id = request[0] - data_source_id = point_dict[point_id]['data_source_id'] - object_type = point_dict[point_id]['object_type'] - is_trend = point_dict[point_id]['is_trend'] - ratio = point_dict[point_id]['ratio'] - if not isinstance(response, int) and \ - not isinstance(response, float) and \ - not isinstance(response, bool) and \ - not isinstance(response, str): - # go to next response - logger.error("response data type %s, value=%s invalid: request=%s", - type(response), response, request) - continue - else: - value = response - - if object_type == 'ANALOG_VALUE': - if math.isnan(value): - logger.error("response data type is Not A Number: request=%s", request) - continue - - analog_value_list.append({'data_source_id': data_source_id, - 'point_id': point_id, - 'is_trend': is_trend, - 'value': Decimal(value) * ratio}) - - elif object_type == 'ENERGY_VALUE': - if math.isnan(value): - logger.error("response data type is Not A Number: request=%s", request) - continue - - energy_value_list.append({'data_source_id': data_source_id, - 'point_id': point_id, - 'is_trend': is_trend, - 'value': Decimal(value) * ratio}) - elif object_type == 'DIGITAL_VALUE': - if isinstance(value, str): - if value == 'active': - value = 1 - elif value == 'inactive': - value = 0 - - digital_value_list.append({'data_source_id': data_source_id, - 'point_id': point_id, - 'is_trend': is_trend, - 'value': int(value) * int(ratio)}) - - # add data_source_id to the last seen set - last_seen_data_source_set.add(data_source_id) - except Exception as e: - logger.error("Step 3.2 ReadPointList " + str(e)) - time.sleep(60) - continue - finally: - this_application.close_socket() - del this_application - - ################################################################################################################ - # Step 4: Bulk insert point values and update latest values in historical database - ################################################################################################################ - # check the connection to the database - if not cnx_historical_db or not cnx_historical_db.is_connected(): - try: - cnx_historical_db = mysql.connector.connect(**config.myems_historical_db) - cursor_historical_db = cnx_historical_db.cursor() - except Exception as e: - logger.error("Error in step 4.1 of acquisition process " + str(e)) - - if cnx_historical_db: - cnx_historical_db.close() - if cursor_historical_db: - cursor_historical_db.close() - # sleep and continue outer while loop to reconnect to server - time.sleep(60) - continue - - current_datetime_utc = datetime.utcnow() - # bulk insert values into historical database within a period - # update latest values in the meanwhile - if len(analog_value_list) > 0: - add_values = (" INSERT INTO tbl_analog_value (point_id, utc_date_time, actual_value) " - " VALUES ") - trend_value_count = 0 - - for point_value in analog_value_list: - if point_value['is_trend']: - add_values += " (" + str(point_value['point_id']) + "," - add_values += "'" + current_datetime_utc.isoformat() + "'," - add_values += str(point_value['value']) + "), " - trend_value_count += 1 - - if trend_value_count > 0: - try: - # trim ", " at the end of string and then execute - cursor_historical_db.execute(add_values[:-2]) - cnx_historical_db.commit() - except Exception as e: - logger.error("Error in step 4.2.1 of acquisition process " + str(e)) - # ignore this exception - pass - - # update tbl_analog_value_latest - delete_values = " DELETE FROM tbl_analog_value_latest WHERE point_id IN ( " - latest_values = (" INSERT INTO tbl_analog_value_latest (point_id, utc_date_time, actual_value) " - " VALUES ") - latest_value_count = 0 - - for point_value in analog_value_list: - delete_values += str(point_value['point_id']) + "," - latest_values += " (" + str(point_value['point_id']) + "," - latest_values += "'" + current_datetime_utc.isoformat() + "'," - latest_values += str(point_value['value']) + "), " - latest_value_count += 1 - - if latest_value_count > 0: - try: - # replace "," at the end of string with ")" - cursor_historical_db.execute(delete_values[:-1] + ")") - cnx_historical_db.commit() - except Exception as e: - logger.error("Error in step 4.2.2 of acquisition process " + str(e)) - # ignore this exception - pass - - try: - # trim ", " at the end of string and then execute - cursor_historical_db.execute(latest_values[:-2]) - cnx_historical_db.commit() - except Exception as e: - logger.error("Error in step 4.2.3 of acquisition process " + str(e)) - # ignore this exception - pass - - if len(energy_value_list) > 0: - add_values = (" INSERT INTO tbl_energy_value (point_id, utc_date_time, actual_value) " - " VALUES ") - trend_value_count = 0 - - for point_value in energy_value_list: - if point_value['is_trend']: - add_values += " (" + str(point_value['point_id']) + "," - add_values += "'" + current_datetime_utc.isoformat() + "'," - add_values += str(point_value['value']) + "), " - trend_value_count += 1 - - if trend_value_count > 0: - try: - # trim ", " at the end of string and then execute - cursor_historical_db.execute(add_values[:-2]) - cnx_historical_db.commit() - except Exception as e: - logger.error("Error in step 4.3.1 of acquisition process: " + str(e)) - # ignore this exception - pass - - # update tbl_energy_value_latest - delete_values = " DELETE FROM tbl_energy_value_latest WHERE point_id IN ( " - latest_values = (" INSERT INTO tbl_energy_value_latest (point_id, utc_date_time, actual_value) " - " VALUES ") - - latest_value_count = 0 - for point_value in energy_value_list: - delete_values += str(point_value['point_id']) + "," - latest_values += " (" + str(point_value['point_id']) + "," - latest_values += "'" + current_datetime_utc.isoformat() + "'," - latest_values += str(point_value['value']) + "), " - latest_value_count += 1 - - if latest_value_count > 0: - try: - # replace "," at the end of string with ")" - cursor_historical_db.execute(delete_values[:-1] + ")") - cnx_historical_db.commit() - - except Exception as e: - logger.error("Error in step 4.3.2 of acquisition process " + str(e)) - # ignore this exception - pass - - try: - # trim ", " at the end of string and then execute - cursor_historical_db.execute(latest_values[:-2]) - cnx_historical_db.commit() - - except Exception as e: - logger.error("Error in step 4.3.3 of acquisition process " + str(e)) - # ignore this exception - pass - - if len(digital_value_list) > 0: - add_values = (" INSERT INTO tbl_digital_value (point_id, utc_date_time, actual_value) " - " VALUES ") - trend_value_count = 0 - - for point_value in digital_value_list: - if point_value['is_trend']: - add_values += " (" + str(point_value['point_id']) + "," - add_values += "'" + current_datetime_utc.isoformat() + "'," - add_values += str(point_value['value']) + "), " - trend_value_count += 1 - - if trend_value_count > 0: - try: - # trim ", " at the end of string and then execute - cursor_historical_db.execute(add_values[:-2]) - cnx_historical_db.commit() - except Exception as e: - logger.error("Error in step 4.4.1 of acquisition process: " + str(e)) - # ignore this exception - pass - - # update tbl_digital_value_latest - delete_values = " DELETE FROM tbl_digital_value_latest WHERE point_id IN ( " - latest_values = (" INSERT INTO tbl_digital_value_latest (point_id, utc_date_time, actual_value) " - " VALUES ") - latest_value_count = 0 - for point_value in digital_value_list: - delete_values += str(point_value['point_id']) + "," - latest_values += " (" + str(point_value['point_id']) + "," - latest_values += "'" + current_datetime_utc.isoformat() + "'," - latest_values += str(point_value['value']) + "), " - latest_value_count += 1 - - if latest_value_count > 0: - try: - # replace "," at the end of string with ")" - cursor_historical_db.execute(delete_values[:-1] + ")") - cnx_historical_db.commit() - except Exception as e: - logger.error("Error in step 4.4.2 of acquisition process " + str(e)) - # ignore this exception - pass - - try: - # trim ", " at the end of string and then execute - cursor_historical_db.execute(latest_values[:-2]) - cnx_historical_db.commit() - except Exception as e: - logger.error("Error in step 4.4.3 of acquisition process " + str(e)) - # ignore this exception - pass - - if len(last_seen_data_source_set) > 0: - update_row = (" UPDATE tbl_data_sources " - " SET last_seen_datetime_utc = '" + current_datetime_utc.isoformat() + "' " - " WHERE id IN (") - - for data_source_id in last_seen_data_source_set: - update_row += str(data_source_id) + "," - - try: - cursor_system_db.execute(update_row[:-1] + ")",) - cnx_system_db.commit() - except Exception as e: - logger.error("Error in step 4.4.4 of acquisition process " + str(e)) - # ignore this exception - pass - - # sleep some seconds - time.sleep(config.interval_in_seconds) - # end of the outermost while loop diff --git a/myems-bacnet/config.py b/myems-bacnet/config.py deleted file mode 100644 index 89357c9e..00000000 --- a/myems-bacnet/config.py +++ /dev/null @@ -1,37 +0,0 @@ -myems_system_db = { - 'user': 'root', - 'password': '!MyEMS1', - 'host': '127.0.0.1', - 'database': 'myems_system_db', - 'port': 3306, -} - -myems_historical_db = { - 'user': 'root', - 'password': '!MyEMS1', - 'host': '127.0.0.1', - 'database': 'myems_historical_db', - 'port': 3306, -} - -# Indicates how long the process waits between readings -interval_in_seconds = 180 - -bacnet_device = { - 'local_address': '192.168.1.10', - 'object_name': 'MYEMS', - 'object_identifier': 0xABCD, - 'max_apdu_length_accepted': 1024, - 'segmentation_supported': 'segmentedBoth', - 'vendor_identifier': 0xABCD, - 'foreignPort': 47808, - 'foreignBBMD': '192.168.0.1', - 'foreignTTL': 30, -} - -# Get the gateway ID and token from MyEMS Admin -# This is used for getting data sources associated with the gateway -gateway = { - 'id': 1, - 'token': 'AAAAAAAA-AAAA-AAAA-AAAA-AAAAAAAAAAAA' -} diff --git a/myems-bacnet/gateway.py b/myems-bacnet/gateway.py deleted file mode 100644 index 225039ac..00000000 --- a/myems-bacnet/gateway.py +++ /dev/null @@ -1,85 +0,0 @@ -import mysql.connector -import config -import time -from datetime import datetime -import schedule - -######################################################################################################################## -# Gateway Job Procedures -# Step 1: Verify Gateway Token -# Step 2: Collect Gateway Information -# Step 3: Update Gateway Information -######################################################################################################################## - - -def job(logger, ): - ################################################################################################################ - # Step 1: Verify Gateway Token - ################################################################################################################ - cnx_system_db = None - cursor_system_db = None - try: - cnx_system_db = mysql.connector.connect(**config.myems_system_db) - cursor_system_db = cnx_system_db.cursor() - except Exception as e: - logger.error("Error in step 1.1 of Gateway process " + str(e)) - if cursor_system_db: - cursor_system_db.close() - if cnx_system_db: - cnx_system_db.close() - return - - # TODO: choose a more secure method to verify gateway token - try: - query = (" SELECT name " - " FROM tbl_gateways " - " WHERE id = %s AND token = %s ") - cursor_system_db.execute(query, (config.gateway['id'], config.gateway['token'])) - row = cursor_system_db.fetchone() - except Exception as e: - logger.error("Error in step 1.2 of gateway process: " + str(e)) - if cursor_system_db: - cursor_system_db.close() - if cnx_system_db: - cnx_system_db.close() - return - - if row is None: - logger.error("Error in step 1.3 of gateway process: Not Found ") - if cursor_system_db: - cursor_system_db.close() - if cnx_system_db: - cnx_system_db.close() - return - - ############################################################################################################ - # Step 2: Collect Gateway Information - ############################################################################################################ - # todo: get more information, such as CPU/MEMORY/DISK - current_datetime_utc = datetime.utcnow() - - ############################################################################################################ - # Step 3: Update Gateway Information - ############################################################################################################ - update_row = (" UPDATE tbl_gateways " - " SET last_seen_datetime_utc = '" + current_datetime_utc.isoformat() + "' " - " WHERE id = %s ") - try: - cursor_system_db.execute(update_row, (config.gateway['id'], )) - cnx_system_db.commit() - except Exception as e: - logger.error("Error in step 3.1 of gateway process " + str(e)) - return - finally: - if cursor_system_db: - cursor_system_db.close() - if cnx_system_db: - cnx_system_db.close() - - -def process(logger, ): - schedule.every(config.interval_in_seconds).seconds.do(job, logger,) - - while True: - schedule.run_pending() - time.sleep(60) diff --git a/myems-bacnet/main.py b/myems-bacnet/main.py deleted file mode 100644 index 555b87b5..00000000 --- a/myems-bacnet/main.py +++ /dev/null @@ -1,36 +0,0 @@ -import logging -from logging.handlers import RotatingFileHandler -from multiprocessing import Process -import acquisition -import gateway - - -def main(): - """main""" - # create logger - logger = logging.getLogger('myems-bacnet') - # specifies the lowest-severity log message a logger will handle, - # where debug is the lowest built-in severity level and critical is the highest built-in severity. - # For example, if the severity level is INFO, the logger will handle only INFO, WARNING, ERROR, and CRITICAL - # messages and will ignore DEBUG messages. - logger.setLevel(logging.ERROR) - # create file handler which logs messages - fh = RotatingFileHandler('myems-bacnet.log', maxBytes=1024*1024, backupCount=1) - # create formatter and add it to the handlers - formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') - fh.setFormatter(formatter) - # add the handlers to logger - logger.addHandler(fh) - - #################################################################################################################### - # Create Acquisition Process - #################################################################################################################### - Process(target=acquisition.process, args=(logger,)).start() - #################################################################################################################### - # Create Gateway Process - #################################################################################################################### - Process(target=gateway.process, args=(logger,)).start() - - -if __name__ == "__main__": - main() diff --git a/myems-bacnet/myems-bacnet.service b/myems-bacnet/myems-bacnet.service deleted file mode 100644 index 837921ea..00000000 --- a/myems-bacnet/myems-bacnet.service +++ /dev/null @@ -1,15 +0,0 @@ -[Unit] -Description=myems-bacnet daemon -After=network.target - -[Service] -User=root -Group=root -ExecStart=/usr/bin/python3 /myems-bacnet/main.py -ExecReload=/bin/kill -s HUP $MAINPID -ExecStop=/bin/kill -s TERM $MAINPID -PrivateTmp=true -Restart=always - -[Install] -WantedBy=multi-user.target diff --git a/myems-bacnet/myems_application.py b/myems-bacnet/myems_application.py deleted file mode 100644 index 4e0dd17c..00000000 --- a/myems-bacnet/myems_application.py +++ /dev/null @@ -1,79 +0,0 @@ -from collections import deque - -from bacpypes.app import BIPForeignApplication -from bacpypes.core import stop, deferred -from bacpypes.iocb import IOCB - -from bacpypes.pdu import Address -from bacpypes.object import get_datatype - -from bacpypes.apdu import ReadPropertyRequest, Error, AbortPDU, ReadPropertyACK -from bacpypes.primitivedata import Unsigned -from bacpypes.constructeddata import Array - - -class MyEMSApplication(BIPForeignApplication): - - def __init__(self, point_list, *args): - - BIPForeignApplication.__init__(self, *args) - - # turn the point list into a queue - self.point_queue = deque(point_list) - - # make a list of the response values - self.response_values = [] - - def next_request(self): - - # check to see if we're done - if not self.point_queue: - stop() - return - - # get the next request - point_id, addr, obj_type, obj_inst, prop_id, idx = self.point_queue.popleft() - - # build a request - request = ReadPropertyRequest( - objectIdentifier=(obj_type, obj_inst), - propertyIdentifier=prop_id, - propertyArrayIndex=idx - ) - request.pduDestination = Address(addr) - - # make an IOCB - iocb = IOCB(request) - - # set a callback for the response - iocb.add_callback(self.complete_request) - - # send the request - self.request_io(iocb) - - def complete_request(self, iocb): - if iocb.ioResponse: - apdu = iocb.ioResponse - - # find the datatype - datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier) - if not datatype: - raise TypeError("unknown datatype") - - # special case for array parts, others are managed by cast_out - if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None): - if apdu.propertyArrayIndex == 0: - value = apdu.propertyValue.cast_out(Unsigned) - else: - value = apdu.propertyValue.cast_out(datatype.subtype) - else: - value = apdu.propertyValue.cast_out(datatype) - - # save the value - self.response_values.append(value) - - if iocb.ioError: - self.response_values.append(iocb.ioError) - - # fire off another request - deferred(self.next_request) diff --git a/myems-bacnet/test.py b/myems-bacnet/test.py deleted file mode 100644 index 161b52e3..00000000 --- a/myems-bacnet/test.py +++ /dev/null @@ -1,49 +0,0 @@ -from bacpypes.core import run, stop, deferred -from bacpypes.local.device import LocalDeviceObject -from bacpypes.pdu import Address, GlobalBroadcast -from myems_application import MyEMSApplication -import config - - -######################################################################################################################## -# this procedure tests BACnet/IP environment -######################################################################################################################## -def main(): - - # make a device object - this_device = LocalDeviceObject(objectName=config.bacnet_device['object_name'], - objectIdentifier=config.bacnet_device['object_identifier'], - maxApduLengthAccepted=config.bacnet_device['max_apdu_length_accepted'], - segmentationSupported=config.bacnet_device['segmentation_supported'], - vendorIdentifier=config.bacnet_device['vendor_identifier'], ) - - # point list, set according to your device - point_list = [ - # point_id, addr, obj_type, obj_inst, prop_id, idx - (1, '10.117.73.53', 'analogInput', 1, 'presentValue', None), - (2, '10.117.73.53', 'analogInput', 2, 'presentValue', None), - (3, '10.117.73.53', 'analogInput', 3, 'presentValue', None), - (4, '10.117.73.53', 'analogInput', 4, 'presentValue', None), - (5, '10.117.73.53', 'analogInput', 5, 'presentValue', None), - (6, '10.117.73.53', 'analogInput', 6, 'presentValue', None), - ] - - # make a simple application - this_application = MyEMSApplication(point_list, - this_device, - config.bacnet_device['local_address'], - Address(config.bacnet_device['foreignBBMD']), - int(config.bacnet_device['foreignTTL'])) - - # fire off a request when the core has a chance - deferred(this_application.next_request) - - run() - - # dump out the results - for request, response in zip(point_list, this_application.response_values): - print(request, response) - - -if __name__ == "__main__": - main() diff --git a/myems-mqtt-publisher/.gitignore b/myems-mqtt-publisher/.gitignore deleted file mode 100644 index f8ea377e..00000000 --- a/myems-mqtt-publisher/.gitignore +++ /dev/null @@ -1,248 +0,0 @@ - -# Created by https://www.toptal.com/developers/gitignore/api/python,pycharm -# Edit at https://www.toptal.com/developers/gitignore?templates=python,pycharm - -### PyCharm ### -# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider -# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 - -# User-specific stuff -.idea/**/workspace.xml -.idea/**/tasks.xml -.idea/**/usage.statistics.xml -.idea/**/dictionaries -.idea/**/shelf - -# Generated files -.idea/**/contentModel.xml - -# Sensitive or high-churn files -.idea/**/dataSources/ -.idea/**/dataSources.ids -.idea/**/dataSources.local.xml -.idea/**/sqlDataSources.xml -.idea/**/dynamic.xml -.idea/**/uiDesigner.xml -.idea/**/dbnavigator.xml - -# Gradle -.idea/**/gradle.xml -.idea/**/libraries - -# Gradle and Maven with auto-import -# When using Gradle or Maven with auto-import, you should exclude module files, -# since they will be recreated, and may cause churn. Uncomment if using -# auto-import. -# .idea/artifacts -# .idea/compiler.xml -# .idea/jarRepositories.xml -# .idea/modules.xml -# .idea/*.iml -# .idea/modules -# *.iml -# *.ipr - -# CMake -cmake-build-*/ - -# Mongo Explorer plugin -.idea/**/mongoSettings.xml - -# File-based project format -*.iws - -# IntelliJ -out/ - -# mpeltonen/sbt-idea plugin -.idea_modules/ - -# JIRA plugin -atlassian-ide-plugin.xml - -# Cursive Clojure plugin -.idea/replstate.xml - -# Crashlytics plugin (for Android Studio and IntelliJ) -com_crashlytics_export_strings.xml -crashlytics.properties -crashlytics-build.properties -fabric.properties - -# Editor-based Rest Client -.idea/httpRequests - -# Android studio 3.1+ serialized cache file -.idea/caches/build_file_checksums.ser - -### PyCharm Patch ### -# Comment Reason: https://github.com/joeblau/gitignore.io/issues/186#issuecomment-215987721 - -# *.iml -# modules.xml -# .idea/misc.xml -# *.ipr - -# Sonarlint plugin -# https://plugins.jetbrains.com/plugin/7973-sonarlint -.idea/**/sonarlint/ - -# SonarQube Plugin -# https://plugins.jetbrains.com/plugin/7238-sonarqube-community-plugin -.idea/**/sonarIssues.xml - -# Markdown Navigator plugin -# https://plugins.jetbrains.com/plugin/7896-markdown-navigator-enhanced -.idea/**/markdown-navigator.xml -.idea/**/markdown-navigator-enh.xml -.idea/**/markdown-navigator/ - -# Cache file creation bug -# See https://youtrack.jetbrains.com/issue/JBR-2257 -.idea/$CACHE_FILE$ - -# CodeStream plugin -# https://plugins.jetbrains.com/plugin/12206-codestream -.idea/codestream.xml - -### Python ### -# Byte-compiled / optimized / DLL files -__pycache__/ -*.py[cod] -*$py.class - -# C extensions -*.so - -# Distribution / packaging -.Python -build/ -develop-eggs/ -dist/ -downloads/ -eggs/ -.eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -wheels/ -pip-wheel-metadata/ -share/python-wheels/ -*.egg-info/ -.installed.cfg -*.egg -MANIFEST - -# PyInstaller -# Usually these files are written by a python script from a template -# before PyInstaller builds the exe, so as to inject date/other infos into it. -*.manifest -*.spec - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -htmlcov/ -.tox/ -.nox/ -.coverage -.coverage.* -.cache -nosetests.xml -coverage.xml -*.cover -*.py,cover -.hypothesis/ -.pytest_cache/ -pytestdebug.log - -# Translations -*.mo -*.pot - -# Django stuff: -*.log -local_settings.py -db.sqlite3 -db.sqlite3-journal - -# Flask stuff: -instance/ -.webassets-cache - -# Scrapy stuff: -.scrapy - -# Sphinx documentation -docs/_build/ -doc/_build/ - -# PyBuilder -target/ - -# Jupyter Notebook -.ipynb_checkpoints - -# IPython -profile_default/ -ipython_config.py - -# pyenv -.python-version - -# pipenv -# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control. -# However, in case of collaboration, if having platform-specific dependencies or dependencies -# having no cross-platform support, pipenv may install dependencies that don't work, or not -# install all needed dependencies. -#Pipfile.lock - -# PEP 582; used by e.g. github.com/David-OConnor/pyflow -__pypackages__/ - -# Celery stuff -celerybeat-schedule -celerybeat.pid - -# SageMath parsed files -*.sage.py - -# Environments -.env -.venv -env/ -venv/ -ENV/ -env.bak/ -venv.bak/ -pythonenv* - -# Spyder project settings -.spyderproject -.spyproject - -# Rope project settings -.ropeproject - -# mkdocs documentation -/site - -# mypy -.mypy_cache/ -.dmypy.json -dmypy.json - -# Pyre type checker -.pyre/ - -# pytype static type analyzer -.pytype/ - -# profiling data -.prof - -# End of https://www.toptal.com/developers/gitignore/api/python,pycharm \ No newline at end of file diff --git a/myems-mqtt-publisher/LICENSE b/myems-mqtt-publisher/LICENSE deleted file mode 100644 index b91c1ac4..00000000 --- a/myems-mqtt-publisher/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -MIT License - -Copyright (c) 2021 MyEMS - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/myems-mqtt-publisher/README.md b/myems-mqtt-publisher/README.md deleted file mode 100644 index 2e0609d4..00000000 --- a/myems-mqtt-publisher/README.md +++ /dev/null @@ -1,103 +0,0 @@ -## MyEMS MQTT Publisher Service - -### Introduction -This service is a component of MyEMS to publish data to MQTT broker. - -### Prerequisites -simplejson - -paho-mqtt - -mysql.connector - -### Installation - -Download and Install simplejson -``` - $ cd ~/tools - $ git clone https://github.com/simplejson/simplejson.git - $ cd simplejson - $ sudo python3 setup.py install -``` - -Download and install MySQL Connector: -``` - $ cd ~/tools - $ wget https://dev.mysql.com/get/Downloads/Connector-Python/mysql-connector-python-8.0.20.tar.gz - $ tar xzf mysql-connector-python-8.0.20.tar.gz - $ cd ~/tools/mysql-connector-python-8.0.20 - $ sudo python3 setup.py install -``` - -Download and install paho-mqtt: -``` - $ cd ~/tools - $ git clone https://github.com/eclipse/paho.mqtt.python.git - $ cd ~/tools/paho.mqtt.python - $ sudo python3 setup.py install -``` - -Install myems-mqtt-publisher service -``` - $ cd ~ - $ git clone https://github.com/MyEMS/myems.git - $ cd myems - $ sudo git checkout master (or the latest release tag) - $ sudo cp -R ~/myems/myems-mqtt-publisher /myems-mqtt-publisher -``` -Edit the config -``` - $ sudo nano /myems-mqtt-publisher/config.py -``` -Setup systemd service: -```bash - $ sudo cp myems--mqtt-publisher.service /lib/systemd/system/ -``` -Enable the service: -```bash - $ sudo systemctl enable myems--mqtt-publisher.service -``` -Start the service: -```bash - $ sudo systemctl start myems--mqtt-publisher.service -``` -Monitor the service: -```bash - $ sudo systemctl status myems--mqtt-publisher.service -``` -View the log: -```bash - $ cat /myems--mqtt-publisher.log -``` - -### Topic -topic_prefix in config and point_id - -Example: -``` -'myems/point/3' -``` - -### Payload -data_source_id, the Data Source ID. - -point_id, the Point ID. - -object_type, the type of data, is one of 'ANALOG_VALUE'(decimal(18, 3)) , 'ENERGY_VALUE'(decimal(18, 3)) and 'DIGITAL_VALUE'(int(11)). - -utc_date_time, the date time in utc when data was acquired. The full format looks like 'YYYY-MM-DDTHH:MM:SS'. - -value, the data value in Decimal or Integer. - -Example: -``` -{"data_source_id": 1, "point_id": 3, "object_type": 'ANALOG_VALUE', "utc_date_time": "2020-09-28T03:23:06", "value": Decimal('591960276.000')} -``` - -### References - [1]. http://myems.io - - [2]. https://www.eclipse.org/paho/clients/python/ - - [3]. https://simplejson.readthedocs.io/ - diff --git a/myems-mqtt-publisher/acquisition.py b/myems-mqtt-publisher/acquisition.py deleted file mode 100644 index adcc257b..00000000 --- a/myems-mqtt-publisher/acquisition.py +++ /dev/null @@ -1,240 +0,0 @@ -import json -import mysql.connector -import time -import simplejson as json -import paho.mqtt.client as mqtt -import config - - -# indicates the connectivity with the MQTT broker -mqtt_connected_flag = False - - -# the on_connect callback function for MQTT client -# refer to http://www.steves-internet-guide.com/client-connections-python-mqtt/ -def on_mqtt_connect(client, userdata, flags, rc): - global mqtt_connected_flag - if rc == 0: - mqtt_connected_flag = True # set flag - print("MQTT connected OK") - else: - print("Bad MQTT connection Returned code=", rc) - mqtt_connected_flag = False - - -# the on_disconnect callback function for MQTT client -# refer to http://www.steves-internet-guide.com/client-connections-python-mqtt/ -def on_mqtt_disconnect(client, userdata, rc=0): - global mqtt_connected_flag - - print("DisConnected, result code "+str(rc)) - mqtt_connected_flag = False - - -######################################################################################################################## -# Acquisition Procedures -# Step 1: Get point list -# Step 2: Connect to the historical database -# Step 3: Connect to the MQTT broker -# Step 4: Read point values from latest tables in historical database -# Step 5: Publish point values -######################################################################################################################## -def process(logger, object_type): - - while True: - # the outermost while loop - - ################################################################################################################ - # Step 1: Get point list - ################################################################################################################ - cnx_system_db = None - cursor_system_db = None - try: - cnx_system_db = mysql.connector.connect(**config.myems_system_db) - cursor_system_db = cnx_system_db.cursor() - except Exception as e: - logger.error("Error in step 1.1 of acquisition process " + str(e)) - if cursor_system_db: - cursor_system_db.close() - if cnx_system_db: - cnx_system_db.close() - # sleep and then continue the outermost loop to reload points - time.sleep(60) - continue - - try: - if object_type == 'ANALOG_VALUE': - query = (" SELECT id, name, data_source_id " - " FROM tbl_points" - " WHERE object_type = 'ANALOG_VALUE' " - " ORDER BY id ") - elif object_type == 'DIGITAL_VALUE': - query = (" SELECT id, name, data_source_id " - " FROM tbl_points" - " WHERE object_type = 'DIGITAL_VALUE' " - " ORDER BY id ") - elif object_type == 'ENERGY_VALUE': - query = (" SELECT id, name, data_source_id " - " FROM tbl_points" - " WHERE object_type = 'ENERGY_VALUE' " - " ORDER BY id ") - - cursor_system_db.execute(query, ) - rows_point = cursor_system_db.fetchall() - except Exception as e: - logger.error("Error in step 1.2 of acquisition process: " + str(e)) - # sleep several minutes and continue the outer loop to reload points - time.sleep(60) - continue - finally: - if cursor_system_db: - cursor_system_db.close() - if cnx_system_db: - cnx_system_db.close() - - if rows_point is None or len(rows_point) == 0: - # there is no points - logger.error("Point Not Found, acquisition process terminated ") - # sleep 60 seconds and go back to the begin of outermost while loop to reload points - time.sleep(60) - continue - - point_dict = dict() - for row_point in rows_point: - point_dict[row_point[0]] = {'name': row_point[1], 'data_source_id': row_point[2]} - - ################################################################################################################ - # Step 2: Connect to the historical database - ################################################################################################################ - cnx_historical_db = None - cursor_historical_db = None - try: - cnx_historical_db = mysql.connector.connect(**config.myems_historical_db) - cursor_historical_db = cnx_historical_db.cursor() - except Exception as e: - logger.error("Error in step 2.1 of acquisition process " + str(e)) - if cursor_historical_db: - cursor_historical_db.close() - if cnx_historical_db: - cnx_historical_db.close() - # sleep 60 seconds and go back to the begin of outermost while loop to reload points - time.sleep(60) - continue - - ################################################################################################################ - # Step 3: Connect to the MQTT broker - ################################################################################################################ - mqc = None - try: - mqc = mqtt.Client(client_id='MYEMS' + "-" + str(time.time())) - mqc.username_pw_set(config.myems_mqtt_broker['username'], config.myems_mqtt_broker['password']) - mqc.on_connect = on_mqtt_connect - mqc.on_disconnect = on_mqtt_disconnect - mqc.connect_async(config.myems_mqtt_broker['host'], config.myems_mqtt_broker['port'], 60) - # The loop_start() starts a new thread, that calls the loop method at regular intervals for you. - # It also handles re-connects automatically. - mqc.loop_start() - - except Exception as e: - logger.error("Error in step 3.1 of acquisition process " + str(e)) - # sleep 60 seconds and go back to the begin of outermost while loop to reload points - time.sleep(60) - continue - - ################################################################################################################ - # Step 4: Read point values from latest tables in historical database - ################################################################################################################ - # inner loop to read all point latest values and publish them within a period - while True: - if object_type == 'ANALOG_VALUE': - query = " SELECT point_id, utc_date_time, actual_value" \ - " FROM tbl_analog_value_latest WHERE point_id IN ( " - elif object_type == 'DIGITAL_VALUE': - query = " SELECT point_id, utc_date_time, actual_value" \ - " FROM tbl_digital_value_latest WHERE point_id IN ( " - elif object_type == 'ENERGY_VALUE': - query = " SELECT point_id, utc_date_time, actual_value" \ - " FROM tbl_energy_value_latest WHERE point_id IN ( " - - for point_id in point_dict: - query += str(point_id) + "," - - try: - # replace "," at the end of string with ")" - cursor_historical_db.execute(query[:-1] + ")") - rows_point_values = cursor_historical_db.fetchall() - except Exception as e: - logger.error("Error in step 4.1 of acquisition process " + str(e)) - if cursor_historical_db: - cursor_historical_db.close() - if cnx_historical_db: - cnx_historical_db.close() - - # destroy mqtt client - if mqc and mqc.is_connected(): - mqc.disconnect() - del mqc - # break the inner while loop - break - - if rows_point_values is None or len(rows_point_values) == 0: - # there is no points - print(" Point value Not Found") - - # sleep 60 seconds and go back to the begin of inner while loop - time.sleep(60) - continue - - point_value_list = list() - for row_point_value in rows_point_values: - point_id = row_point_value[0] - point = point_dict.get(point_id) - data_source_id = point['data_source_id'] - utc_date_time = row_point_value[1].replace(tzinfo=None).isoformat(timespec='seconds') - value = row_point_value[2] - point_value_list.append({'data_source_id': data_source_id, - 'point_id': point_id, - 'object_type': object_type, - 'utc_date_time': utc_date_time, - 'value': value}) - - ############################################################################################################ - # Step 5: Publish point values - ############################################################################################################ - - if len(point_value_list) > 0 and mqtt_connected_flag: - for point_value in point_value_list: - try: - # publish real time value to mqtt broker - topic = config.topic_prefix + str(point_value['point_id']) - print('topic=' + topic) - payload = json.dumps({'data_source_id': point_value['data_source_id'], - 'point_id': point_value['point_id'], - 'object_type': point_value['object_type'], - 'utc_date_time': point_value['utc_date_time'], - 'value': point_value['value']}) - print('payload=' + str(payload)) - info = mqc.publish(topic=topic, - payload=payload, - qos=config.qos, - retain=True) - except Exception as e: - logger.error("Error in step 5 of acquisition process: " + str(e)) - if cursor_historical_db: - cursor_historical_db.close() - if cnx_historical_db: - cnx_historical_db.close() - - # destroy mqtt client - if mqc and mqc.is_connected(): - mqc.disconnect() - del mqc - - # break the inner while loop - break - - # sleep some seconds - time.sleep(config.interval_in_seconds) - # end of inner while loop - - # end of outermost while loop diff --git a/myems-mqtt-publisher/config.py b/myems-mqtt-publisher/config.py deleted file mode 100644 index 260a918e..00000000 --- a/myems-mqtt-publisher/config.py +++ /dev/null @@ -1,31 +0,0 @@ -myems_system_db = { - 'user': 'root', - 'password': '!MyEMS1', - 'host': '127.0.0.1', - 'database': 'myems_system_db', - 'port': 3306, -} - -myems_historical_db = { - 'user': 'root', - 'password': '!MyEMS1', - 'host': '127.0.0.1', - 'database': 'myems_historical_db', - 'port': 3306, -} - -myems_mqtt_broker = { - 'host': '127.0.0.1', - 'port': 1883, - 'username': 'admin', - 'password': 'Password1', -} - -# The quality of service level to use. -# The value is one of 0, 1 or 2, -qos = 0 - -# The topic prefix that the message should be published on. -topic_prefix = 'myems/point/' - -interval_in_seconds = 60 diff --git a/myems-mqtt-publisher/main.py b/myems-mqtt-publisher/main.py deleted file mode 100644 index a31d6767..00000000 --- a/myems-mqtt-publisher/main.py +++ /dev/null @@ -1,33 +0,0 @@ -from multiprocessing import Process -import logging -from logging.handlers import RotatingFileHandler -import acquisition - - -def main(): - """main""" - # create logger - logger = logging.getLogger('myems-mqtt-publisher') - # specifies the lowest-severity log message a logger will handle, - # where debug is the lowest built-in severity level and critical is the highest built-in severity. - # For example, if the severity level is INFO, the logger will handle only INFO, WARNING, ERROR, and CRITICAL - # messages and will ignore DEBUG messages. - logger.setLevel(logging.ERROR) - # create file handler which logs messages - fh = RotatingFileHandler('myems-mqtt-publisher.log', maxBytes=1024*1024, backupCount=1) - # create formatter and add it to the handlers - formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') - fh.setFormatter(formatter) - # add the handlers to logger - logger.addHandler(fh) - - # create acquisition processes - Process(target=acquisition.process, args=(logger, 'ANALOG_VALUE')).start() - - Process(target=acquisition.process, args=(logger, 'DIGITAL_VALUE')).start() - - Process(target=acquisition.process, args=(logger, 'ENERGY_VALUE')).start() - - -if __name__ == "__main__": - main() diff --git a/myems-mqtt-publisher/myems-mqtt-publisher.service b/myems-mqtt-publisher/myems-mqtt-publisher.service deleted file mode 100644 index b62f5be2..00000000 --- a/myems-mqtt-publisher/myems-mqtt-publisher.service +++ /dev/null @@ -1,15 +0,0 @@ -[Unit] -Description=myems-mqtt-publisher daemon -After=network.target - -[Service] -User=root -Group=root -ExecStart=/usr/bin/python3 /myems-mqtt-publisher/main.py -ExecReload=/bin/kill -s HUP $MAINPID -ExecStop=/bin/kill -s TERM $MAINPID -PrivateTmp=true -Restart=always - -[Install] -WantedBy=multi-user.target diff --git a/myems-mqtt-publisher/test.py b/myems-mqtt-publisher/test.py deleted file mode 100644 index 9dd128de..00000000 --- a/myems-mqtt-publisher/test.py +++ /dev/null @@ -1,81 +0,0 @@ -import simplejson as json -import config -import time -from datetime import datetime -import paho.mqtt.client as mqtt -import random -import decimal - - -# global flag indicates the connectivity with the MQTT broker -mqtt_connected_flag = False - - -# the on_connect callback function for MQTT client -# refer to http://www.steves-internet-guide.com/client-connections-python-mqtt/ -def on_mqtt_connect(client, userdata, flags, rc): - global mqtt_connected_flag - if rc == 0: - mqtt_connected_flag = True # set flag - print("MQTT connected OK") - else: - print("Bad MQTT connection Returned code=", rc) - mqtt_connected_flag = False - - -# the on_disconnect callback function for MQTT client -# refer to http://www.steves-internet-guide.com/client-connections-python-mqtt/ -def on_mqtt_disconnect(client, userdata, rc=0): - global mqtt_connected_flag - - print("DisConnected, result code "+str(rc)) - mqtt_connected_flag = False - - -######################################################################################################################## -# Test Procedures -# Step 1: Connect the MQTT broker -# Step 2: Publish test topic messages -# Step 3: Run 'mosquitto_sub -h 192.168.0.1 -v -t myems/point/# -u admin -P Password1' to receive test messages -######################################################################################################################## - -def main(): - global mqtt_connected_flag - mqc = None - try: - mqc = mqtt.Client(client_id='MYEMS' + "-" + str(time.time())) - mqc.username_pw_set(config.myems_mqtt_broker['username'], config.myems_mqtt_broker['password']) - mqc.on_connect = on_mqtt_connect - mqc.on_disconnect = on_mqtt_disconnect - mqc.connect_async(config.myems_mqtt_broker['host'], config.myems_mqtt_broker['port'], 60) - # The loop_start() starts a new thread, that calls the loop method at regular intervals for you. - # It also handles re-connects automatically. - mqc.loop_start() - - except Exception as e: - print("MQTT Client Connection error " + str(e)) - while True: - if mqtt_connected_flag: - try: - # publish real time value to mqtt broker - payload = json.dumps({"data_source_id": 1, - "point_id": 3, - "utc_date_time": datetime.utcnow().isoformat(timespec='seconds'), - "value": decimal.Decimal(random.randrange(0, 10000))}) - print('payload=' + str(payload)) - info = mqc.publish('myems/point/' + str(3), - payload=payload, - qos=0, - retain=True) - except Exception as e: - print("MQTT Publish Error : " + str(e)) - # ignore this exception, does not stop the procedure - pass - time.sleep(1) - else: - print('MQTT Client Connection error') - time.sleep(1) - - -if __name__ == "__main__": - main()