myems/myems-bacnet/myems_application.py

80 lines
2.3 KiB
Python

from collections import deque
from bacpypes.app import BIPForeignApplication
from bacpypes.core import stop, deferred
from bacpypes.iocb import IOCB
from bacpypes.pdu import Address
from bacpypes.object import get_datatype
from bacpypes.apdu import ReadPropertyRequest, Error, AbortPDU, ReadPropertyACK
from bacpypes.primitivedata import Unsigned
from bacpypes.constructeddata import Array
class MyEMSApplication(BIPForeignApplication):
def __init__(self, point_list, *args):
BIPForeignApplication.__init__(self, *args)
# turn the point list into a queue
self.point_queue = deque(point_list)
# make a list of the response values
self.response_values = []
def next_request(self):
# check to see if we're done
if not self.point_queue:
stop()
return
# get the next request
point_id, addr, obj_type, obj_inst, prop_id, idx = self.point_queue.popleft()
# build a request
request = ReadPropertyRequest(
objectIdentifier=(obj_type, obj_inst),
propertyIdentifier=prop_id,
propertyArrayIndex=idx
)
request.pduDestination = Address(addr)
# make an IOCB
iocb = IOCB(request)
# set a callback for the response
iocb.add_callback(self.complete_request)
# send the request
self.request_io(iocb)
def complete_request(self, iocb):
if iocb.ioResponse:
apdu = iocb.ioResponse
# find the datatype
datatype = get_datatype(apdu.objectIdentifier[0], apdu.propertyIdentifier)
if not datatype:
raise TypeError("unknown datatype")
# special case for array parts, others are managed by cast_out
if issubclass(datatype, Array) and (apdu.propertyArrayIndex is not None):
if apdu.propertyArrayIndex == 0:
value = apdu.propertyValue.cast_out(Unsigned)
else:
value = apdu.propertyValue.cast_out(datatype.subtype)
else:
value = apdu.propertyValue.cast_out(datatype)
# save the value
self.response_values.append(value)
if iocb.ioError:
self.response_values.append(iocb.ioError)
# fire off another request
deferred(self.next_request)