From 779e14d3161b29e825abd4b4be2e50828da189e9 Mon Sep 17 00:00:00 2001 From: Cristian Date: Fri, 20 Dec 2024 11:06:11 -0600 Subject: [PATCH] added BBMD simulator and BBMD read_write --- scripts/bacnet/BBMD.py | 95 ++++++++++++++++++++++ scripts/bacnet/BBMDPypes.ini | 10 +++ scripts/bacnet/ReadWriteBBMD.py | 138 ++++++++++++++++++++++++++++++++ 3 files changed, 243 insertions(+) create mode 100644 scripts/bacnet/BBMD.py create mode 100644 scripts/bacnet/BBMDPypes.ini create mode 100644 scripts/bacnet/ReadWriteBBMD.py diff --git a/scripts/bacnet/BBMD.py b/scripts/bacnet/BBMD.py new file mode 100644 index 0000000000..6e2d1bcfc5 --- /dev/null +++ b/scripts/bacnet/BBMD.py @@ -0,0 +1,95 @@ +#!/usr/bin/env python + +""" +This sample application presents itself as a BBMD sitting on an IP network. +The first parameter is the address of the BBMD itself and the second and +subsequent parameters are the entries to put in its broadcast distribution +table. +""" + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolelogging import ArgumentParser + +from bacpypes.core import run +from bacpypes.comm import Client, bind + +from bacpypes.pdu import Address +from bacpypes.bvllservice import BIPBBMD, AnnexJCodec, UDPMultiplexer + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + + +# +# NullClient +# + +@bacpypes_debugging +class NullClient(Client): + + def __init__(self, cid=None): + if _debug: NullClient._debug("__init__ cid=%r", cid) + Client.__init__(self, cid=cid) + + def confirmation(self, *args, **kwargs): + if _debug: NullClient._debug("confirmation %r %r", args, kwargs) + +# +# __main__ +# + +def main(): + # parse the command line arguments + parser = ArgumentParser(description=__doc__) + + # add an argument for interval + parser.add_argument('localaddr', type=str, + help='local address of the BBMD', + ) + + # add an argument for interval + parser.add_argument('bdtentry', type=str, nargs='*', + help='list of addresses of peers', + ) + + # now parse the arguments + args = parser.parse_args() + + if _debug: _log.debug("initialization") + if _debug: _log.debug(" - args: %r", args) + + local_address = Address(args.localaddr) + if _debug: _log.debug(" - local_address: %r", local_address) + + # create a null client that will accept, but do nothing with upstream + # packets from the BBMD + null_client = NullClient() + if _debug: _log.debug(" - null_client: %r", null_client) + + # create a BBMD, bound to the Annex J server on a UDP multiplexer + bbmd = BIPBBMD(local_address) + annexj = AnnexJCodec() + multiplexer = UDPMultiplexer(local_address) + + # bind the layers together + bind(null_client, bbmd, annexj, multiplexer.annexJ) + + # loop through the rest of the addresses + for bdtentry in args.bdtentry: + if _debug: _log.debug(" - bdtentry: %r", bdtentry) + + bdt_address = Address(bdtentry) + bbmd.add_peer(bdt_address) + + if _debug: _log.debug(" - bbmd: %r", bbmd) + + _log.debug("running") + + run() + + _log.debug("fini") + + +if __name__ == "__main__": + main() diff --git a/scripts/bacnet/BBMDPypes.ini b/scripts/bacnet/BBMDPypes.ini new file mode 100644 index 0000000000..26e82439d5 --- /dev/null +++ b/scripts/bacnet/BBMDPypes.ini @@ -0,0 +1,10 @@ +[BACpypes] +objectName: Betelgeuse +address: 10.42.226.248/24 +objectIdentifier: 708113 +maxApduLengthAccepted: 1024 +segmentationSupported: segmentedBoth +vendorIdentifier: 1318 +foreignbbmd: 10.42.224.91 +foreignttl: 30 + diff --git a/scripts/bacnet/ReadWriteBBMD.py b/scripts/bacnet/ReadWriteBBMD.py new file mode 100644 index 0000000000..700ce219d0 --- /dev/null +++ b/scripts/bacnet/ReadWriteBBMD.py @@ -0,0 +1,138 @@ +#!/usr/bin/env python + +""" +This application presents a 'console' prompt to the user asking for read commands +to read the BDT and FDT from a BBMD. +""" + +from bacpypes.debugging import bacpypes_debugging, ModuleLogger +from bacpypes.consolecmd import ConsoleCmd +from bacpypes.consolelogging import ArgumentParser + +from bacpypes.comm import bind, Client +from bacpypes.core import run, enable_sleeping + +from bacpypes.pdu import Address +from bacpypes.bvll import ( + ReadBroadcastDistributionTable, + ReadBroadcastDistributionTableAck, + ReadForeignDeviceTable, + ReadForeignDeviceTableAck, + WriteBroadcastDistributionTable, + Result, +) +from bacpypes.bvllservice import AnnexJCodec, UDPMultiplexer + +# some debugging +_debug = 0 +_log = ModuleLogger(globals()) + +# globals +this_application = None + + +@bacpypes_debugging +class ReadWriteBBMDConsoleClient(ConsoleCmd, Client): + def do_readbdt(self, args): + """readbdt """ + args = args.split() + if _debug: + ReadWriteBBMDConsoleClient._debug("do_readbdt %r", args) + + # build a request and send it downstream + read_bdt = ReadBroadcastDistributionTable(destination=Address(args[0])) + if _debug: + ReadWriteBBMDConsoleClient._debug(" - read_bdt: %r", read_bdt) + + self.request(read_bdt) + + def do_readfdt(self, args): + """readfdt """ + args = args.split() + if _debug: + ReadWriteBBMDConsoleClient._debug("do_readfdt %r", args) + + # build a request and send it downstream + read_fdt = ReadForeignDeviceTable(destination=Address(args[0])) + if _debug: + ReadWriteBBMDConsoleClient._debug(" - read_fdt: %r", read_fdt) + + self.request(read_fdt) + + def do_writebdt(self, args): + """writebdt ...""" + args = args.split() + if _debug: + ReadWriteBBMDConsoleClient._debug("do_writebdt %r", args) + + # build a list of broadcast distribution table entries which just so + # happen to be BACpypes IPv4 addresses + bdt = [] + for addr in args[1:]: + bdte = Address(addr) + bdt.append(bdte) + + # build a request and send it downstream + write_bdt = WriteBroadcastDistributionTable( + destination=Address(args[0]), bdt=bdt + ) + if _debug: + ReadWriteBBMDConsoleClient._debug(" - write_bdt: %r", write_bdt) + + self.request(write_bdt) + + def confirmation(self, pdu): + """Filter for the acks and errors.""" + if _debug: + ReadWriteBBMDConsoleClient._debug("confirmation %r", pdu) + + if isinstance( + pdu, (ReadBroadcastDistributionTableAck, ReadForeignDeviceTableAck, Result) + ): + pdu.debug_contents() + + +def main(): + global this_application + + # parse the command line arguments + parser = ArgumentParser(description=__doc__) + parser.add_argument( + "local_address", + help="IPv4 address", + ) + args = parser.parse_args() + + if _debug: + _log.debug("initialization") + _log.debug(" - args: %r", args) + + local_address = Address(args.local_address) + if _debug: + _log.debug(" - local_address: %r", local_address) + + # make a console + this_console = ReadWriteBBMDConsoleClient() + if _debug: + _log.debug(" - this_console: %r", this_console) + + # create an Annex J codec, bound to the Annex J server + # of the UDP multiplexer + annexj = AnnexJCodec() + mux = UDPMultiplexer(local_address) + + # bind the layers + bind(this_console, annexj, mux.annexJ) + + # enable sleeping will help with threads + enable_sleeping() + + _log.debug("running") + + run() + + _log.debug("fini") + + +if __name__ == "__main__": + main()