diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 7406111d8..d252562e5 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -6,3 +6,10 @@ repos: rev: 'v19.1.3' hooks: - id: clang-format + +- repo: https://github.com/astral-sh/ruff-pre-commit + rev: v0.8.1 + hooks: + - id: ruff-format + - id: ruff + args: [--fix] diff --git a/.travis.yml b/.travis.yml deleted file mode 100644 index e5b6292c4..000000000 --- a/.travis.yml +++ /dev/null @@ -1,34 +0,0 @@ -os: windows - -language: cpp - -branches: - only: - - master - - /^release\/.*$/ - -notifications: - email: - recipients: - - zeek-commits-internal@zeek.org - -before_install: - - df -h - - choco list --localonly - - cmake --version - - cmake --help - -install: - - choco install -y --no-progress openssl - - choco install -y --no-progress visualstudio2019buildtools --package-parameters "--add Microsoft.VisualStudio.Workload.VCTools --add Microsoft.VisualStudio.Component.VC.Tools.x86.x64 --add Microsoft.VisualStudio.Component.Windows10SDK.18362" - -script: - - mkdir build && cd build - # Travis environment has 2-cores - - cmake -A x64 -DOPENSSL_ROOT_DIR="C:\Program Files\OpenSSL-Win64" -DEXTRA_FLAGS="-MP2" .. - - cmake --build . --target install --config release - - ctest -C release - -after_failure: - - cat CMakeFiles/CMakeOutput.log - - cat CMakeFiles/CMakeError.log diff --git a/CHANGES b/CHANGES index a191676ff..0dba865b1 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,21 @@ +2.8.0-dev.163 | 2024-12-10 17:27:21 -0700 + + * Fix ruff F findings (Tim Wojtulewicz, Corelight) + + * Fix ruff UP findings (Tim Wojtulewicz, Corelight) + + * Fix ruff C4 findings (Tim Wojtulewicz, Corelight) + + * Add ruff linting for python, fix I/ISC issues (Tim Wojtulewicz, Corelight) + + * Add pre-commit run ruff-format, fix findings (Tim Wojtulewicz, Corelight) + + * Remove long-obsolete travis.yml file (Tim Wojtulewicz, Corelight) + + * CI: Install python3.9-dev on ubuntu 20 (Tim Wojtulewicz, Corelight) + + * Require Python 3.9 in CMakeLists.txt (Tim Wojtulewicz, Corelight) + 2.8.0-dev.154 | 2024-12-06 15:14:02 -0800 * Fix potential connector infinite loop in read_result::stop scenarios (Christian Kreibich, Corelight) diff --git a/CMakeLists.txt b/CMakeLists.txt index 5e3ab2e7c..5d22b146b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -335,7 +335,7 @@ endif () if (NOT DISABLE_PYTHON_BINDINGS) set(Python_ADDITIONAL_VERSIONS 3) - find_package(Python 3.5 COMPONENTS Interpreter Development) + find_package(Python 3.9 COMPONENTS Interpreter Development) if (NOT Python_FOUND) message(STATUS "Skipping Python bindings: Python interpreter not found") diff --git a/VERSION b/VERSION index 3bc4a226d..3ed256061 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -2.8.0-dev.154 +2.8.0-dev.163 diff --git a/bindings/python/broker/__init__.py b/bindings/python/broker/__init__.py index 5e6046bf2..c90cc24f5 100644 --- a/bindings/python/broker/__init__.py +++ b/bindings/python/broker/__init__.py @@ -1,23 +1,20 @@ - try: from . import _broker except ImportError: import _broker -import sys +import collections import datetime -import time -import types import ipaddress -import collections +import types try: from datetime import timezone + utc = timezone.utc except: # Only Python 3.2+ has a datetime.timezone.utc we can re-use class UTC(datetime.tzinfo): - def utcoffset(self, dt): return datetime.timedelta(0) @@ -49,7 +46,7 @@ def __ne__(self, other): utc = UTC() Version = _broker.Version -Version.string = lambda: '%u.%u.%u' % (Version.MAJOR, Version.MINOR, Version.PATCH) +Version.string = lambda: f"{Version.MAJOR}.{Version.MINOR}.{Version.PATCH}" now = _broker.now @@ -72,12 +69,15 @@ def __ne__(self, other): # Broker's (or better: CAF's) EC code is an integer. Add support # for comparision against the enum. _EC_eq = _broker.EC.__eq__ + + def _our_EC_eq(self, other): if isinstance(other, int): return other == int(self) else: return _EC_eq(self, other) + _broker.EC.__eq__ = _our_EC_eq Address = _broker.Address @@ -91,8 +91,10 @@ def _our_EC_eq(self, other): Timestamp = _broker.Timestamp Vector = _broker.Vector + def _make_topic(t): - return (Topic(t) if not isinstance(t, Topic) else t) + return Topic(t) if not isinstance(t, Topic) else t + def _make_topics(ts): if isinstance(ts, Topic): @@ -106,6 +108,7 @@ def _make_topics(ts): return _broker.VectorTopic(ts) + # This class does not derive from the internal class because we # need to pass in existing instances. That means we need to # wrap all methods, even those that just reuse the internal @@ -161,6 +164,7 @@ def add_topic(self, topic, block=False): def remove_topic(self, topic, block=False): return self._subscriber.remove_topic(_make_topic(topic), block) + class SafeSubscriber(Subscriber): """Subscriber subclass that makes returnes messages safe to process. @@ -191,7 +195,8 @@ def get(self, *args, **kwargs): assert False -class StatusSubscriber(): + +class StatusSubscriber: def __init__(self, internal_subscriber): self._subscriber = internal_subscriber @@ -235,6 +240,7 @@ def _to_error_or_status(self, x): assert False + class Publisher: # This class does not derive from the internal class because we # need to pass in existing instances. That means we need to @@ -280,6 +286,7 @@ def publish_batch(self, *batch): batch = [Data.from_py(d) for d in batch] return self._publisher.publish_batch(_broker.Vector(batch)) + class Store: # This class does not derive from the internal class because we # need to pass in existing instances. That means we need to @@ -299,14 +306,6 @@ def __exit__(self, type, value, traceback): self._parent = None self._store = None - def __enter__(self): - return self - - def __exit__(self, type, value, traceback): - self._store.reset() - self._parent = None - self._store = None - def name(self): return self._store.name() @@ -370,7 +369,7 @@ def append(self, key, s, expiry=None): key = Data.from_py(key) s = Data.from_py(s) expiry = self._to_expiry(expiry) - return self._store.append(key, s, expiry) + return self._store.append(key, s, expiry) def insert_into(self, key, index, value=None, expiry=None): key = Data.from_py(key) @@ -387,7 +386,7 @@ def remove_from(self, key, index, expiry=None): key = Data.from_py(key) index = Data.from_py(index) expiry = self._to_expiry(expiry) - return self._store.remove_from(key, index, expiry) + return self._store.remove_from(key, index, expiry) def push(self, key, value, expiry=None): key = Data.from_py(key) @@ -401,7 +400,11 @@ def pop(self, key, expiry=None): return self._store.pop(key, expiry) def _to_expiry(self, e): - return (_broker.OptionalTimespan(_broker.Timespan(float(e))) if e is not None else _broker.OptionalTimespan()) + return ( + _broker.OptionalTimespan(_broker.Timespan(float(e))) + if e is not None + else _broker.OptionalTimespan() + ) def await_idle(self, timeout=None): if timeout: @@ -413,18 +416,21 @@ def await_idle(self, timeout=None): # before destroying the endpoint. _parent = None + class Endpoint(_broker.Endpoint): - def make_subscriber(self, topics, qsize = 20, subscriber_class=Subscriber): + def make_subscriber(self, topics, qsize=20, subscriber_class=Subscriber): topics = _make_topics(topics) s = _broker.Endpoint.make_subscriber(self, topics, qsize) return subscriber_class(s) - def make_safe_subscriber(self, topics, qsize = 20): + def make_safe_subscriber(self, topics, qsize=20): """A variant of make_subscriber that returns a SafeSubscriber instance. In contrast to the Subscriber class, messages retrieved from SafeSubscribers use immutable, hashable values to ensure Python can represent them. When in doubt, use make_safe_subscriber().""" - return self.make_subscriber(topics=topics, qsize=qsize, subscriber_class=SafeSubscriber) + return self.make_subscriber( + topics=topics, qsize=qsize, subscriber_class=SafeSubscriber + ) def make_status_subscriber(self, receive_statuses=False): s = _broker.Endpoint.make_status_subscriber(self, receive_statuses) @@ -441,7 +447,7 @@ def forward(self, topics): def publish(self, topic, data): topic = _make_topic(topic) - data = Data.from_py(data) + data = Data.from_py(data) return _broker.Endpoint.publish(self, topic, data) def publish_batch(self, *batch): @@ -449,8 +455,8 @@ def publish_batch(self, *batch): return _broker.Endpoint.publish_batch(self, _broker.VectorPairTopicData(batch)) def attach_master(self, name, type=None, opts={}): - bopts = _broker.MapBackendOptions() # Generator expression doesn't work here. - for (k, v) in opts.items(): + bopts = _broker.MapBackendOptions() # Generator expression doesn't work here. + for k, v in opts.items(): bopts[k] = Data.from_py(v) s = _broker.Endpoint.attach_master(self, name, type, bopts) if not s.is_valid(): @@ -473,9 +479,11 @@ def attach_clone(self, name): def await_peer(self, node, timeout=None): if timeout: - return _broker.Endpoint.await_peer(self, node, _broker.Timespan(float(timeout))) + return _broker.Endpoint.await_peer( + self, node, _broker.Timespan(float(timeout)) + ) else: - return _broker.Endpoint.await_peer(self, node) + return _broker.Endpoint.await_peer(self, node) def __enter__(self): return self @@ -483,14 +491,17 @@ def __enter__(self): def __exit__(self, type, value, traceback): self.shutdown() + class Message: def to_broker(self): assert False and "method not overridden" + from . import zeek + class Data(_broker.Data): - def __init__(self, x = None): + def __init__(self, x=None): if x is None: _broker.Data.__init__(self) @@ -500,8 +511,26 @@ def __init__(self, x = None): elif isinstance(x, _broker.Data): _broker.Data.__init__(self, x) - elif isinstance(x, (bool, int, float, str, bytes, - Address, Count, Enum, Port, Set, Subnet, Table, Timespan, Timestamp, Vector)): + elif isinstance( + x, + ( + bool, + int, + float, + str, + bytes, + Address, + Count, + Enum, + Port, + Set, + Subnet, + Table, + Timespan, + Timestamp, + Vector, + ), + ): _broker.Data.__init__(self, x) elif isinstance(x, datetime.timedelta): @@ -534,12 +563,12 @@ def __init__(self, x = None): _broker.Data.__init__(self, v) elif isinstance(x, set) or isinstance(x, frozenset): - s = _broker.Set(([Data(i) for i in x])) + s = _broker.Set([Data(i) for i in x]) _broker.Data.__init__(self, s) elif isinstance(x, dict) or isinstance(x, types.MappingProxyType): t = _broker.Table() - for (k, v) in x.items(): + for k, v in x.items(): t[Data(k)] = Data(v) _broker.Data.__init__(self, t) @@ -563,12 +592,16 @@ def to_subnet(s): # Python < 3.5 does not have a nicer way of setting the prefixlen # when creating from packed data. if s.network().is_v4(): - return ipaddress.IPv4Network(to_ipaddress(s.network())).supernet(new_prefix=s.length()) + return ipaddress.IPv4Network(to_ipaddress(s.network())).supernet( + new_prefix=s.length() + ) else: - return ipaddress.IPv6Network(to_ipaddress(s.network())).supernet(new_prefix=s.length()) + return ipaddress.IPv6Network(to_ipaddress(s.network())).supernet( + new_prefix=s.length() + ) def to_set(s): - return set([Data.to_py(i) for i in s]) + return {Data.to_py(i) for i in s} def to_table(t): return {Data.to_py(k): Data.to_py(v) for (k, v) in t.items()} @@ -578,7 +611,7 @@ def to_vector(v): def _try_bytes_decode(b): try: - return b.decode('utf-8') + return b.decode("utf-8") except: return b @@ -596,8 +629,10 @@ def _try_bytes_decode(b): Data.Type.Subnet: lambda: to_subnet(d.as_subnet()), Data.Type.Table: lambda: to_table(d.as_table()), Data.Type.Timespan: lambda: datetime.timedelta(seconds=d.as_timespan()), - Data.Type.Timestamp: lambda: datetime.datetime.fromtimestamp(d.as_timestamp(), utc), - Data.Type.Vector: lambda: to_vector(d.as_vector()) + Data.Type.Timestamp: lambda: datetime.datetime.fromtimestamp( + d.as_timestamp(), utc + ), + Data.Type.Vector: lambda: to_vector(d.as_vector()), } try: @@ -605,17 +640,19 @@ def _try_bytes_decode(b): except KeyError: raise TypeError("unsupported data type: " + str(d.get_type())) + class ImmutableData(Data): """A Data specialization that uses immutable complex types for returned Python objects. For sets, the return type is frozenset, for tables it's a hashable, read-only derivative of dict, and for vectors it's Python tuples. """ + class HashableReadOnlyDict(dict): def __hash__(self): return hash(frozenset(self.items())) def __readonly__(self, *args, **kwargs): - raise TypeError('cannot modify this dict') + raise TypeError("cannot modify this dict") # https://stackoverflow.com/a/31049908 __setitem__ = __readonly__ @@ -633,7 +670,9 @@ def to_set(s): return frozenset([ImmutableData.to_py(i) for i in s]) def to_table(t): - tmp = {ImmutableData.to_py(k): ImmutableData.to_py(v) for (k, v) in t.items()} + tmp = { + ImmutableData.to_py(k): ImmutableData.to_py(v) for (k, v) in t.items() + } # It's tempting here to use types.MappingProxyType here, but it is # not hashable, so doesn't solve our main problem, and cannot be # derived from. @@ -645,7 +684,7 @@ def to_vector(v): converters = { Data.Type.Set: lambda: to_set(d.as_set()), Data.Type.Table: lambda: to_table(d.as_table()), - Data.Type.Vector: lambda: to_vector(d.as_vector()) + Data.Type.Vector: lambda: to_vector(d.as_vector()), } try: @@ -654,6 +693,7 @@ def to_vector(v): # Fall back on the Data class for types we handle identically. return Data.to_py(d) + ####### TODO: Updated to new Broker API until here. # # TODO: complete interface diff --git a/bindings/python/broker/zeek.py b/bindings/python/broker/zeek.py index d7e8c5b86..2742b435e 100644 --- a/bindings/python/broker/zeek.py +++ b/bindings/python/broker/zeek.py @@ -1,4 +1,3 @@ - try: from . import _broker except ImportError: @@ -6,11 +5,13 @@ import broker + # Keep this in sync with zeek.hh class MetadataType: NetworkTimestamp = broker.Count(1) UserMetadataStart = broker.Count(200) + class Event(_broker.zeek.Event): def __init__(self, *args, metadata=None): if len(args) == 1 and not isinstance(args[0], str): @@ -22,7 +23,9 @@ def __init__(self, *args, metadata=None): if metadata is not None: # Convert dicts to lists and make a copy so we don't # modify the callers argument. - metadata = list(metadata.items() if hasattr(metadata, "items") else metadata) + metadata = list( + metadata.items() if hasattr(metadata, "items") else metadata + ) # Convert non-counts to counts for convencience. for i, m in enumerate(metadata): if not isinstance(m[0], broker.Count): @@ -30,8 +33,9 @@ def __init__(self, *args, metadata=None): broker_metadata = broker.Data.from_py(metadata) - _broker.zeek.Event.__init__(self, args[0], broker.Data.from_py(args[1:]), - metadata=broker_metadata) + _broker.zeek.Event.__init__( + self, args[0], broker.Data.from_py(args[1:]), metadata=broker_metadata + ) def args(self): return [broker.Data.to_py(a) for a in _broker.zeek.Event.args(self)] @@ -43,6 +47,7 @@ def metadata(self): metadata = [broker.Data.to_py(m) for m in metadata] return metadata + # Similar to the Subscriber vs SafeSubscriber specialization, this is an event # specialization that is robust to Python's limitations regarding hashable # types. If you are working with Zeek types that Python cannot naturally diff --git a/broker-throughput/broker-throughput.py b/broker-throughput/broker-throughput.py index ca2fa87d6..6a7d16b4e 100644 --- a/broker-throughput/broker-throughput.py +++ b/broker-throughput/broker-throughput.py @@ -14,6 +14,7 @@ last_t = first_t last_sent_ev1 = 0 + def printStats(stats): t = stats[0] dt = stats[1] @@ -25,20 +26,24 @@ def printStats(stats): global last_t, last_sent_ev1 now = time.time() # rate = "sending at {:.2f} ev/s, receiving at {:.2f} ev/s".format(total_sent_ev1 / (now - first_t) , total_recv_ev1 / (now - first_t)) - rate = "sending at {:.2f} ev/s, receiving at {:.2f} ev/s".format((total_sent_ev1 - last_sent_ev1) / (now - last_t), ev1 / dt.total_seconds()) + rate = f"sending at {(total_sent_ev1 - last_sent_ev1) / (now - last_t):.2f} ev/s, receiving at {ev1 / dt.total_seconds():.2f} ev/s" last_t = now last_sent_ev1 = total_sent_ev1 - print("{} dt={} ev{}={} (total {} of {}) {}".format(t, dt, event, ev1, total_recv_ev1, total_sent_ev1, rate)) + print( + f"{t} dt={dt} ev{event}={ev1} (total {total_recv_ev1} of {total_sent_ev1}) {rate}" + ) + def sendBatch(p, num): - event_1s = [broker.zeek.Event("event_{}".format(event), [i, "test"]) for i in range(num)] + event_1s = [broker.zeek.Event(f"event_{event}", [i, "test"]) for i in range(num)] for e in event_1s: p.publish(e) global total_sent_ev1 total_sent_ev1 += len(event_1s) + def wait(s, t): waited = 0 @@ -55,9 +60,10 @@ def wait(s, t): if waited >= t: break + ep = broker.Endpoint() s = ep.make_subscriber("/benchmark/stats") -ss = ep.make_status_subscriber(True); +ss = ep.make_status_subscriber(True) ep.peer("127.0.0.1", 9999) # Wait until connection is established. @@ -71,7 +77,7 @@ def wait(s, t): while True: sendBatch(p, 5000) - wait(s, .001) + wait(s, 0.001) if ss.available(): print(ss.get()) diff --git a/ci/ubuntu-20.04/Dockerfile b/ci/ubuntu-20.04/Dockerfile index 257c7b9a9..13d73743f 100644 --- a/ci/ubuntu-20.04/Dockerfile +++ b/ci/ubuntu-20.04/Dockerfile @@ -13,6 +13,6 @@ RUN apt-get update && apt-get -y install \ libssl-dev \ make \ python3.9 \ - python3-dev \ + python3.9-dev \ && apt autoclean \ && rm -rf /var/lib/apt/lists/* diff --git a/doc/_examples/ping.py b/doc/_examples/ping.py index 92853eeef..9f680b32e 100644 --- a/doc/_examples/ping.py +++ b/doc/_examples/ping.py @@ -1,13 +1,15 @@ # ping.py import sys + import broker # Setup endpoint and connect to Zeek. -with broker.Endpoint() as ep, \ - ep.make_subscriber("/topic/test") as sub, \ - ep.make_status_subscriber(True) as ss: - +with ( + broker.Endpoint() as ep, + ep.make_subscriber("/topic/test") as sub, + ep.make_status_subscriber(True) as ss, +): ep.peer("127.0.0.1", 9999) # Wait until connection is established. @@ -19,10 +21,9 @@ for n in range(5): # Send event "ping(n)". - ping = broker.zeek.Event("ping", n); - ep.publish("/topic/test", ping); - + ping = broker.zeek.Event("ping", n) + ep.publish("/topic/test", ping) # Wait for "pong" reply event. (t, d) = sub.get() pong = broker.zeek.Event(d) - print("received {}{}".format(pong.name(), pong.args())) + print(f"received {pong.name()}{pong.args()}") diff --git a/doc/_examples/sqlite-connect.py b/doc/_examples/sqlite-connect.py index dd413d710..cd5225963 100644 --- a/doc/_examples/sqlite-connect.py +++ b/doc/_examples/sqlite-connect.py @@ -1,24 +1,25 @@ # sqlite-connect.py -import broker import sys import time -with broker.Endpoint() as ep, \ - ep.make_subscriber('/test') as s, \ - ep.make_status_subscriber(True) as ss: - - ep.peer('127.0.0.1', 9999, 1.0) +import broker - st = ss.get(); +with ( + broker.Endpoint() as ep, + ep.make_subscriber("/test") as s, + ep.make_status_subscriber(True) as ss, +): + ep.peer("127.0.0.1", 9999, 1.0) + st = ss.get() if not (type(st) == broker.Status and st.code() == broker.SC.PeerAdded): - print('could not connect') + print("could not connect") sys.exit(1) - c = ep.attach_clone('mystore') + c = ep.attach_clone("mystore") while True: time.sleep(1) - c.increment('foo', 1) - print(c.get('foo')) + c.increment("foo", 1) + print(c.get("foo")) diff --git a/doc/_examples/sqlite-listen.py b/doc/_examples/sqlite-listen.py index 24f628c6d..54195d5a2 100644 --- a/doc/_examples/sqlite-listen.py +++ b/doc/_examples/sqlite-listen.py @@ -2,15 +2,15 @@ import broker -with broker.Endpoint() as ep, \ - ep.make_subscriber('/test') as s, \ - ep.make_status_subscriber(True) as ss: +with ( + broker.Endpoint() as ep, + ep.make_subscriber("/test") as s, + ep.make_status_subscriber(True) as ss, +): + ep.listen("127.0.0.1", 9999) - ep.listen('127.0.0.1', 9999) - - m = ep.attach_master('mystore', - broker.Backend.SQLite, {'path': 'mystore.sqlite'}) + m = ep.attach_master("mystore", broker.Backend.SQLite, {"path": "mystore.sqlite"}) while True: print(ss.get()) - print(m.get('foo')) + print(m.get("foo")) diff --git a/doc/conf.py b/doc/conf.py index 54cd153e5..ab16cbe76 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -4,47 +4,48 @@ # # needs_sphinx = '1.0' +import os +import sys from datetime import date -import sys, os # Add custom extensions directory to search path. -sys.path.insert(0, os.path.abspath('./extensions')) +sys.path.insert(0, os.path.abspath("./extensions")) # Add any Sphinx extension module names here, as strings. They can be # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. extensions = [ - 'numsec', - 'sphinx.ext.autodoc', - 'sphinx.ext.todo', + "numsec", + "sphinx.ext.autodoc", + "sphinx.ext.todo", ] # Add any paths that contain templates here, relative to this directory. -templates_path = ['_templates'] +templates_path = ["_templates"] # The suffix(es) of source filenames. # You can specify multiple suffix as a list of string: # # source_suffix = ['.rst', '.md'] -source_suffix = '.rst' +source_suffix = ".rst" # The encoding of source files. # source_encoding = 'utf-8-sig' # The master toctree document. -master_doc = 'index' +master_doc = "index" # General information about the project. -project = u'Broker' -author = u'The Zeek Project' -copyright = str(date.today().year) + ', ' + author +project = "Broker" +author = "The Zeek Project" +copyright = str(date.today().year) + ", " + author # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the # built documents. # # The short X.Y version. -with open('../VERSION', 'r') as f: +with open("../VERSION") as f: version = f.readline().strip() # The full version, including alpha/beta/rc tags. @@ -69,7 +70,7 @@ # List of patterns, relative to source directory, that match files and # directories to ignore when looking for source files. # This patterns also effect to html_static_path and html_extra_path -exclude_patterns = ['_build', '.DS_Store'] +exclude_patterns = ["_build", ".DS_Store"] # The reST default role (used for this markup: `text`) to use for all # documents. @@ -91,9 +92,9 @@ # show_authors = False # The name of the Pygments (syntax highlighting) style to use. -pygments_style = 'sphinx' +pygments_style = "sphinx" -highlight_language = 'C++' +highlight_language = "C++" # A list of ignored prefixes for module index sorting. # modindex_common_prefix = [] @@ -110,21 +111,21 @@ # The theme to use for HTML and HTML Help pages. See the documentation for # a list of builtin themes. # -html_theme = 'sphinx_rtd_theme' +html_theme = "sphinx_rtd_theme" # Theme options are theme-specific and customize the look and feel of a theme # further. For a list of options available for each theme, see the # documentation. html_theme_options = { - 'collapse_navigation': False, - 'display_version': True, + "collapse_navigation": False, + "display_version": True, } # The name for this set of Sphinx documents. # " v documentation" by default. # -html_title = u'Broker User Manual' +html_title = "Broker User Manual" # A shorter title for the navigation bar. Default is the same as html_title. # @@ -142,7 +143,7 @@ # Add any paths that contain custom static files (such as style sheets) here, # relative to this directory. They are copied after the builtin static files, # so a file named "default.css" will overwrite the builtin "default.css". -html_static_path = ['_static'] +html_static_path = ["_static"] # Add any extra paths that contain custom files (such as robots.txt or # .htaccess) here, relative to this directory. These files are copied @@ -152,9 +153,9 @@ # Custom sidebar templates, maps document names to template names. # # html_sidebars = {} -#html_sidebars = { +# html_sidebars = { # '**': ['custom-sidebar.html'] -#} +# } # If true, "Created using Sphinx" is shown in the HTML footer. Default is True. html_show_sphinx = False @@ -163,12 +164,12 @@ html_show_copyright = True # Output file base name for HTML help builder. -htmlhelp_basename = 'broker-doc' +htmlhelp_basename = "broker-doc" # -- Options for PDF output ----------------------------------------------- pdf_documents = [ - ('index', u'user-manual', u'Broker', u'The Zeek Project'), + ("index", "user-manual", "Broker", "The Zeek Project"), ] -pdf_stylesheets = ['sphinx', 'kerning', 'letter'] +pdf_stylesheets = ["sphinx", "kerning", "letter"] diff --git a/doc/extensions/numsec.py b/doc/extensions/numsec.py index d356452b9..6ccfbce41 100644 --- a/doc/extensions/numsec.py +++ b/doc/extensions/numsec.py @@ -29,34 +29,35 @@ DAMAGE. """ -from docutils import nodes import sphinx.domains.std +from docutils import nodes -class CustomStandardDomain(sphinx.domains.std.StandardDomain): +class CustomStandardDomain(sphinx.domains.std.StandardDomain): def __init__(self, env): - env.settings['footnote_references'] = 'superscript' + env.settings["footnote_references"] = "superscript" sphinx.domains.std.StandardDomain.__init__(self, env) - def resolve_xref(self, env, fromdocname, builder, - typ, target, node, contnode): - res = super(CustomStandardDomain, self).resolve_xref(env, fromdocname, builder, - typ, target, node, contnode) + def resolve_xref(self, env, fromdocname, builder, typ, target, node, contnode): + res = super().resolve_xref( + env, fromdocname, builder, typ, target, node, contnode + ) if res is None: return res - if typ == 'ref' and not node['refexplicit']: - docname, labelid, sectname = self.data['labels'].get(target, ('','','')) - res['refdocname'] = docname + if typ == "ref" and not node["refexplicit"]: + docname, labelid, sectname = self.data["labels"].get(target, ("", "", "")) + res["refdocname"] = docname return res + def doctree_resolved(app, doctree, docname): secnums = app.builder.env.toc_secnumbers for node in doctree.traverse(nodes.reference): - if 'refdocname' in node: - refdocname = node['refdocname'] + if "refdocname" in node: + refdocname = node["refdocname"] if refdocname in secnums: secnum = secnums[refdocname] toclist = app.builder.env.tocs[refdocname] @@ -66,14 +67,15 @@ def doctree_resolved(app, doctree, docname): anchorname = None for refnode in toclist.traverse(nodes.reference): if refnode.astext() == text: - anchorname = refnode['anchorname'] + anchorname = refnode["anchorname"] if anchorname is None: continue - num = '.'.join(map(str, secnum[anchorname])) - prefix = 'Section ' + num = ".".join(map(str, secnum[anchorname])) + prefix = "Section " linktext = prefix + num child.parent.replace(child, nodes.Text(linktext)) + def setup(app): app.add_domain(CustomStandardDomain, override=True) - app.connect('doctree-resolved', doctree_resolved) + app.connect("doctree-resolved", doctree_resolved) diff --git a/ruff.toml b/ruff.toml new file mode 100644 index 000000000..ce24954b0 --- /dev/null +++ b/ruff.toml @@ -0,0 +1,14 @@ +target-version = "py39" + +# The btest directories skipped below contain python code embedding in btest +# wrappers. Ruff complains about them not being real python files, so just +# exclude them. +exclude = ["auxil", + "bindings/python/3rdparty", + "tests/btest/handshake", + "tests/btest/endpoint", + "tests/btest/web-socket", + "tests/btest//store"] + +[lint] +select = ["C4", "F", "I", "ISC", "UP"] \ No newline at end of file diff --git a/tests/btest/scripts/extract-json-keys.py b/tests/btest/scripts/extract-json-keys.py index 75511f1de..3d9922074 100755 --- a/tests/btest/scripts/extract-json-keys.py +++ b/tests/btest/scripts/extract-json-keys.py @@ -9,41 +9,45 @@ # whereas is either a comma-separated list or a file path prefixed # with an '@'. -import sys, json +import json +import sys keys = [] + def rec_scan_keys(xs, prefix): for key, val in xs.items(): if not prefix: full_key = key else: - full_key = '{}.{}'.format(prefix, key) + full_key = f"{prefix}.{key}" keys.append(full_key) if type(val) is dict: rec_scan_keys(val, full_key) + def do_scan(fname): - if fname == '-': + if fname == "-": rec_scan_keys(json.load(sys.stdin), None) return with open(fname) as f: rec_scan_keys(json.load(f), None) -do_scan('-' if len(sys.argv) < 2 else sys.argv[1]) + +do_scan("-" if len(sys.argv) < 2 else sys.argv[1]) # Mode 2? Check all keys. if len(sys.argv) == 3: # Get the keys we are looking for. required_keys = [] ls = sys.argv[2] - if ls.startswith('@'): + if ls.startswith("@"): with open(ls[1:]) as f: required_keys = [line.rstrip() for line in f] else: - required_keys.split(',') + required_keys.split(",") # Check that each required key exists in the input. - print("required_keys: {}".format(required_keys)) + print(f"required_keys: {required_keys}") for key in required_keys: if not key in keys: sys.exit(1) diff --git a/tests/btest/scripts/wire_format.py b/tests/btest/scripts/wire_format.py index c0427413b..4085458b6 100644 --- a/tests/btest/scripts/wire_format.py +++ b/tests/btest/scripts/wire_format.py @@ -1,23 +1,27 @@ -import struct, uuid, sys, socket, time - +import socket +import struct +import sys +import time +import traceback +import uuid from collections import namedtuple from enum import IntEnum from inspect import getframeinfo, stack class MessageType(IntEnum): - DATA = 1 - COMMAND = 2 - ROUTING_UPDATE = 3 - PING = 4 - PONG = 5 - HELLO = 6 - PROBE = 7 - VERSION_SELECT = 8 - DROP_CONN = 9 - ORIGINATOR_SYN = 10 - RESPONDER_SYN_ACK = 11 - ORIGINATOR_ACK = 12 + DATA = 1 + COMMAND = 2 + ROUTING_UPDATE = 3 + PING = 4 + PONG = 5 + HELLO = 6 + PROBE = 7 + VERSION_SELECT = 8 + DROP_CONN = 9 + ORIGINATOR_SYN = 10 + RESPONDER_SYN_ACK = 11 + ORIGINATOR_ACK = 12 # Returns Broker's magic number. @@ -34,10 +38,10 @@ def behead(buf, size): def vb_encode(n): buf = bytearray() x = n - while x > 0x7f: - buf.append((x & 0x7f) | 0x80) + while x > 0x7F: + buf.append((x & 0x7F) | 0x80) x = x >> 7 - buf.append(x & 0x7f) + buf.append(x & 0x7F) return buf @@ -49,7 +53,7 @@ def vb_decode(buf): while True: low7 = buf[0] buf = buf[1:] - x = x | ((low7 & 0x7f) << (7 * n)) + x = x | ((low7 & 0x7F) << (7 * n)) n += 1 if low7 & 0x80 == 0: return (x, buf) @@ -84,7 +88,7 @@ def unpack_string(buf): def unpack_subscriptions(buf): result = [] (size, remainder) = vb_decode(buf) - print(f'unpack subscription of size {size}') + print(f"unpack subscription of size {size}") for i in range(size): (sub, remainder) = unpack_string(remainder) result.append(sub) @@ -93,51 +97,53 @@ def unpack_subscriptions(buf): # -- pack and unpack functions for handshake messages -------------------------- + def unpack_hello(buf): - HelloMsg = namedtuple('HelloMsg', - ['magic', 'sender_id', 'min_version', 'max_version']) - (magic, uuid_bytes, vmin, vmax) = struct.unpack('!I16sBB', buf) + HelloMsg = namedtuple( + "HelloMsg", ["magic", "sender_id", "min_version", "max_version"] + ) + (magic, uuid_bytes, vmin, vmax) = struct.unpack("!I16sBB", buf) return HelloMsg(magic, uuid.UUID(bytes=uuid_bytes), vmin, vmax) def pack_hello(sender_id, vmin, vmax): - return struct.pack('!I16sBB', magic_number(), sender_id.bytes, vmin, vmax) + return struct.pack("!I16sBB", magic_number(), sender_id.bytes, vmin, vmax) def unpack_probe(buf): - ProbeMsg = namedtuple('ProbeMsg', ['magic']) - magic = struct.unpack('!I', buf)[0] + ProbeMsg = namedtuple("ProbeMsg", ["magic"]) + magic = struct.unpack("!I", buf)[0] return ProbeMsg(magic) def pack_probe(msg): (magic) = msg - return struct.pack('!I', magic) + return struct.pack("!I", magic) def pack_version_select(sender_id, version): - return struct.pack('!I16sB', magic_number(), sender_id.bytes, version) + return struct.pack("!I16sB", magic_number(), sender_id.bytes, version) def unpack_version_select(buf): - VersionSelectMsg = namedtuple('VersionSelectMsg', - ['magic', 'sender_id', 'version']) - (magic, sender_id, version) = struct.unpack('!I16sB', buf) + VersionSelectMsg = namedtuple("VersionSelectMsg", ["magic", "sender_id", "version"]) + (magic, sender_id, version) = struct.unpack("!I16sB", buf) return VersionSelectMsg(magic, uuid.UUID(bytes=sender_id), version) def pack_drop_conn(sender_id, code, description): - buf = struct.pack('!I16sB', magic_number(), sender_id.bytes, code) + buf = struct.pack("!I16sB", magic_number(), sender_id.bytes, code) arr = bytearray(buf) arr.extend(pack_string(description)) return arr def unpack_drop_conn(buf): - DropConnMsg = namedtuple('DropConnMsg', - ['magic', 'sender_id', 'code', 'description']) + DropConnMsg = namedtuple( + "DropConnMsg", ["magic", "sender_id", "code", "description"] + ) (head, tail) = behead(buf, 21) - (magic, sender_id, code) = struct.unpack('!I16sB', head) + (magic, sender_id, code) = struct.unpack("!I16sB", head) description = unpack_string(tail) return DropConnMsg(magic, uuid.UUID(bytes=sender_id), code, description) @@ -157,8 +163,8 @@ def pack_originator_syn(subs): def unpack_originator_syn(buf): (subscriptions, remainder) = unpack_subscriptions(buf) if len(remainder) > 0: - raise RuntimeError('unpack_originator_syn: trailing bytes') - OriginatorSynMsg = namedtuple('OriginatorSynMsg', ['subscriptions']) + raise RuntimeError("unpack_originator_syn: trailing bytes") + OriginatorSynMsg = namedtuple("OriginatorSynMsg", ["subscriptions"]) return OriginatorSynMsg(subscriptions) @@ -169,25 +175,27 @@ def pack_responder_syn_ack(subs): def unpack_responder_syn_ack(buf): (subscriptions, remainder) = unpack_subscriptions(buf) if len(remainder) > 0: - raise RuntimeError('unpack_responder_syn_ack: trailing bytes') - ResponderSynAck = namedtuple('ResponderSynAck', ['subscriptions']) + raise RuntimeError("unpack_responder_syn_ack: trailing bytes") + ResponderSynAck = namedtuple("ResponderSynAck", ["subscriptions"]) return ResponderSynAck(subscriptions) # -- utility functions for socket I/O on handshake messages -------------------- + def enum_to_str(e): # For compatibility around Python 3.11, this dictates how to render an enum # to a string. This changed in 3.11 for some enum types. - return type(e).__name__ + '.' + e.name + return type(e).__name__ + "." + e.name + # Reads a handshake message (phase 1 and phase 2). def read_hs_msg(fd): # -1 since we extract the tag right away - msg_len = int.from_bytes(fd.recv(4), byteorder='big', signed=False) - 1 + msg_len = int.from_bytes(fd.recv(4), byteorder="big", signed=False) - 1 tag = MessageType(fd.recv(1)[0]) tag_str = enum_to_str(tag) - print(f'received a {tag_str} message with {msg_len} bytes') + print(f"received a {tag_str} message with {msg_len} bytes") unpack_tbl = { MessageType.HELLO: unpack_hello, MessageType.PROBE: unpack_probe, @@ -203,15 +211,16 @@ def read_hs_msg(fd): # Writes a handshake message (phase 1 and phase 2). def write_hs_msg(fd, tag, buf): payload_len = len(buf) + 1 - fd.send(payload_len.to_bytes(4, byteorder='big', signed=False)) - fd.send(int(tag).to_bytes(1, byteorder='big', signed=False)) + fd.send(payload_len.to_bytes(4, byteorder="big", signed=False)) + fd.send(int(tag).to_bytes(1, byteorder="big", signed=False)) fd.send(buf) tag_str = enum_to_str(tag) - print(f'sent {tag_str} message with {payload_len} bytes') + print(f"sent {tag_str} message with {payload_len} bytes") # -- pack and unpack functions for phase 3 messages ---------------------------- + def pack_ping(buf): return buf @@ -230,60 +239,63 @@ def unpack_pong(buf): # -- utility functions for socket I/O on phase 3 messages ---------------------- + # Reads an operation-mode message (phase 3). def read_op_msg(fd): # -35 since we extract the two IDs plus tag, TTL and topic len right away - msg_len = int.from_bytes(fd.recv(4), byteorder='big', signed=False) - 37 - src = uuid.UUID(bytes=struct.unpack('!16s', fd.recv(16))[0]) - dst = uuid.UUID(bytes=struct.unpack('!16s', fd.recv(16))[0]) + msg_len = int.from_bytes(fd.recv(4), byteorder="big", signed=False) - 37 + src = uuid.UUID(bytes=struct.unpack("!16s", fd.recv(16))[0]) + dst = uuid.UUID(bytes=struct.unpack("!16s", fd.recv(16))[0]) tag = MessageType(fd.recv(1)[0]) - ttl = int.from_bytes(fd.recv(2), byteorder='big', signed=False) - topic_len = int.from_bytes(fd.recv(2), byteorder='big', signed=False) + ttl = int.from_bytes(fd.recv(2), byteorder="big", signed=False) + topic_len = int.from_bytes(fd.recv(2), byteorder="big", signed=False) topic = fd.recv(topic_len).decode() buf = fd.recv(msg_len - topic_len) tag_str = enum_to_str(tag) - print(f'received a {tag_str} with a payload of {msg_len} bytes') + print(f"received a {tag_str} with a payload of {msg_len} bytes") unpack_tbl = { MessageType.PING: unpack_ping, MessageType.PONG: unpack_pong, } - NodeMsg = namedtuple('NodeMsg', - ['sender_id', 'receiver_id', 'ttl', 'topic', 'payload']) + NodeMsg = namedtuple( + "NodeMsg", ["sender_id", "receiver_id", "ttl", "topic", "payload"] + ) return (tag, NodeMsg(src, dst, ttl, topic, unpack_tbl[tag](buf))) def write_op_msg(fd, src, dst, tag, topic, buf): payload_len = len(buf) + 37 + len(topic) - fd.send(payload_len.to_bytes(4, byteorder='big', signed=False)) - fd.send(src.bytes) # sender UUID - fd.send(dst.bytes) # receiver UUID - fd.send(int(tag).to_bytes(1, byteorder='big', signed=False)) # msg type - fd.send(int(1).to_bytes(2, byteorder='big', signed=False)) # ttl - fd.send(len(topic).to_bytes(2, byteorder='big', signed=False)) + fd.send(payload_len.to_bytes(4, byteorder="big", signed=False)) + fd.send(src.bytes) # sender UUID + fd.send(dst.bytes) # receiver UUID + fd.send(int(tag).to_bytes(1, byteorder="big", signed=False)) # msg type + fd.send((1).to_bytes(2, byteorder="big", signed=False)) # ttl + fd.send(len(topic).to_bytes(2, byteorder="big", signed=False)) fd.send(topic.encode()) fd.send(buf) tag_str = enum_to_str(tag) - print(f'sent {tag_str} message with a payload of {payload_len} bytes') + print(f"sent {tag_str} message with a payload of {payload_len} bytes") # -- minimal testing DSL ------------------------------------------------------- + def check_eq(got, want): caller = getframeinfo(stack()[1][0]) line = caller.lineno if got != want: - raise RuntimeError(f'line {line}: check failed -> {got} != {want}') - print(f'line {line}: check passed') + raise RuntimeError(f"line {line}: check failed -> {got} != {want}") + print(f"line {line}: check passed") def write_done(): - with open('done', 'w') as f: - f.write('done') + with open("done", "w") as f: + f.write("done") # Tries to connect up to 30 times before giving up. def test_main(host, port, fn): - connected = False + connected = False for i in range(30): try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as fd: @@ -294,7 +306,9 @@ def test_main(host, port, fn): sys.exit() except Exception as ex: if not connected: - print(f'failed to connect to localhost:{port}, try again', file=sys.stderr) + print( + f"failed to connect to localhost:{port}, try again", file=sys.stderr + ) time.sleep(1) else: print(str(ex)) diff --git a/tests/python/broker-cluster-benchmark.py b/tests/python/broker-cluster-benchmark.py index e9da90589..791a8ae23 100644 --- a/tests/python/broker-cluster-benchmark.py +++ b/tests/python/broker-cluster-benchmark.py @@ -1,18 +1,24 @@ -import os, sys, zipfile, subprocess, shutil - +import os +import shutil +import subprocess +import sys +import zipfile from subprocess import PIPE # -- setup and cleanup -------------------------------------------------------- -test_recording = 'zeek-dns-traffic-recording' +test_recording = "zeek-dns-traffic-recording" + class Environment: def __init__(self, executable_path): self.exe = executable_path - self.test_dir = os.environ.get('BROKER_TEST_DIR') + self.test_dir = os.environ.get("BROKER_TEST_DIR") if not os.path.isdir(self.test_dir): - raise RuntimeError('environment variable BROKER_TEST_DIR is not a valid directory') - self.input_dir = os.path.join(self.test_dir, '.tmp', 'broker-cluster-benchmark') + raise RuntimeError( + "environment variable BROKER_TEST_DIR is not a valid directory" + ) + self.input_dir = os.path.join(self.test_dir, ".tmp", "broker-cluster-benchmark") self.recording_dir = os.path.join(self.input_dir, test_recording) def __enter__(self): @@ -25,56 +31,70 @@ def __exit__(self, type, value, tb): def prepare_environment(self): os.makedirs(self.input_dir) - file_name = test_recording + '.zip' - file_path = os.path.join(self.test_dir, 'integration', file_name) - with zipfile.ZipFile(file_path, 'r') as zip_ref: + file_name = test_recording + ".zip" + file_path = os.path.join(self.test_dir, "integration", file_name) + with zipfile.ZipFile(file_path, "r") as zip_ref: zip_ref.extractall(self.input_dir) - with open(os.path.join(self.recording_dir, 'expected-tpl.conf'), mode='r') as f: - format_vars = {'path': self.recording_dir} + with open(os.path.join(self.recording_dir, "expected-tpl.conf")) as f: + format_vars = {"path": self.recording_dir} self.expected = f.read() % format_vars def clear_environment(self): if os.path.isdir(self.input_dir): shutil.rmtree(self.input_dir) + # -- integration testing ------------------------------------------------------ + def test_config_generation(exe, recording_dir, expected): dirs = [f.path for f in os.scandir(recording_dir) if f.is_dir()] - cmd = [exe, '--caf.logger.console.verbosity=quiet', '--mode=generate-config'] + dirs - with subprocess.Popen(cmd, stdout=PIPE, stderr=PIPE, close_fds=True, universal_newlines=True) as proc: + cmd = [exe, "--caf.logger.console.verbosity=quiet", "--mode=generate-config"] + dirs + with subprocess.Popen( + cmd, stdout=PIPE, stderr=PIPE, close_fds=True, universal_newlines=True + ) as proc: output, errors = proc.communicate() if proc.returncode != 0: - raise RuntimeError('failed to generate config: ' + errors) + raise RuntimeError("failed to generate config: " + errors) if output != expected: - sys.stderr.write('*** ERROR: generate-config procuded wrong result\n') - sys.stderr.write('\n*** EXPECTED:\n') + sys.stderr.write("*** ERROR: generate-config procuded wrong result\n") + sys.stderr.write("\n*** EXPECTED:\n") sys.stderr.write(expected) - sys.stderr.write('\n*** GOT:\n') + sys.stderr.write("\n*** GOT:\n") sys.stderr.write(output) sys.exit(1) - print('test_config_generation: pass') + print("test_config_generation: pass") return output + def run_benchmark(exe, config): - cmd = [exe, '--caf.logger.console.verbosity=quiet', '--cluster-config-file=-'] - with subprocess.Popen(cmd, stdout=PIPE, stderr=PIPE, stdin=PIPE, close_fds=True, universal_newlines=True) as proc: + cmd = [exe, "--caf.logger.console.verbosity=quiet", "--cluster-config-file=-"] + with subprocess.Popen( + cmd, + stdout=PIPE, + stderr=PIPE, + stdin=PIPE, + close_fds=True, + universal_newlines=True, + ) as proc: proc.stdin.write(config) proc.stdin.close() proc.stdin = None output, errors = proc.communicate() if proc.returncode != 0: - raise RuntimeError('failed to run the benchmark: ' + errors) - print('run_benchmark: pass') - sys.stdout.write('*** Benchmark output:\n') + raise RuntimeError("failed to run the benchmark: " + errors) + print("run_benchmark: pass") + sys.stdout.write("*** Benchmark output:\n") sys.stdout.write(output) # -- main --------------------------------------------------------------------- -if __name__ == '__main__': +if __name__ == "__main__": if len(sys.argv) != 2: - raise RuntimeError('expected exactly one argument: path to broker-cluster-benchmark') + raise RuntimeError( + "expected exactly one argument: path to broker-cluster-benchmark" + ) with Environment(sys.argv[1]) as e: config = test_config_generation(e.exe, e.recording_dir, e.expected) run_benchmark(e.exe, config) diff --git a/tests/python/communication.py b/tests/python/communication.py index 7e2a57719..1c7d861a5 100644 --- a/tests/python/communication.py +++ b/tests/python/communication.py @@ -1,19 +1,19 @@ - -import unittest -import multiprocessing -import sys -import time import ipaddress +import time +import unittest import broker + class TestCommunication(unittest.TestCase): def test_ping(self): # --peer-start - with broker.Endpoint() as ep1, \ - broker.Endpoint() as ep2, \ - ep1.make_subscriber("/test") as s1, \ - ep2.make_subscriber("/test") as s2: + with ( + broker.Endpoint() as ep1, + broker.Endpoint() as ep2, + ep1.make_subscriber("/test") as s1, + ep2.make_subscriber("/test") as s2, + ): port = ep1.listen("127.0.0.1", 0) self.assertTrue(ep2.peer("127.0.0.1", port, 1.0)) @@ -39,7 +39,7 @@ def test_ping(self): if msgs: self.assertEqual(len(msgs), 1) (t, d) = msgs[0] - break; + break time.sleep(0.1) @@ -47,10 +47,11 @@ def test_ping(self): self.assertEqual(d[0], "pong") def test_messages(self): - with broker.Endpoint() as ep1, \ - broker.Endpoint() as ep2, \ - ep1.make_subscriber("/test") as s1: - + with ( + broker.Endpoint() as ep1, + broker.Endpoint() as ep2, + ep1.make_subscriber("/test") as s1, + ): port = ep1.listen("127.0.0.1", 0) self.assertTrue(ep2.peer("127.0.0.1", port, 1.0)) @@ -62,7 +63,10 @@ def test_messages(self): # --messages-start msg1 = ("/test/2", (1, 2, 3)) - msg2 = ("/test/3", (42, "foo", {"a": "A", "b": ipaddress.IPv4Address('1.2.3.4')})) + msg2 = ( + "/test/3", + (42, "foo", {"a": "A", "b": ipaddress.IPv4Address("1.2.3.4")}), + ) ep2.publish_batch(msg1, msg2) # --messages-end @@ -81,14 +85,15 @@ def test_messages(self): self.assertEqual(len(dict_data), 3) def test_immutable_messages(self): - with broker.Endpoint() as ep1, \ - broker.Endpoint() as ep2, \ - ep1.make_safe_subscriber("/test") as s1: - + with ( + broker.Endpoint() as ep1, + broker.Endpoint() as ep2, + ep1.make_safe_subscriber("/test") as s1, + ): port = ep1.listen("127.0.0.1", 0) ep2.peer("127.0.0.1", port, 1.0) - msg = ("/test/1", ({"a": "A"}, set([1,2,3]), ('a', 'b', 'c'))) + msg = ("/test/1", ({"a": "A"}, {1, 2, 3}, ("a", "b", "c"))) ep2.publish(*msg) topic, (dict_data, set_data, tuple_data) = s1.get() @@ -103,14 +108,15 @@ def test_immutable_messages(self): set_data.add(4) with self.assertRaises(TypeError): # 'tuple' object does not support item assignment - tuple_data[3] = 'd' + tuple_data[3] = "d" def test_publisher(self): - with broker.Endpoint() as ep1, \ - broker.Endpoint() as ep2, \ - ep1.make_subscriber("/test") as s1, \ - ep2.make_publisher("/test") as p2: - + with ( + broker.Endpoint() as ep1, + broker.Endpoint() as ep2, + ep1.make_subscriber("/test") as s1, + ep2.make_publisher("/test") as p2, + ): port = ep1.listen("127.0.0.1", 0) self.assertTrue(ep2.peer("127.0.0.1", port, 1.0)) @@ -129,11 +135,12 @@ def test_publisher(self): def test_status_subscriber(self): # --status-start - with broker.Endpoint() as ep1, \ - broker.Endpoint() as ep2, \ - ep1.make_status_subscriber(True) as es1, \ - ep2.make_status_subscriber(True) as es2: - + with ( + broker.Endpoint() as ep1, + broker.Endpoint() as ep2, + ep1.make_status_subscriber(True) as es1, + ep2.make_status_subscriber(True) as es2, + ): port = ep1.listen("127.0.0.1", 0) self.assertEqual(ep2.peer("127.0.0.1", port, 1.0), True) @@ -156,10 +163,9 @@ def test_status_subscriber(self): def test_status_subscriber_error(self): # --error-start - with broker.Endpoint() as ep1, \ - ep1.make_status_subscriber() as es1: - r = ep1.peer("127.0.0.1", 1947, 0.0) # Try unavailable port, no retry - self.assertEqual(r, False) # Not shown in docs. + with broker.Endpoint() as ep1, ep1.make_status_subscriber() as es1: + r = ep1.peer("127.0.0.1", 1947, 0.0) # Try unavailable port, no retry + self.assertEqual(r, False) # Not shown in docs. st1 = es1.get() # s1.code() == broker.EC.PeerUnavailable # --error-end @@ -174,11 +180,13 @@ def test_status_subscriber_error(self): self.assertEqual(st1.code(), broker.EC.PeerUnavailable) def test_idle_endpoint(self): - with broker.Endpoint() as ep1, \ - ep1.make_status_subscriber() as es1, \ - ep1.make_subscriber("/test") as s1: - + with ( + broker.Endpoint() as ep1, + ep1.make_status_subscriber() as es1, # noqa: F841 + ep1.make_subscriber("/test") as s1, # noqa: F841 + ): pass -if __name__ == '__main__': + +if __name__ == "__main__": unittest.main(verbosity=3) diff --git a/tests/python/data.py b/tests/python/data.py index 51244cb4b..5225ababa 100644 --- a/tests/python/data.py +++ b/tests/python/data.py @@ -5,10 +5,9 @@ # import datetime -import time -import math import ipaddress -import sys +import math +import time import types import unittest @@ -16,7 +15,6 @@ class TestDataConstruction(unittest.TestCase): - # Given a Python value 'p' and expected broker type 't', convert 'p' to # the corresponding broker type and verify that it has the correct type # and the expected string representation 's' (note this is not necessarily @@ -24,7 +22,7 @@ class TestDataConstruction(unittest.TestCase): def check_to_broker(self, p, s, t): assert not isinstance(p, broker.Data) b = broker.Data(p) - #print("[to_broker] ({} / {}) -> ({} / {}) (expected: {} / {})".format(p, type(p), str(b), b.get_type(), s, t)) + # print("[to_broker] ({} / {}) -> ({} / {}) (expected: {} / {})".format(p, type(p), str(b), b.get_type(), s, t)) if s is not None: self.assertEqual(str(b), s) @@ -37,7 +35,7 @@ def check_to_broker(self, p, s, t): # the original python value 'p'. def check_to_py(self, b, p): b2p = broker.Data.to_py(b) - #print("[to_py] data({} / {}) -> ({} / {})".format(str(b), b.get_type(), b2p, type(b2p))) + # print("[to_py] data({} / {}) -> ({} / {})".format(str(b), b.get_type(), b2p, type(b2p))) self.assertIsInstance(b2p, type(p)) @@ -56,9 +54,11 @@ def check_to_py(self, b, p): self.assertEqual(b2p.minute, p.minute) self.assertEqual(b2p.second, p.second) - us_equal = (b2p.microsecond == p.microsecond or - b2p.microsecond == p.microsecond - 1 or - b2p.microsecond == p.microsecond + 1) + us_equal = ( + b2p.microsecond == p.microsecond + or b2p.microsecond == p.microsecond - 1 + or b2p.microsecond == p.microsecond + 1 + ) self.assertTrue(us_equal) else: # 'b2p' is in UTC and 'p' is assumed to be local time @@ -66,7 +66,9 @@ def check_to_py(self, b, p): b2p_us = b2p.microsecond - b2p_ts = (b2p - datetime.datetime(1970, 1, 1, tzinfo=broker.utc)).total_seconds() + b2p_ts = ( + b2p - datetime.datetime(1970, 1, 1, tzinfo=broker.utc) + ).total_seconds() b2p_ts = math.trunc(b2p_ts) p_us = p.microsecond @@ -75,9 +77,7 @@ def check_to_py(self, b, p): self.assertEqual(b2p_ts, p_ts) - us_equal = (b2p_us == p_us or - b2p_us == p_us - 1 or - b2p_us == p_us + 1) + us_equal = b2p_us == p_us or b2p_us == p_us - 1 or b2p_us == p_us + 1 self.assertTrue(us_equal) else: @@ -94,66 +94,80 @@ def check_to_broker_and_back(self, p, s, t, p_final=None): return b def test_bool(self): - self.check_to_broker_and_back(True, 'T', broker.Data.Type.Boolean) - self.check_to_broker_and_back(False, 'F', broker.Data.Type.Boolean) + self.check_to_broker_and_back(True, "T", broker.Data.Type.Boolean) + self.check_to_broker_and_back(False, "F", broker.Data.Type.Boolean) def test_integer(self): - self.check_to_broker_and_back(42, '42', broker.Data.Type.Integer) + self.check_to_broker_and_back(42, "42", broker.Data.Type.Integer) - self.check_to_broker_and_back(-42, '-42', broker.Data.Type.Integer) + self.check_to_broker_and_back(-42, "-42", broker.Data.Type.Integer) # Test a value that is beyond range of unsigned 32-bit integer - self.check_to_broker_and_back(5123123123, '5123123123', broker.Data.Type.Integer) + self.check_to_broker_and_back( + 5123123123, "5123123123", broker.Data.Type.Integer + ) def test_count(self): - self.check_to_broker_and_back(broker.Count(42), '42', broker.Data.Type.Count) + self.check_to_broker_and_back(broker.Count(42), "42", broker.Data.Type.Count) # Test a value that is beyond range of unsigned 32-bit integer - self.check_to_broker_and_back(broker.Count(5123123123), '5123123123', broker.Data.Type.Count) + self.check_to_broker_and_back( + broker.Count(5123123123), "5123123123", broker.Data.Type.Count + ) def test_count_overflow(self): - with self.assertRaises(Exception) as context: + with self.assertRaises(Exception) as context: # noqa: F841 # I've seen this raise either OverflowError or SystemError # depending on Python version is seems. - self.check_to_broker(broker.Count(-1), '-1', broker.Data.Type.Count) + self.check_to_broker(broker.Count(-1), "-1", broker.Data.Type.Count) def test_real(self): - self.check_to_broker_and_back(1e18, '1000000000000000000.000000', broker.Data.Type.Real) - self.check_to_broker_and_back(4.2, '4.200000', broker.Data.Type.Real) - self.check_to_broker_and_back(-4.2, '-4.200000', broker.Data.Type.Real) + self.check_to_broker_and_back( + 1e18, "1000000000000000000.000000", broker.Data.Type.Real + ) + self.check_to_broker_and_back(4.2, "4.200000", broker.Data.Type.Real) + self.check_to_broker_and_back(-4.2, "-4.200000", broker.Data.Type.Real) def test_timespan(self): to_us = lambda x: x.microseconds + (x.seconds + x.days * 24 * 3600) * 10**6 to_ns = lambda x: to_us(x) * 10**3 # Setup timespan values - neg1ms = datetime.timedelta(microseconds = -1000) + neg1ms = datetime.timedelta(microseconds=-1000) neg1ms_ns = -1000 * 10**3 self.assertEqual(neg1ms_ns, to_ns(neg1ms)) - neg42sec = datetime.timedelta(milliseconds = -42 * 10**3) + neg42sec = datetime.timedelta(milliseconds=-42 * 10**3) neg42sec_ns = (-42 * 10**6) * 10**3 self.assertEqual(neg42sec_ns, to_ns(neg42sec)) - pos1ms2us = datetime.timedelta(milliseconds = 1, microseconds = 2) + pos1ms2us = datetime.timedelta(milliseconds=1, microseconds=2) pos1ms2us_ns = (1000 + 2) * 10**3 self.assertEqual(pos1ms2us_ns, to_ns(pos1ms2us)) - pos1day2s3us = datetime.timedelta(days = 1, seconds = 2, microseconds = 3) + pos1day2s3us = datetime.timedelta(days=1, seconds=2, microseconds=3) pos1day2s3us_ns = (3 + (2 + 24 * 3600) * 10**6) * 10**3 self.assertEqual(pos1day2s3us_ns, to_ns(pos1day2s3us)) # Verify Timespan only self.assertEqual(broker.Timespan(neg1ms_ns), broker.Timespan(to_ns(neg1ms))) self.assertEqual(broker.Timespan(neg42sec_ns), broker.Timespan(to_ns(neg42sec))) - self.assertEqual(broker.Timespan(pos1ms2us_ns), broker.Timespan(to_ns(pos1ms2us))) - self.assertEqual(broker.Timespan(pos1day2s3us_ns), broker.Timespan(to_ns(pos1day2s3us))) + self.assertEqual( + broker.Timespan(pos1ms2us_ns), broker.Timespan(to_ns(pos1ms2us)) + ) + self.assertEqual( + broker.Timespan(pos1day2s3us_ns), broker.Timespan(to_ns(pos1day2s3us)) + ) # Verify Data - self.check_to_broker_and_back(neg1ms, '-1000000ns', broker.Data.Type.Timespan) - self.check_to_broker_and_back(neg42sec, '-42000000000ns', broker.Data.Type.Timespan) - self.check_to_broker_and_back(pos1ms2us, '1002000ns', broker.Data.Type.Timespan) - self.check_to_broker_and_back(pos1day2s3us, '86402000003000ns', broker.Data.Type.Timespan) + self.check_to_broker_and_back(neg1ms, "-1000000ns", broker.Data.Type.Timespan) + self.check_to_broker_and_back( + neg42sec, "-42000000000ns", broker.Data.Type.Timespan + ) + self.check_to_broker_and_back(pos1ms2us, "1002000ns", broker.Data.Type.Timespan) + self.check_to_broker_and_back( + pos1day2s3us, "86402000003000ns", broker.Data.Type.Timespan + ) def test_timestamp(self): self.check_to_broker(broker.now(), None, broker.Data.Type.Timestamp) @@ -181,59 +195,69 @@ def test_timestamp(self): pass def test_string(self): - self.check_to_broker_and_back('', '', broker.Data.Type.String) - self.check_to_broker_and_back('foo', 'foo', broker.Data.Type.String) - self.check_to_broker_and_back('\ttab', '\ttab', broker.Data.Type.String) - self.check_to_broker_and_back('new\n', 'new\n', broker.Data.Type.String) + self.check_to_broker_and_back("", "", broker.Data.Type.String) + self.check_to_broker_and_back("foo", "foo", broker.Data.Type.String) + self.check_to_broker_and_back("\ttab", "\ttab", broker.Data.Type.String) + self.check_to_broker_and_back("new\n", "new\n", broker.Data.Type.String) def test_address_v4(self): - addr = ipaddress.IPv4Address('0.0.0.0') - self.check_to_broker_and_back(addr, '0.0.0.0', broker.Data.Type.Address) + addr = ipaddress.IPv4Address("0.0.0.0") + self.check_to_broker_and_back(addr, "0.0.0.0", broker.Data.Type.Address) - addr = ipaddress.IPv4Address('1.2.3.4') - self.check_to_broker_and_back(addr, '1.2.3.4', broker.Data.Type.Address) + addr = ipaddress.IPv4Address("1.2.3.4") + self.check_to_broker_and_back(addr, "1.2.3.4", broker.Data.Type.Address) - addr = ipaddress.IPv4Address('255.255.255.255') - self.check_to_broker_and_back(addr, '255.255.255.255', broker.Data.Type.Address) + addr = ipaddress.IPv4Address("255.255.255.255") + self.check_to_broker_and_back(addr, "255.255.255.255", broker.Data.Type.Address) def test_address_v6(self): - addr = ipaddress.IPv6Address('::') - self.check_to_broker_and_back(addr, '::', broker.Data.Type.Address) + addr = ipaddress.IPv6Address("::") + self.check_to_broker_and_back(addr, "::", broker.Data.Type.Address) - addr = ipaddress.IPv6Address('::1') - self.check_to_broker_and_back(addr, '::1', broker.Data.Type.Address) + addr = ipaddress.IPv6Address("::1") + self.check_to_broker_and_back(addr, "::1", broker.Data.Type.Address) - addr = ipaddress.IPv6Address('1:2:3:4:5:6:7:8') - self.check_to_broker_and_back(addr, '1:2:3:4:5:6:7:8', broker.Data.Type.Address) + addr = ipaddress.IPv6Address("1:2:3:4:5:6:7:8") + self.check_to_broker_and_back(addr, "1:2:3:4:5:6:7:8", broker.Data.Type.Address) - addr = ipaddress.IPv6Address('ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff') - self.check_to_broker_and_back(addr, 'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff', broker.Data.Type.Address) + addr = ipaddress.IPv6Address("ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff") + self.check_to_broker_and_back( + addr, "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff", broker.Data.Type.Address + ) def test_subnet_v4(self): - sn = ipaddress.IPv4Network('1.2.3.4/32') - self.check_to_broker_and_back(sn, '1.2.3.4/32', broker.Data.Type.Subnet) + sn = ipaddress.IPv4Network("1.2.3.4/32") + self.check_to_broker_and_back(sn, "1.2.3.4/32", broker.Data.Type.Subnet) - sn = ipaddress.IPv4Network('10.0.0.0/8') - self.check_to_broker_and_back(sn, '10.0.0.0/8', broker.Data.Type.Subnet) + sn = ipaddress.IPv4Network("10.0.0.0/8") + self.check_to_broker_and_back(sn, "10.0.0.0/8", broker.Data.Type.Subnet) - sn = ipaddress.IPv4Network('0.0.0.0/0') - self.check_to_broker_and_back(sn, '0.0.0.0/0', broker.Data.Type.Subnet) + sn = ipaddress.IPv4Network("0.0.0.0/0") + self.check_to_broker_and_back(sn, "0.0.0.0/0", broker.Data.Type.Subnet) def test_subnet_v6(self): - sn = ipaddress.IPv6Network('::1/128') - self.check_to_broker_and_back(sn, '::1/128', broker.Data.Type.Subnet) + sn = ipaddress.IPv6Network("::1/128") + self.check_to_broker_and_back(sn, "::1/128", broker.Data.Type.Subnet) - sn = ipaddress.IPv6Network('fc00::/7') - self.check_to_broker_and_back(sn, 'fc00::/7', broker.Data.Type.Subnet) + sn = ipaddress.IPv6Network("fc00::/7") + self.check_to_broker_and_back(sn, "fc00::/7", broker.Data.Type.Subnet) - sn = ipaddress.IPv6Network('::/0') - self.check_to_broker_and_back(sn, '::/0', broker.Data.Type.Subnet) + sn = ipaddress.IPv6Network("::/0") + self.check_to_broker_and_back(sn, "::/0", broker.Data.Type.Subnet) def test_port(self): - self.check_to_broker_and_back(broker.Port(65535, broker.Port.TCP), "65535/tcp", broker.Data.Type.Port) - self.check_to_broker_and_back(broker.Port(53, broker.Port.UDP), "53/udp", broker.Data.Type.Port) - self.check_to_broker_and_back(broker.Port(8, broker.Port.ICMP), "8/icmp", broker.Data.Type.Port) - self.check_to_broker_and_back(broker.Port(0, broker.Port.Unknown), "0/?", broker.Data.Type.Port) + self.check_to_broker_and_back( + broker.Port(65535, broker.Port.TCP), "65535/tcp", broker.Data.Type.Port + ) + self.check_to_broker_and_back( + broker.Port(53, broker.Port.UDP), "53/udp", broker.Data.Type.Port + ) + self.check_to_broker_and_back( + broker.Port(8, broker.Port.ICMP), "8/icmp", broker.Data.Type.Port + ) + self.check_to_broker_and_back( + broker.Port(0, broker.Port.Unknown), "0/?", broker.Data.Type.Port + ) def _test_set_impl(self, set_itype, set_otype=None): # Common set testing functionality for an input type into Broker and a @@ -242,25 +266,27 @@ def _test_set_impl(self, set_itype, set_otype=None): set_otype = set_itype if set_otype is None else set_otype # Test an empty set - self.check_to_broker_and_back(set_itype(), '{}', broker.Data.Type.Set, set_otype()) + self.check_to_broker_and_back( + set_itype(), "{}", broker.Data.Type.Set, set_otype() + ) # Test a simple set pi, po = set_itype([1, 2, 3]), set_otype([1, 2, 3]) - d = self.check_to_broker_and_back(pi, '{1, 2, 3}', broker.Data.Type.Set, po) + d = self.check_to_broker_and_back(pi, "{1, 2, 3}", broker.Data.Type.Set, po) - for (i, x) in enumerate(d.as_set()): + for i, x in enumerate(d.as_set()): self.check_to_broker(x, str(i + 1), broker.Data.Type.Integer) self.check_to_py(x, i + 1) # Test a set that contains various data types - d = broker.Data(set_itype(['foo', ipaddress.IPv6Address('::1'), None])) - for (i, x) in enumerate(d.as_set()): + d = broker.Data(set_itype(["foo", ipaddress.IPv6Address("::1"), None])) + for i, x in enumerate(d.as_set()): if i == 1: - self.check_to_broker(x, 'foo', broker.Data.Type.String) - self.check_to_py(x, 'foo') + self.check_to_broker(x, "foo", broker.Data.Type.String) + self.check_to_py(x, "foo") elif i == 2: - self.check_to_broker(x, '::1', broker.Data.Type.Address) - self.check_to_py(x, ipaddress.IPv6Address('::1')) + self.check_to_broker(x, "::1", broker.Data.Type.Address) + self.check_to_py(x, ipaddress.IPv6Address("::1")) # Test some of our own methods on wrapped sets. d = broker.Data(set_itype([1, 2, 3])).as_set() @@ -285,42 +311,58 @@ def _test_table_impl(self, table_itype, table_otype=None): table_otype = table_itype if table_otype is None else table_otype # Test an empty table - self.check_to_broker_and_back(table_itype({}), '{}', broker.Data.Type.Table, table_otype()) + self.check_to_broker_and_back( + table_itype({}), "{}", broker.Data.Type.Table, table_otype() + ) # Test a simple table d = {"a": 1, "b": 2, "c": 3} pi, po = table_itype(d), table_otype(d) - d = self.check_to_broker_and_back(pi, '{a -> 1, b -> 2, c -> 3}', broker.Data.Type.Table, po) + d = self.check_to_broker_and_back( + pi, "{a -> 1, b -> 2, c -> 3}", broker.Data.Type.Table, po + ) - for (i, (k, v)) in enumerate(d.as_table().items()): + for i, (k, v) in enumerate(d.as_table().items()): self.check_to_broker(k, ["a", "b", "c"][i], broker.Data.Type.String) self.check_to_py(k, ["a", "b", "c"][i]) self.check_to_broker(v, str(i + 1), broker.Data.Type.Integer) self.check_to_py(v, i + 1) # Test a table that contains different data types - p = table_itype({ - True: 42, - broker.Port(22, broker.Port.TCP): False, - (1,2,3): [4,5,6], - broker.Count(13): "test", - }) + p = table_itype( + { + True: 42, + broker.Port(22, broker.Port.TCP): False, + (1, 2, 3): [4, 5, 6], + broker.Count(13): "test", + } + ) d = self.check_to_broker( - p, '{T -> 42, 13 -> test, 22/tcp -> F, (1, 2, 3) -> (4, 5, 6)}', - broker.Data.Type.Table) + p, + "{T -> 42, 13 -> test, 22/tcp -> F, (1, 2, 3) -> (4, 5, 6)}", + broker.Data.Type.Table, + ) t = d.as_table() self.check_to_broker(t[broker.Data(True)], "42", broker.Data.Type.Integer) self.check_to_py(t[broker.Data(True)], 42) - self.check_to_broker(t[broker.Data(broker.Port(22, broker.Port.TCP))], "F", broker.Data.Type.Boolean) + self.check_to_broker( + t[broker.Data(broker.Port(22, broker.Port.TCP))], + "F", + broker.Data.Type.Boolean, + ) self.check_to_py(t[broker.Data(broker.Port(22, broker.Port.TCP))], False) - self.check_to_broker(t[broker.Data((1, 2, 3))], "(4, 5, 6)", broker.Data.Type.Vector) + self.check_to_broker( + t[broker.Data((1, 2, 3))], "(4, 5, 6)", broker.Data.Type.Vector + ) self.check_to_py(t[broker.Data([1, 2, 3])], (4, 5, 6)) - self.check_to_broker(t[broker.Data(broker.Count(13))], "test", broker.Data.Type.String) + self.check_to_broker( + t[broker.Data(broker.Count(13))], "test", broker.Data.Type.String + ) self.check_to_py(t[broker.Data(broker.Count(13))], "test") def test_dict(self): @@ -333,33 +375,34 @@ def test_mapping_proxy_type(self): def test_vector(self): # Test an empty vector - self.check_to_broker_and_back((), '()', broker.Data.Type.Vector) + self.check_to_broker_and_back((), "()", broker.Data.Type.Vector) # Test a simple vector - d = self.check_to_broker((1, 2, 3), '(1, 2, 3)', broker.Data.Type.Vector) + d = self.check_to_broker((1, 2, 3), "(1, 2, 3)", broker.Data.Type.Vector) # Either a list or a tuple are mapped to a Broker vector. - d = self.check_to_broker([1, 2, 3], '(1, 2, 3)', broker.Data.Type.Vector) + d = self.check_to_broker([1, 2, 3], "(1, 2, 3)", broker.Data.Type.Vector) - for (i, x) in enumerate(d.as_vector()): + for i, x in enumerate(d.as_vector()): self.check_to_broker(x, str(i + 1), broker.Data.Type.Integer) self.check_to_py(x, i + 1) # Test a vector that contains various data types - d = broker.Data(['foo', [[True]], ipaddress.IPv6Address('::1'), None]) + d = broker.Data(["foo", [[True]], ipaddress.IPv6Address("::1"), None]) v = d.as_vector() - self.check_to_broker(v[0], 'foo', broker.Data.Type.String) - self.check_to_py(v[0], 'foo') + self.check_to_broker(v[0], "foo", broker.Data.Type.String) + self.check_to_py(v[0], "foo") v1 = v[1].as_vector() - self.check_to_broker(v1, '((T))', broker.Data.Type.Vector) + self.check_to_broker(v1, "((T))", broker.Data.Type.Vector) self.check_to_py(v[1], ((True,),)) - self.check_to_broker(v1[0], '(T)', broker.Data.Type.Vector) + self.check_to_broker(v1[0], "(T)", broker.Data.Type.Vector) self.check_to_py(v1[0], (True,)) - self.check_to_broker(v[2], '::1', broker.Data.Type.Address) + self.check_to_broker(v[2], "::1", broker.Data.Type.Address) self.check_to_py(v[2], ipaddress.IPv6Address("::1")) - self.check_to_broker(v[3], 'nil', broker.Data.Type.Nil) + self.check_to_broker(v[3], "nil", broker.Data.Type.Nil) + -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(verbosity=3) diff --git a/tests/python/forwarding.py b/tests/python/forwarding.py index cfc667b06..98ebc0a94 100644 --- a/tests/python/forwarding.py +++ b/tests/python/forwarding.py @@ -1,11 +1,8 @@ - import unittest -import multiprocessing -import sys -import time import broker + def cleanup(es, ss): for s in ss: if s: @@ -14,9 +11,24 @@ def cleanup(es, ss): for e in es: e.shutdown() -def setup_peers(opts1=None, opts2=None, opts3=None, opts4=None, create_s1=True, create_s2=True, create_s3=True, create_s4=True): + +def setup_peers( + opts1=None, + opts2=None, + opts3=None, + opts4=None, + create_s1=True, + create_s2=True, + create_s3=True, + create_s4=True, +): def cfg(opts): - return broker.Configuration(opts) if opts else broker.Configuration(broker.BrokerOptions()) + return ( + broker.Configuration(opts) + if opts + else broker.Configuration(broker.BrokerOptions()) + ) + ep1 = broker.Endpoint(cfg(opts1)) ep2 = broker.Endpoint(cfg(opts2)) ep3 = broker.Endpoint(cfg(opts3)) @@ -38,6 +50,7 @@ def cfg(opts): return ((ep1, ep2, ep3, ep4), (s1, s2, s3, s4)) + class TestCommunication(unittest.TestCase): def test_two_subscribed_hops(self): # Two hops that are subscribed. @@ -50,11 +63,12 @@ def test_two_subscribed_hops(self): ep4.publish("/test/bar", "Bar!") x = s4.get() - self.assertEqual(x, ('/test/foo', 'Foo!')) + self.assertEqual(x, ("/test/foo", "Foo!")) x = s1.get() - self.assertEqual(x, ('/test/bar', 'Bar!')) + self.assertEqual(x, ("/test/bar", "Bar!")) cleanup((ep1, ep2, ep3, ep4), (s1, s2, s3, s4)) + #### Note: disabled until we switch back to source-routing. # # def test_two_unsubscribed_hops(self): @@ -74,6 +88,6 @@ def test_two_subscribed_hops(self): # self.assertEqual(x, ('/test/bar', 'Bar!')) # cleanup((ep1, ep2, ep3, ep4), (s1, s2, s3, s4)) -if __name__ == '__main__': - #TestCommunication().test_two_hops() +if __name__ == "__main__": + # TestCommunication().test_two_hops() unittest.main(verbosity=3) diff --git a/tests/python/ssl-tests.py b/tests/python/ssl-tests.py index fe98ebad3..96ccb687d 100644 --- a/tests/python/ssl-tests.py +++ b/tests/python/ssl-tests.py @@ -1,15 +1,14 @@ -import unittest -import multiprocessing -import sys -import time import os.path +import unittest import broker + def data_path(file): base = os.path.realpath(__file__) return os.path.join(os.path.join(os.path.dirname(base), "certs"), file) + class TestSSL(unittest.TestCase): def check_ping(self, ep1, s1, ep2, s2): ep2.publish("/test", ["ping"]) @@ -28,11 +27,12 @@ def test_ssl_auth_success_ca(self): cfg.openssl_key = data_path("key.1.pem") cfg.openssl_cafile = data_path("ca.pem") - with broker.Endpoint(cfg) as ep1, \ - broker.Endpoint(cfg) as ep2, \ - ep1.make_subscriber("/test") as s1, \ - ep2.make_subscriber("/test") as s2: - + with ( + broker.Endpoint(cfg) as ep1, + broker.Endpoint(cfg) as ep2, + ep1.make_subscriber("/test") as s1, + ep2.make_subscriber("/test") as s2, + ): port = ep1.listen("127.0.0.1", 0) r = ep2.peer("127.0.0.1", port, 0) self.assertEqual(r, True) @@ -46,11 +46,12 @@ def test_ssl_auth_success_ca_pw(self): cfg.openssl_cafile = data_path("ca.pem") cfg.openssl_passphrase = "12345" - with broker.Endpoint(cfg) as ep1, \ - broker.Endpoint(cfg) as ep2, \ - ep1.make_subscriber("/test") as s1, \ - ep2.make_subscriber("/test") as s2: - + with ( + broker.Endpoint(cfg) as ep1, + broker.Endpoint(cfg) as ep2, + ep1.make_subscriber("/test") as s1, + ep2.make_subscriber("/test") as s2, + ): port = ep1.listen("127.0.0.1", 0) r = ep2.peer("127.0.0.1", port, 0) self.assertEqual(r, True) @@ -63,11 +64,12 @@ def test_ssl_auth_success_self_signed(self): cfg.openssl_key = data_path("key.self-signed.pem") cfg.openssl_cafile = data_path("cert.self-signed.pem") - with broker.Endpoint(cfg) as ep1, \ - broker.Endpoint(cfg) as ep2, \ - ep1.make_subscriber("/test") as s1, \ - ep2.make_subscriber("/test") as s2: - + with ( + broker.Endpoint(cfg) as ep1, + broker.Endpoint(cfg) as ep2, + ep1.make_subscriber("/test") as s1, + ep2.make_subscriber("/test") as s2, + ): port = ep1.listen("127.0.0.1", 0) r = ep2.peer("127.0.0.1", port, 0) self.assertEqual(r, True) @@ -85,21 +87,16 @@ def test_ssl_auth_failure_self_signed(self): cfg2.openssl_key = data_path("key.self-signed.pem") cfg2.openssl_cafile = data_path("cert.self-signed.pem") - with broker.Endpoint(cfg1) as ep1, \ - broker.Endpoint(cfg2) as ep2: - + with broker.Endpoint(cfg1) as ep1, broker.Endpoint(cfg2) as ep2: port = ep1.listen("127.0.0.1", 0) r = ep2.peer("127.0.0.1", port, 0) self.assertEqual(r, False) - with broker.Endpoint(cfg2) as ep1, \ - broker.Endpoint(cfg1) as ep2: - + with broker.Endpoint(cfg2) as ep1, broker.Endpoint(cfg1) as ep2: port = ep1.listen("127.0.0.1", 0) r = ep2.peer("127.0.0.1", port, 0) self.assertEqual(r, False) - def test_ssl_auth_failure_no_auth(self): cfg1 = broker.Configuration(broker.BrokerOptions()) cfg1.openssl_certificate = data_path("cert.1.pem") @@ -108,16 +105,12 @@ def test_ssl_auth_failure_no_auth(self): cfg2 = broker.Configuration(broker.BrokerOptions()) - with broker.Endpoint(cfg1) as ep1, \ - broker.Endpoint(cfg2) as ep2: - + with broker.Endpoint(cfg1) as ep1, broker.Endpoint(cfg2) as ep2: port = ep1.listen("127.0.0.1", 0) r = ep2.peer("127.0.0.1", port, 0) self.assertEqual(r, False) - with broker.Endpoint(cfg2) as ep1, \ - broker.Endpoint(cfg1) as ep2: - + with broker.Endpoint(cfg2) as ep1, broker.Endpoint(cfg1) as ep2: port = ep1.listen("127.0.0.1", 0) r = ep2.peer("127.0.0.1", port, 0) self.assertEqual(r, False) @@ -130,16 +123,12 @@ def test_ssl_auth_failure_no_ssl(self): cfg2 = broker.Configuration(broker.BrokerOptions()) - with broker.Endpoint(cfg1) as ep1, \ - broker.Endpoint(cfg2) as ep2: - + with broker.Endpoint(cfg1) as ep1, broker.Endpoint(cfg2) as ep2: port = ep1.listen("127.0.0.1", 0) r = ep2.peer("127.0.0.1", port, 0) self.assertEqual(r, False) - with broker.Endpoint(cfg2) as ep1, \ - broker.Endpoint(cfg1) as ep2: - + with broker.Endpoint(cfg2) as ep1, broker.Endpoint(cfg1) as ep2: port = ep1.listen("127.0.0.1", 0) r = ep2.peer("127.0.0.1", port, 0) self.assertEqual(r, False) @@ -151,9 +140,7 @@ def XXXtest_ssl_auth_failure_ca_pw(self): cfg.openssl_cafile = data_path("ca.pem") cfg.openssl_passphrase = "WRONG PASSWORD" - with broker.Endpoint(cfg) as ep1, \ - broker.Endpoint(cfg) as ep2: - + with broker.Endpoint(cfg) as ep1, broker.Endpoint(cfg) as ep2: port = ep1.listen("127.0.0.1", 0) # TODO: This correctly generates an exception in CAF, for which I @@ -161,5 +148,6 @@ def XXXtest_ssl_auth_failure_ca_pw(self): r = ep2.peer("127.0.0.1", port, 0) self.assertEqual(r, False) -if __name__ == '__main__': + +if __name__ == "__main__": unittest.main(verbosity=3) diff --git a/tests/python/store.py b/tests/python/store.py index ee4204498..98dddcaf6 100644 --- a/tests/python/store.py +++ b/tests/python/store.py @@ -1,23 +1,21 @@ - -import unittest -import sys import time +import unittest import broker -from inspect import currentframe, getframeinfo def create_stores(self): ep0 = broker.Endpoint() ep1 = broker.Endpoint() ep2 = broker.Endpoint() - with ep0.make_subscriber("/test") as s0, \ - ep1.make_subscriber("/test") as s1, \ - ep1.make_status_subscriber() as es1, \ - ep2.make_subscriber("/test") as s2, \ - ep2.make_status_subscriber() as es2: - + with ( + ep0.make_subscriber("/test") as s0, # noqa: F841 + ep1.make_subscriber("/test") as s1, + ep1.make_status_subscriber() as es1, # noqa: F841 + ep2.make_subscriber("/test") as s2, + ep2.make_status_subscriber() as es2, # noqa: F841 + ): p = ep0.listen("127.0.0.1", 0) ep1.peer("127.0.0.1", p) ep2.peer("127.0.0.1", p) @@ -36,16 +34,18 @@ def create_stores(self): return (ep0, ep1, ep2, m, c1, c2) + # Runs a test with one master and two clones # --tri-setup-start def run_tri_setup(self, f): - with broker.Endpoint() as ep0, \ - broker.Endpoint() as ep1, \ - broker.Endpoint() as ep2, \ - ep0.attach_master("test", broker.Backend.Memory) as m, \ - ep1.attach_clone("test") as c1, \ - ep2.attach_clone("test") as c2: - + with ( + broker.Endpoint() as ep0, + broker.Endpoint() as ep1, + broker.Endpoint() as ep2, + ep0.attach_master("test", broker.Backend.Memory) as m, + ep1.attach_clone("test") as c1, + ep2.attach_clone("test") as c2, + ): # connect the nodes port = ep0.listen("127.0.0.1", 0) self.assertTrue(ep1.peer("127.0.0.1", port)) @@ -62,18 +62,23 @@ def run_tri_setup(self, f): self.assertTrue(c2.await_idle()) f(m, c1, c2) + + # --tri-setup-end + def await_idle(self, *argv): for store in argv: - self.assertTrue(store.await_idle()); + self.assertTrue(store.await_idle()) + class TestStore(unittest.TestCase): def test_basic(self): # --master-start - with broker.Endpoint() as ep1, \ - ep1.attach_master("test", broker.Backend.Memory) as m: - + with ( + broker.Endpoint() as ep1, + ep1.attach_master("test", broker.Backend.Memory) as m, + ): m.put("key", "value") x = m.get("key") # x == "value" @@ -95,7 +100,7 @@ def impl(m, c1, c2): self.assertEqual(c2.put_unique("e", "first"), True) self.assertEqual(c2.put_unique("e", "second"), False) self.assertEqual(c2.put_unique("e", "third"), False) - time.sleep(.5) + time.sleep(0.5) def checkAccessors(x): self.assertEqual(x.get("a"), v1) @@ -112,7 +117,7 @@ def checkAccessors(x): self.assertEqual(x.get_index_from_value("c", 10), None) self.assertEqual(x.get_index_from_value("d", 1), "B") self.assertEqual(x.get_index_from_value("d", 10), None) - self.assertEqual(x.keys(), {'a', 'b', 'c', 'd', 'e'}) + self.assertEqual(x.keys(), {"a", "b", "c", "d", "e"}) checkAccessors(m) checkAccessors(c1) @@ -125,7 +130,7 @@ def checkAccessors(x): m.put("h", v5, 2) m.put("str", "b") m.put("vec", (1, 2)) - m.put("set", set([1, 2])) + m.put("set", {1, 2}) m.put("table", {1: "A", "2": "C"}) # --ops-start @@ -141,17 +146,17 @@ def checkAccessors(x): m.pop("vec") # --ops-end - time.sleep(0.15) # Make sure 'g' expires. + time.sleep(0.15) # Make sure 'g' expires. await_idle(self, c2, c1, m) def checkModifiers(x): self.assertEqual(x.get("e"), v5 + 1) self.assertEqual(x.get("f"), v5 - 1) - self.assertEqual(x.get("g"), None) # Expired - self.assertEqual(x.get("h"), v5) # Not Expired + self.assertEqual(x.get("g"), None) # Expired + self.assertEqual(x.get("h"), v5) # Not Expired self.assertEqual(x.get("str"), "bar") - self.assertEqual(x.get("set"), set([2, 3])) + self.assertEqual(x.get("set"), {2, 3}) self.assertEqual(x.get("table"), {3: "D", "2": "C"}) self.assertEqual(x.get("vec"), (1, 2, 3)) @@ -170,11 +175,12 @@ def checkModifiers(x): run_tri_setup(self, impl) def test_from_one_clone(self): - with broker.Endpoint() as ep0, \ - broker.Endpoint() as ep1, \ - ep0.attach_master("test", broker.Backend.Memory) as m, \ - ep1.attach_clone("test") as c1: - + with ( + broker.Endpoint() as ep0, + broker.Endpoint() as ep1, + ep0.attach_master("test", broker.Backend.Memory) as m, + ep1.attach_clone("test") as c1, + ): port = ep0.listen("127.0.0.1", 0) self.assertTrue(ep1.peer("127.0.0.1", port)) @@ -184,8 +190,10 @@ def test_from_one_clone(self): v1 = "A" v2 = {"A", "B", "C"} - v3 = {1: "A", 2: "B", 3: "C"} - v4 = ("A", "B", "C") + + # TODO: These are untested, should they be? + # v3 = {1: "A", 2: "B", 3: "C"} + # v4 = ("A", "B", "C") c1.put("a", v1) c1.put("b", v2) @@ -194,10 +202,8 @@ def test_from_one_clone(self): self.assertEqual(c1.put_unique("e", "first"), True) - def test_from_two_clones(self): def impl(m, c1, c2): - v1 = "A" v2 = {"A", "B", "C"} v3 = {1: "A", 2: "B", 3: "C"} @@ -228,7 +234,7 @@ def checkAccessors(x): self.assertEqual(x.get_index_from_value("c", 10), None) self.assertEqual(x.get_index_from_value("d", 1), "B") self.assertEqual(x.get_index_from_value("d", 10), None) - self.assertEqual(x.keys(), {'a', 'b', 'c', 'd', 'e'}) + self.assertEqual(x.keys(), {"a", "b", "c", "d", "e"}) checkAccessors(m) checkAccessors(c1) @@ -241,7 +247,7 @@ def checkAccessors(x): c2.put("h", v5, 20) m.put("str", "b") m.put("vec", [1, 2]) - m.put("set", set([1, 2])) + m.put("set", {1, 2}) m.put("table", {1: "A", "2": "C"}) await_idle(self, c1, c2, m) @@ -265,10 +271,10 @@ def checkAccessors(x): def checkModifiers(x): self.assertEqual(x.get("e"), v5 + 1) self.assertEqual(x.get("f"), v5 - 1) - self.assertEqual(x.get("g"), None) # Expired - self.assertEqual(x.get("h"), v5) # Not Expired + self.assertEqual(x.get("g"), None) # Expired + self.assertEqual(x.get("h"), v5) # Not Expired self.assertEqual(x.get("str"), "bar") - self.assertEqual(x.get("set"), set([2, 3])) + self.assertEqual(x.get("set"), {2, 3}) self.assertEqual(x.get("table"), {3: "D", "2": "C"}) self.assertEqual(x.get("vec"), (1, 2, 3)) @@ -286,5 +292,6 @@ def checkModifiers(x): run_tri_setup(self, impl) -if __name__ == '__main__': + +if __name__ == "__main__": unittest.main(verbosity=3) diff --git a/tests/python/topic.py b/tests/python/topic.py index f408a77a5..92470c565 100644 --- a/tests/python/topic.py +++ b/tests/python/topic.py @@ -1,8 +1,8 @@ - import unittest import broker + class TestTopic(unittest.TestCase): def test_string(self): t = broker.Topic("/a/b/c") @@ -18,5 +18,6 @@ def test_append(self): self.assertEqual(t3.string(), "/a//b/c") self.assertEqual(t2.string(), "/b/c/a") -if __name__ == '__main__': - unittest.main(verbosity=3) + +if __name__ == "__main__": + unittest.main(verbosity=3) diff --git a/tests/python/zeek-module.py b/tests/python/zeek-module.py index 106b28a79..d06e0fab4 100644 --- a/tests/python/zeek-module.py +++ b/tests/python/zeek-module.py @@ -1,6 +1,7 @@ """ Test the broker.zeek module without involving Zeek because speed. """ + import datetime import unittest @@ -9,8 +10,8 @@ NetworkTimestamp = broker.zeek.MetadataType.NetworkTimestamp -class TestEventMetadata(unittest.TestCase): +class TestEventMetadata(unittest.TestCase): dt = datetime.datetime(2023, 5, 3, 9, 27, 57, tzinfo=broker.utc) def test_event_no_metadata(self): @@ -40,15 +41,16 @@ def test_event_metadata_bad_timestamp_2(self): e = broker.zeek.Event("a", [42, "a"], metadata=[(NetworkTimestamp, 1.234)]) self.assertFalse(e.valid()) -class TestCommunication(unittest.TestCase): +class TestCommunication(unittest.TestCase): dt = datetime.datetime(2023, 5, 3, 9, 27, 57, tzinfo=broker.utc) def test_event_metadata(self): - with broker.Endpoint() as ep1, \ - broker.Endpoint() as ep2, \ - ep1.make_subscriber("/test") as s1: - + with ( + broker.Endpoint() as ep1, + broker.Endpoint() as ep2, + ep1.make_subscriber("/test") as s1, + ): port = ep1.listen("127.0.0.1", 0) self.assertTrue(ep2.peer("127.0.0.1", port, 1.0)) @@ -73,5 +75,5 @@ def test_event_metadata(self): self.assertEqual(metadata_dict[broker.Count(1234)], "custom") -if __name__ == '__main__': +if __name__ == "__main__": unittest.main(verbosity=3) diff --git a/tests/python/zeek-unsafe-types.py b/tests/python/zeek-unsafe-types.py index 1e718128b..551d6c5bd 100644 --- a/tests/python/zeek-unsafe-types.py +++ b/tests/python/zeek-unsafe-types.py @@ -1,10 +1,8 @@ -from __future__ import print_function -import unittest import multiprocessing +import unittest import broker - -from zeek_common import run_zeek_path, run_zeek +from zeek_common import run_zeek ZeekHello = """ redef Broker::default_connect_retry=1secs; @@ -41,11 +39,10 @@ } """ + class TestCommunication(unittest.TestCase): def test_regular(self): - with broker.Endpoint() as ep, \ - ep.make_subscriber("/test") as sub: - + with broker.Endpoint() as ep, ep.make_subscriber("/test") as sub: port = ep.listen("127.0.0.1", 0) p = multiprocessing.Process(target=run_zeek, args=(ZeekHello, port)) @@ -60,9 +57,7 @@ def test_regular(self): self.assertEqual(str(ctx.exception), "unhashable type: 'dict'") def test_safe(self): - with broker.Endpoint() as ep, \ - ep.make_safe_subscriber("/test") as sub: - + with broker.Endpoint() as ep, ep.make_safe_subscriber("/test") as sub: port = ep.listen("127.0.0.1", 0) p = multiprocessing.Process(target=run_zeek, args=(ZeekHello, port)) @@ -84,7 +79,10 @@ def test_safe(self): # broker.zeek.SafeEvent uses broker.ImmutableData, so can access # the arguments safely: ev = broker.zeek.SafeEvent(msg) - args = ev.args() -if __name__ == '__main__': + # TODO: should this test inspect the args somehow? + args = ev.args() # noqa: F841 + + +if __name__ == "__main__": unittest.main(verbosity=3) diff --git a/tests/python/zeek.py b/tests/python/zeek.py index c5942c4bd..5480235e2 100644 --- a/tests/python/zeek.py +++ b/tests/python/zeek.py @@ -1,11 +1,9 @@ -from __future__ import print_function -import unittest import multiprocessing +import unittest +from datetime import datetime import broker - -from zeek_common import run_zeek_path, run_zeek -from datetime import datetime +from zeek_common import run_zeek ZeekPing = """ redef Broker::default_connect_retry=1secs; @@ -56,11 +54,10 @@ } """ + class TestCommunication(unittest.TestCase): def test_ping(self): - with broker.Endpoint() as ep, \ - ep.make_subscriber("/test") as sub: - + with broker.Endpoint() as ep, ep.make_subscriber("/test") as sub: port = ep.listen("127.0.0.1", 0) p = multiprocessing.Process(target=run_zeek, args=(ZeekPing, port)) @@ -74,7 +71,7 @@ def test_ping(self): expected_arg = "x" + "Xx" * i if i == 5: - expected_arg = expected_arg.encode('utf-8') + b'\x82' + expected_arg = expected_arg.encode("utf-8") + b"\x82" # Extract metadata. ev_metadata = ev.metadata() @@ -96,11 +93,14 @@ def test_ping(self): if i < 3: ev = broker.zeek.Event("pong", s + "X", c, metadata=metadata) elif i < 5: - ev = broker.zeek.Event("pong", s.encode('utf-8') + b'X', c, metadata=metadata) + ev = broker.zeek.Event( + "pong", s.encode("utf-8") + b"X", c, metadata=metadata + ) else: - ev = broker.zeek.Event("pong", 'done', c) + ev = broker.zeek.Event("pong", "done", c) ep.publish("/test", ev) -if __name__ == '__main__': + +if __name__ == "__main__": unittest.main(verbosity=3) diff --git a/tests/python/zeek_common.py b/tests/python/zeek_common.py index 0e0b14900..40afdd313 100644 --- a/tests/python/zeek_common.py +++ b/tests/python/zeek_common.py @@ -2,6 +2,7 @@ import subprocess import tempfile + def run_zeek_path(): base = os.path.realpath(__file__) for d in (os.path.join(os.path.dirname(base), "../../build"), os.getcwd()): @@ -9,7 +10,8 @@ def run_zeek_path(): if os.path.exists(run_zeek_script): return run_zeek_script - return "zeek" # Hope for the best ... + return "zeek" # Hope for the best ... + def run_zeek(script, port): try: