Skip to content

Commit

Permalink
added BBMD simulator and BBMD read_write
Browse files Browse the repository at this point in the history
  • Loading branch information
cmromo committed Dec 20, 2024
1 parent 1ac2e91 commit 779e14d
Show file tree
Hide file tree
Showing 3 changed files with 243 additions and 0 deletions.
95 changes: 95 additions & 0 deletions scripts/bacnet/BBMD.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
#!/usr/bin/env python

"""
This sample application presents itself as a BBMD sitting on an IP network.
The first parameter is the address of the BBMD itself and the second and
subsequent parameters are the entries to put in its broadcast distribution
table.
"""

from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolelogging import ArgumentParser

from bacpypes.core import run
from bacpypes.comm import Client, bind

from bacpypes.pdu import Address
from bacpypes.bvllservice import BIPBBMD, AnnexJCodec, UDPMultiplexer

# some debugging
_debug = 0
_log = ModuleLogger(globals())


#
# NullClient
#

@bacpypes_debugging
class NullClient(Client):

def __init__(self, cid=None):
if _debug: NullClient._debug("__init__ cid=%r", cid)
Client.__init__(self, cid=cid)

def confirmation(self, *args, **kwargs):
if _debug: NullClient._debug("confirmation %r %r", args, kwargs)

#
# __main__
#

def main():
# parse the command line arguments
parser = ArgumentParser(description=__doc__)

# add an argument for interval
parser.add_argument('localaddr', type=str,
help='local address of the BBMD',
)

# add an argument for interval
parser.add_argument('bdtentry', type=str, nargs='*',
help='list of addresses of peers',
)

# now parse the arguments
args = parser.parse_args()

if _debug: _log.debug("initialization")
if _debug: _log.debug(" - args: %r", args)

local_address = Address(args.localaddr)
if _debug: _log.debug(" - local_address: %r", local_address)

# create a null client that will accept, but do nothing with upstream
# packets from the BBMD
null_client = NullClient()
if _debug: _log.debug(" - null_client: %r", null_client)

# create a BBMD, bound to the Annex J server on a UDP multiplexer
bbmd = BIPBBMD(local_address)
annexj = AnnexJCodec()
multiplexer = UDPMultiplexer(local_address)

# bind the layers together
bind(null_client, bbmd, annexj, multiplexer.annexJ)

# loop through the rest of the addresses
for bdtentry in args.bdtentry:
if _debug: _log.debug(" - bdtentry: %r", bdtentry)

bdt_address = Address(bdtentry)
bbmd.add_peer(bdt_address)

if _debug: _log.debug(" - bbmd: %r", bbmd)

_log.debug("running")

run()

_log.debug("fini")


if __name__ == "__main__":
main()
10 changes: 10 additions & 0 deletions scripts/bacnet/BBMDPypes.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
[BACpypes]
objectName: Betelgeuse
address: 10.42.226.248/24
objectIdentifier: 708113
maxApduLengthAccepted: 1024
segmentationSupported: segmentedBoth
vendorIdentifier: 1318
foreignbbmd: 10.42.224.91
foreignttl: 30

138 changes: 138 additions & 0 deletions scripts/bacnet/ReadWriteBBMD.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
#!/usr/bin/env python

"""
This application presents a 'console' prompt to the user asking for read commands
to read the BDT and FDT from a BBMD.
"""

from bacpypes.debugging import bacpypes_debugging, ModuleLogger
from bacpypes.consolecmd import ConsoleCmd
from bacpypes.consolelogging import ArgumentParser

from bacpypes.comm import bind, Client
from bacpypes.core import run, enable_sleeping

from bacpypes.pdu import Address
from bacpypes.bvll import (
ReadBroadcastDistributionTable,
ReadBroadcastDistributionTableAck,
ReadForeignDeviceTable,
ReadForeignDeviceTableAck,
WriteBroadcastDistributionTable,
Result,
)
from bacpypes.bvllservice import AnnexJCodec, UDPMultiplexer

# some debugging
_debug = 0
_log = ModuleLogger(globals())

# globals
this_application = None


@bacpypes_debugging
class ReadWriteBBMDConsoleClient(ConsoleCmd, Client):
def do_readbdt(self, args):
"""readbdt <addr>"""
args = args.split()
if _debug:
ReadWriteBBMDConsoleClient._debug("do_readbdt %r", args)

# build a request and send it downstream
read_bdt = ReadBroadcastDistributionTable(destination=Address(args[0]))
if _debug:
ReadWriteBBMDConsoleClient._debug(" - read_bdt: %r", read_bdt)

self.request(read_bdt)

def do_readfdt(self, args):
"""readfdt <addr>"""
args = args.split()
if _debug:
ReadWriteBBMDConsoleClient._debug("do_readfdt %r", args)

# build a request and send it downstream
read_fdt = ReadForeignDeviceTable(destination=Address(args[0]))
if _debug:
ReadWriteBBMDConsoleClient._debug(" - read_fdt: %r", read_fdt)

self.request(read_fdt)

def do_writebdt(self, args):
"""writebdt <addr> <entry> ..."""
args = args.split()
if _debug:
ReadWriteBBMDConsoleClient._debug("do_writebdt %r", args)

# build a list of broadcast distribution table entries which just so
# happen to be BACpypes IPv4 addresses
bdt = []
for addr in args[1:]:
bdte = Address(addr)
bdt.append(bdte)

# build a request and send it downstream
write_bdt = WriteBroadcastDistributionTable(
destination=Address(args[0]), bdt=bdt
)
if _debug:
ReadWriteBBMDConsoleClient._debug(" - write_bdt: %r", write_bdt)

self.request(write_bdt)

def confirmation(self, pdu):
"""Filter for the acks and errors."""
if _debug:
ReadWriteBBMDConsoleClient._debug("confirmation %r", pdu)

if isinstance(
pdu, (ReadBroadcastDistributionTableAck, ReadForeignDeviceTableAck, Result)
):
pdu.debug_contents()


def main():
global this_application

# parse the command line arguments
parser = ArgumentParser(description=__doc__)
parser.add_argument(
"local_address",
help="IPv4 address",
)
args = parser.parse_args()

if _debug:
_log.debug("initialization")
_log.debug(" - args: %r", args)

local_address = Address(args.local_address)
if _debug:
_log.debug(" - local_address: %r", local_address)

# make a console
this_console = ReadWriteBBMDConsoleClient()
if _debug:
_log.debug(" - this_console: %r", this_console)

# create an Annex J codec, bound to the Annex J server
# of the UDP multiplexer
annexj = AnnexJCodec()
mux = UDPMultiplexer(local_address)

# bind the layers
bind(this_console, annexj, mux.annexJ)

# enable sleeping will help with threads
enable_sleeping()

_log.debug("running")

run()

_log.debug("fini")


if __name__ == "__main__":
main()

0 comments on commit 779e14d

Please sign in to comment.