From aac7cb3561da2dd0b595d3516a1dada21a49fe4a Mon Sep 17 00:00:00 2001 From: Michal Novak Date: Tue, 12 Dec 2023 14:31:25 +0100 Subject: [PATCH 1/5] docs update minor refactoring stream telemetry delete in demo adapter tests for delete in stream telemetry Signed-off-by: Michal Novak --- data/demo.xml | 10 +- docs/ConfD_gNMI_adapter.adoc | 53 +++--- gnmi-tools.xml | 20 +++ gnmi-tools.yang | 8 + src/confd_gnmi_adapter.py | 14 +- src/confd_gnmi_api_adapter.py | 3 - src/confd_gnmi_client.py | 34 ++-- src/confd_gnmi_demo_adapter.py | 284 ++++++++++++++++++++---------- tests/client_server_test_base.py | 271 +++++++++++++++++++--------- tests/test_client_server_confd.py | 6 +- tests/test_client_server_demo.py | 2 + 11 files changed, 476 insertions(+), 229 deletions(-) diff --git a/data/demo.xml b/data/demo.xml index 93fd39c..9e356df 100644 --- a/data/demo.xml +++ b/data/demo.xml @@ -3,6 +3,10 @@ + + /gnmi-tools/top-d + + /interfaces/interface[name=if_5]/type fastEther @@ -37,8 +41,12 @@ /interfaces-state/interface[name=state_if_6]/type gigabitEthernet + + /interfaces/interface[name=if_6]/type + + send - \ No newline at end of file + diff --git a/docs/ConfD_gNMI_adapter.adoc b/docs/ConfD_gNMI_adapter.adoc index a8a2325..ba9ce61 100644 --- a/docs/ConfD_gNMI_adapter.adoc +++ b/docs/ConfD_gNMI_adapter.adoc @@ -23,8 +23,8 @@ endif::[] :Author: Michal Novák :email: micnovak@cisco.com :URL: https://www.tail-f.com/ -:Date: 2023-01-24 -:Revision: 0.3.0 +:Date: 2023-12-12 +:Revision: 0.4.0 == Version history @@ -32,8 +32,9 @@ endif::[] |====== | Document version | Notes | Date | Author | 0.0.1 | Initial document version. | 2021-02-09 | {author} {email} -| 0.1.0 | Run options updated, added seq. diagrams for Subscribe operation. | 2021-03-23 | {author} {email} +| 0.1.0 | Run options updated, added sequence diagrams for `Subscribe` operation. | 2021-03-23 | {author} {email} | 0.2.0 | External data provider description, fixes, documentation update. | 2021-04-27 | {author} {email} +| 0.3.0 | Update command line options, added Encoding description | 2023-01-24 | {author} {email} | {revision} | Update command line options, added Encoding description | {date} | {author} {email} |====== @@ -41,7 +42,7 @@ toc::[] == Introduction -https://www.tail-f.com/management-agent/[ConfD] is configuration management agent supporting various standard and proprietary northbound interfaces like: +https://www.tail-f.com/management-agent/[ConfD] is a configuration management agent supporting various standard and proprietary northbound interfaces like: * https://tools.ietf.org/html/rfc6241[NETCONF] * https://tools.ietf.org/html/rfc8040[RESTCONF] @@ -51,17 +52,17 @@ https://www.tail-f.com/management-agent/[ConfD] is configuration management agen * CLI https://github.com/openconfig/reference/blob/master/rpc/gnmi/gnmi-specification.md[gNMI] is another popular north bound interface, which is not implemented by ConfD. -In this demo project we will implement gNMI Adapter over existing ConfD interfaces to make (at least partial) gNMI support. +In this project, we will implement gNMI Adapter over existing ConfD interfaces to make (at least partial) gNMI support. In the beginning, we will provide basic functionality, for most common operations, later on we will add more. -This demo focuses on functionality and simplicity, we have chosen Python as implementation language. +The project focuses on functionality and simplicity, we have chosen Python as implementation language. -We use general approach, so the demo can be adapted for other management agents and tools as well. +We use the general approach, so the project can be adapted for other management agents and tools as well. -This demo is still work in progress, see <> section. +This project is still work in progress, see <> section. === Copy/Paste and Output blocks -In this note you can find script and code examples, that can be directly pasted into the shell or CLI terminal. We will use following block style for the copy/paste ready text: +In this note, you can find script and code examples that can be directly pasted into the shell or CLI terminal. We will use the following block style for the copy/paste ready text: [source,shell,role="acopy"] ---- @@ -69,11 +70,11 @@ pip install grpcio-tools ---- NOTE: make sure all commands have executed - confirm last command with kbd:[ENTER], if needed. -If viewed on https://github.com[GitHub], you may find following +If viewed on https://github.com[GitHub], you may find the following browser https://github.com/zenorocha/codecopy[extension] useful (out-of-the-box *copy to clipboard* button). The output of the shell CLI commands or file content will be displayed -with following block style: +with the following block style: .[.small]_output_ [.output] @@ -104,7 +105,7 @@ Options: We expect https://www.python.org/[Python3] to be installed. The `python` and `pip` commands are from Python3 environment. If not, use `python3` or `pip3` instead (or use e.g. `sudo apt-get install python-is-python3`) -TIP: For package installation and development, you may consider creating https://docs.python.org/3/tutorial/venv.html[python virtual environment]. +TIP: For package installation and development, you may consider creating a https://docs.python.org/3/tutorial/venv.html[python virtual environment]. === gRPC Python tools @@ -126,7 +127,7 @@ pip install --upgrade grpcio-tools === Pytest -For automated tests we will use https://www.pytest.org/[pytest] framework. +For automated tests, we will use a https://www.pytest.org/[pytest] framework. If you want to run tests, use `pip` to install it. .Installation @@ -141,7 +142,7 @@ pip install pytest pip install --upgrade pytest ---- -NOTE: `pytest` may be available also as package in your distribution (e.g. `apt-get install python3-pytest`). We still recommend to use `pip` to get the latest version. +NOTE: `pytest` is often available also as package in your distribution (e.g. `apt-get install python3-pytest`). We still recommend to use `pip` to get the latest version. === ConfD @@ -153,7 +154,7 @@ Install https://www.tail-f.com/management-agent/[ConfD Premium] or https://www.t source ${CONFD_DIR}/confdrc ---- -TIP: See https://info.tail-f.com/confd-evaluation-kick-start-guide[ConfD Kick Start Guide] for additional information. +TIP: See https://info.tail-f.com/confd-evaluation-kick-start-guide[ConfD Kick-Start Guide] for additional information. === Build environment @@ -182,7 +183,7 @@ service gNMI { NOTE: The interface itself looks relatively simple, but the `Request` and `Response` messages may be complex. `Subscribe` method has many variants. More details can be found in the https://github.com/openconfig/reference/blob/master/rpc/gnmi/gnmi-specification[gNMI Specification]. -== Building gNMI Adapter demo +== Building the gNMI Adapter === gNMI python binding @@ -206,7 +207,7 @@ https://tools.ietf.org/html/rfc8343[`ietf-interfaces.yang`] and its dependencies NOTE: The used datamodel and initial configuration is used for demonstration in this note. The gNMI Adapter can run against any other ConfD instance with different data model. In this case, paths and values will be different. See examples with ConfD example application <> and <>. -== Running gNMI Adapter demo +== Running the gNMI Adapter Before running the adapter, we need to make sure gNMI python binding is created. @@ -216,11 +217,11 @@ Before running the adapter, we need to make sure gNMI python binding is created. make clean all ---- -The adapter can be run in _demo_ and _api_ mode. +The adapter can be run in _demo_, _confd_ and _nso_ mode. -In the _demo_ mode it does not require running ConfD, it partly emulates `ietf-interfaces.yang` data model and initial configuration. This mode is useful for testing, development, etc. +In the _demo_ mode it does not require running ConfD or NSO, it partly emulates `ietf-interfaces.yang` data model and initial configuration. This mode is useful for testing, development, etc. -In case we want to run adapter against ConfD (_api_ mode), we can use `Makefile` `start` target to start ConfD with initial demo configuration. +In case we want to test adapter against ConfD (_confd_ mode), we can use `Makefile` `start` target to start ConfD with initial configuration. .rebuild everything and start ConfD and load demo configuration [source, shell, role="acopy"] @@ -450,6 +451,14 @@ For operational data: + `confd_cmd -o -fr -c "set /interfaces-state/interface{state_if_8}/type gigabitEthernet"` +NOTE: You can also test `STREAM` subscription in demo mode by passing configuration file. + +Start the server: + +`./src/confd_gnmi_server.py -t demo --logging=info --cfg=./data/demo.xml` + + + +And check with the client: + +`./src/confd_gnmi_client.py -o subscribe -s STREAM --read-count=10 --prefix /ietf-interfaces:interfaces --path interface[name=if_5]/name --path interface[name=if_5]/type` + + .subscribe for `list` entry [source, shell, role="acopy"] ---- @@ -1054,7 +1063,7 @@ TIP: To list-only tests, use `./test.sh --collect-only -q tests/` === Limitations and TODOs -The implementation of the adapter (still in early phase) is demo reference implementation that shows how to add gNMI support to existing ConfD interfaces. +The implementation of the adapter is a reference implementation that shows how to add gNMI support to existing ConfD and NSO interfaces. It can be extended according to deployment requirements. This not all gNMI functionality are currently supported. They may be added in the future. @@ -1094,7 +1103,7 @@ aggregation should not be used for Subscriptions by default) == Conclusion -gNMI Adapter Demo can provide initial gNMI functionality to ConfD. +gNMI Adapter can provide initial gNMI functionality to ConfD and NSO. == References diff --git a/gnmi-tools.xml b/gnmi-tools.xml index f6b3f80..ce37a14 100644 --- a/gnmi-tools.xml +++ b/gnmi-tools.xml @@ -54,5 +54,25 @@ opticalTransport In-Service + + + n1 + + + n2 + + + + n3 + + + + n4 + + test + 23 + + + diff --git a/gnmi-tools.yang b/gnmi-tools.yang index f02e3a1..2c8eeb2 100644 --- a/gnmi-tools.yang +++ b/gnmi-tools.yang @@ -39,6 +39,14 @@ module gnmi-tools { leaf type {type string;} leaf admin-state {type string;} } + + container top-d { + list top-d-list { + key name; + leaf name {type string;} + uses combo; + } + } } } diff --git a/src/confd_gnmi_adapter.py b/src/confd_gnmi_adapter.py index fd1b787..aa657cb 100644 --- a/src/confd_gnmi_adapter.py +++ b/src/confd_gnmi_adapter.py @@ -125,11 +125,10 @@ def add_path_for_monitoring(self, path, prefix): pass @abstractmethod - def get_monitored_changes(self) -> List: + def get_subscription_notifications(self): """ Get gNMI subscription updates for changed values - :return: gNMI update array - #TODO should we also return delete array + :return: gNMI Notification array """ pass @@ -261,15 +260,6 @@ def changes(self): log.debug("<== responses=%s", responses) return responses - def get_subscription_notifications(self): - update = self.get_monitored_changes() - notif = gnmi_pb2.Notification(timestamp=get_timestamp_ns(), - prefix=self.subscription_list.prefix, - update=update, - delete=[], - atomic=False) - return [notif] - def _get_next_sample_interval_and_subscriptions(self, first_sample_time: int): interval = None diff --git a/src/confd_gnmi_api_adapter.py b/src/confd_gnmi_api_adapter.py index b3b9a7b..2526aec 100644 --- a/src/confd_gnmi_api_adapter.py +++ b/src/confd_gnmi_api_adapter.py @@ -146,9 +146,6 @@ def _get_subscription_notifications(self): yield prefix, updates, deletes self.change_db = [] - def get_monitored_changes(self): - raise NotImplementedError - def get_sample(self, path, prefix, allow_aggregation=False, start_change_processing=False): log.debug("==>") diff --git a/src/confd_gnmi_client.py b/src/confd_gnmi_client.py index aea6f16..85f5740 100755 --- a/src/confd_gnmi_client.py +++ b/src/confd_gnmi_client.py @@ -18,7 +18,7 @@ from confd_gnmi_common import HOST, PORT, make_xpath_path, VERSION, \ common_optparse_options, common_optparse_process, make_gnmi_path, \ datatype_str_to_int, subscription_mode_str_to_int, \ - encoding_int_to_str, encoding_str_to_int, get_time_string + encoding_int_to_str, encoding_str_to_int, get_time_string, get_timestamp_ns from gnmi_pb2_grpc import gNMIStub @@ -147,18 +147,21 @@ def print_notification(n): pfx_str = make_xpath_path(gnmi_prefix=n.prefix) print("timestamp {} prefix {} atomic {}".format( get_time_string(n.timestamp), pfx_str, n.atomic)) - print("Updates:") - for u in n.update: - if u.val.json_val: - value = json.loads(u.val.json_val) - elif u.val.json_ietf_val: - value = json.loads(u.val.json_ietf_val) - else: - value = str(u.val) - print("path: {} value {}".format(pfx_str + make_xpath_path(u.path), - value)) - for dpath in n.delete: - print("path deleted: {}".format(pfx_str + make_xpath_path(dpath))) + if n.update: + print("Updates:") + for u in n.update: + if u.val.json_val: + value = json.loads(u.val.json_val) + elif u.val.json_ietf_val: + value = json.loads(u.val.json_ietf_val) + else: + value = str(u.val) + print("path: {} value {}".format(pfx_str + make_xpath_path(u.path), + value)) + if n.delete: + print("Deletes:") + for dpath in n.delete: + print("path deleted: {}".format(pfx_str + make_xpath_path(dpath))) @staticmethod def read_subscribe_responses(responses, read_count=-1): @@ -169,7 +172,10 @@ def read_subscribe_responses(responses, read_count=-1): log.info("******* Subscription received response=%s read_count=%i", response, read_count) print("subscribe - response read_count={}".format(read_count)) - ConfDgNMIClient.print_notification(response.update) + if response.sync_response: + print(f"timestamp {get_time_string(get_timestamp_ns())} Sync_response") + else: + ConfDgNMIClient.print_notification(response.update) num_read += 1 if read_count > 0: read_count -= 1 diff --git a/src/confd_gnmi_demo_adapter.py b/src/confd_gnmi_demo_adapter.py index c4b22e6..0b1f7b4 100644 --- a/src/confd_gnmi_demo_adapter.py +++ b/src/confd_gnmi_demo_adapter.py @@ -5,6 +5,7 @@ import re import threading import xml.etree.ElementTree as ET +from abc import abstractmethod from enum import Enum from queue import Queue, Empty from random import randint @@ -15,9 +16,43 @@ log = logging.getLogger('confd_gnmi_demo_adapter') +# Types for streaming telemetry operations +class ChangeOp: + @abstractmethod + def __repr__(self): + return f'{self.__class__.__name__}()' + + @abstractmethod + def __str__(self): + return f'{self.__class__.__name__}()' + + +class ChangeVal(ChangeOp): + def __init__(self, value): + if not isinstance(value, str): + raise ValueError("Invalid value type") + self.value = value + + def __repr__(self): + return f'ChangeVal({self.value})' + + def __str__(self): + return str(self.value) + + +class ChangeDel(ChangeOp): + + def __init__(self, deleted_paths=[]): + self.deleted_paths = deleted_paths + def __repr__(self): + return f'ChangeDel()' + + def __str__(self): + return f'ChangeDel' class GnmiDemoServerAdapter(GnmiServerAdapter): NS_INTERFACES = "ietf-interfaces:" + NS_GNMI_TOOLS = "gnmi-tools:" NS_IANA = "iana-if-type:" # simple demo database @@ -47,7 +82,7 @@ class GnmiDemoServerAdapter(GnmiServerAdapter): ] def __init__(self): - self._fill_demo_db() + GnmiDemoServerAdapter.fill_demo_db() @staticmethod def load_config_string(xml_cfg): @@ -60,7 +95,7 @@ def load_config_string(xml_cfg): assert root.tag == "demo" changes = root.findall("./subscription/STREAM/changes/element") log.debug("changes=%s", changes) - if len(changes): + if changes: if "changes" not in GnmiDemoServerAdapter.config: GnmiDemoServerAdapter.config["changes"] = [] GnmiDemoServerAdapter.config["changes_idx"] = 0 @@ -68,12 +103,15 @@ def load_config_string(xml_cfg): log.debug("len(el)=%s", len(el)) if len(el): if len(el) == 2: - (path, val) = (el[0], el[1]) - log.debug("path.text=%s val.text=%s", path.text, - val.text) - GnmiDemoServerAdapter.config["changes"].append( - (GnmiDemoServerAdapter._nsless_xpath(path.text), - val.text.replace(GnmiDemoServerAdapter.NS_IANA, ""))) + path = GnmiDemoServerAdapter._nsless_xpath(el[0].text) + if (el[1].tag == "val"): + val = ChangeVal(el[1].text.replace(GnmiDemoServerAdapter.NS_IANA, "")) + elif (el[1].tag == "del"): + val=ChangeDel() + else: + assert False + log.debug("path=%s val=%s", path, val) + GnmiDemoServerAdapter.config["changes"].append((path, val)) elif len(el) == 0: log.debug("el.tag=%s", el.text) GnmiDemoServerAdapter.config["changes"].append(el.text) @@ -86,29 +124,49 @@ def get_adapter(cls): cls._instance = GnmiDemoServerAdapter() return cls._instance - def _fill_demo_db(self): + @classmethod + def fill_demo_db(cls, reset = False): log.debug("==>") - with self.db_lock: + + with cls.db_lock: + if reset: + cls.demo_db, cls.demo_state_db = {}, {} # make interfaces alphabetically sorted - ifs = sorted(str(i+1) for i in range(GnmiDemoServerAdapter.num_of_ifs)) + ifs = sorted( + str(i + 1) for i in range(cls.num_of_ifs)) for if_id in ifs: if_name = f"if_{if_id}" - state_if_name = "state_if_{}".format(if_id) - path = "/interfaces/interface[name={}]".format(if_name) - state_path = "/interfaces-state/interface[name={}]".format( - state_if_name) - self.demo_db["{}/name".format(path)] = if_name - self.demo_state_db["{}/name".format(state_path)] = state_if_name - self.demo_db["{}/type".format(path)] = "gigabitEthernet" - self.demo_state_db[ - "{}/type".format(state_path)] = "gigabitEthernet" - self.demo_db["{}/enabled".format(path)] = True - log.debug("<== self.demo_db=%s self.demo_state_db=%s", self.demo_db, - self.demo_state_db) + state_if_name = f"state_if_{if_id}" + path = f"/interfaces/interface[name={if_name}]" + state_path = f"/interfaces-state/interface[name={state_if_name}]" + cls.demo_db[f"{path}/name"] = if_name + cls.demo_state_db[f"{state_path}/name"] = state_if_name + cls.demo_db[f"{path}/type"] = "gigabitEthernet" + cls.demo_state_db[f"{state_path}/type"] = "gigabitEthernet" + cls.demo_db[f"{path}/enabled"] = True + + for i in range(1, 5): + path = f"/gnmi-tools/top-d/top-d-list[name=n{i}]" + cls.demo_db[f"{path}/name"] = f"n{i}" + if i == 2: + cls.demo_db[f"{path}/empty-leaf"] = [None] + if i == 3: + cls.demo_db[f"{path}/pres"] = {} + if i == 4: + cls.demo_db[f"{path}/down/str-leaf"] = "test" + cls.demo_db[f"{path}/down/int-leaf"] = "23" + + log.debug("<== self.demo_db=%s self.demo_state_db=%s", cls.demo_db, + cls.demo_state_db) + + @classmethod + def reset(cls): + cls.fill_demo_db(reset = True) @staticmethod def _nsless_xpath(xpath: str): - return xpath.replace(GnmiDemoServerAdapter.NS_INTERFACES, "") + return (xpath.replace(GnmiDemoServerAdapter.NS_INTERFACES, "") + .replace(GnmiDemoServerAdapter.NS_GNMI_TOOLS, "")) @staticmethod def _get_key_from_xpath(xpath): @@ -131,11 +189,9 @@ def _demo_db_to_key_elem_map(db): for p, v in db.items(): key = GnmiDemoServerAdapter._get_key_from_xpath(p) elem = GnmiDemoServerAdapter._get_elem_from_xpath(p) - elem_map = {} - if key in map_db: - elem_map = map_db[key] + elem_map = map_db.get(key, {}) if elem == "type": - v = "{}{}".format(GnmiDemoServerAdapter.NS_IANA, v) + v = f'{GnmiDemoServerAdapter.NS_IANA}{v}' elem_map[elem] = v map_db[key] = elem_map log.debug("<== map_db={}".format(map_db)) @@ -161,36 +217,59 @@ def get_sample(self, path, prefix, allow_aggregation=False) -> []: adapter: GnmiDemoServerAdapter = self.adapter with adapter.db_lock: updates = adapter.get_db_updates_for_path(path, prefix, - adapter.demo_db, + GnmiDemoServerAdapter.demo_db, allow_aggregation) # 'if' below is optimization if len(updates) == 0: updates = adapter.get_db_updates_for_path(path, prefix, - adapter.demo_state_db, + GnmiDemoServerAdapter.demo_state_db, allow_aggregation) log.debug("<== updates=%s", updates) return updates - def get_monitored_changes(self) -> []: + def get_subscription_notifications(self) -> []: + log.debug("==> self.change_db=%s", self.change_db) + assert len(self.change_db) > 0 + update = [] + delete = [] + + def remove_prefix(path, prefix): + if path.startswith(prefix): + return path[len(prefix):] + return path + with self.change_db_lock: - log.debug("==> self.change_db=%s", self.change_db) - assert len(self.change_db) > 0 - update = [] for c in self.change_db: prefix_str = self.adapter._nsless_xpath(make_xpath_path( gnmi_prefix=self.subscription_list.prefix)) - p = self.adapter._nsless_xpath(c[0]) - if p.startswith(prefix_str): - p = p[len(prefix_str):] - v = "{}{}".format(GnmiDemoServerAdapter.NS_IANA, c[1]) - json_val = gnmi_pb2.TypedValue( - json_ietf_val=json.dumps(v).encode()) - update.append(gnmi_pb2.Update(path=make_gnmi_path(p), - val=json_val)) + p = remove_prefix(self.adapter._nsless_xpath(c[0]), + prefix_str) + if isinstance(c[1], ChangeVal): + v = "{}{}".format(GnmiDemoServerAdapter.NS_IANA, c[1]) + json_val = gnmi_pb2.TypedValue( + json_ietf_val=json.dumps(v).encode()) + update.append(gnmi_pb2.Update(path=make_gnmi_path(p), + val=json_val)) + elif isinstance(c[1], ChangeDel): + delete.extend( + [make_gnmi_path(remove_prefix(p, prefix_str)) for p + in + [path for i, path in enumerate(c[1].deleted_paths) + if path not in c[1].deleted_paths[:i]]]) + else: + assert False self.change_db = [] - log.debug("<== update=%s", update) - return update + log.debug("<== update=%s delete=%s", update, delete) + + notif = gnmi_pb2.Notification(timestamp=get_timestamp_ns(), + prefix=self.subscription_list.prefix, + update=update, + delete=delete, + atomic=False) + + log.debug("<== notif=%s", notif) + return [notif] def _get_random_changes(self): log.debug("==>") @@ -210,7 +289,7 @@ def _get_random_changes(self): new_val = "fastEther" log.debug("adding change path=%s, new_val=%s", path, new_val) - changes.append((path, new_val)) + changes.append((path, ChangeVal(new_val))) if randint(0, 9) < 8: changes.append("send") else: @@ -240,7 +319,7 @@ def _get_config_changes(): def process_changes(self): log.debug("==>") add_count = 0 - # there may be more changes to same path, we cannot use map + # there may be more changes to the same path, we cannot use map assert self.change_event_queue is not None assert len(self.change_db) == 0 while True: @@ -249,43 +328,8 @@ def process_changes(self): event = self.change_event_queue.get(timeout=1) log.debug("event=%s", event) if event == self.ChangeEvent.ADD: - # generate modifications and add them - with self.change_db_lock, self.adapter.db_lock: - if "changes" in GnmiDemoServerAdapter.config: - changes = self._get_config_changes() - else: - changes = self._get_random_changes() - send = False - for c in changes: - log.debug("processing change c=%s", c) - if isinstance(c, str): - assert c == "send" or c == "break" - if c == "send": - send = True - break - else: - log.info("c=%s self.monitored_paths=%s", - c, self.monitored_paths) - (path, val) = c - if path[0] != '/': - path = '/' + path - if any(path.startswith(elem) for elem in - self.monitored_paths): - log.info("appending (path, val)=%s", (path, val)) - self.change_db.append((path, val)) - if path in self.adapter.demo_db: - self.adapter.demo_db[path] = val - elif path in self.adapter.demo_state_db: - self.adapter.demo_state_db[ - path] = val - else: - assert False - if send: - if len(self.change_db): - self.change_event_queue.put( - self.ChangeEvent.SEND) + self._process_add_event() elif event == self.ChangeEvent.SEND: - # send all modified paths self.put_event(self.SubscriptionEvent.SEND_CHANGES) elif event == self.ChangeEvent.FINISH: break @@ -297,12 +341,66 @@ def process_changes(self): if add_count == 0: add_count += 1 # sometimes skip add if is first from previous - if randint(0, 1) == 0: - continue + if randint(0, 1) == 0: continue add_count = 0 # reset add count self.change_event_queue.put(self.ChangeEvent.ADD) log.debug("<==") + def _process_add_event(self): + log.debug("==>") + with self.change_db_lock, self.adapter.db_lock: + if "changes" in GnmiDemoServerAdapter.config: + changes = self._get_config_changes() + else: + changes = self._get_random_changes() + send = False + for c in changes: + log.debug("processing change c=%s", c) + if isinstance(c, str): + assert c == "send" or c == "break" + if c == "send": + send = True + break + else: + log.info("c=%s self.monitored_paths=%s", + c, self.monitored_paths) + (path, ch_op) = c + if path[0] != '/': + path = '/' + path + if any(path.startswith(elem) for elem in + self.monitored_paths): + log.info("appending (path, val)=%s", (path, ch_op)) + self.change_db.append( + self.update_demo_db(path, ch_op)) + if send: + if len(self.change_db): + self.change_event_queue.put( + self.ChangeEvent.SEND) + log.debug("<==") + + def update_demo_db(self, path, ch_op): + log.debug("==>") + ret = (path, ch_op) + if isinstance(ch_op, ChangeVal): + if path in self.adapter.demo_db: + self.adapter.demo_db[path] = ch_op + elif path in self.adapter.demo_state_db: + self.adapter.demo_state_db[ + path] = ch_op + else: + log.warning("path=%s not found, skipping update (val=%s)", path, ch_op) + elif isinstance(ch_op, ChangeDel): + for db in [self.adapter.demo_db, + self.adapter.demo_state_db]: + for db_path in list(db.keys()): + if db_path.startswith(path): + del db[db_path] + if "gnmi-tools/top-d" in db_path: + a_path = db_path.split(']')[0] + ch_op.deleted_paths.append(a_path + ']') + log.debug("<== ret=%s", ret) + return ret + def add_path_for_monitoring(self, path, prefix): log.debug("==>") @@ -373,7 +471,9 @@ def get_db_updates_for_path(self, path, prefix, db, allow_aggregation=False): path_val_list = [] map_db = self._demo_db_to_key_elem_map(db) ifaces = self._nsless_xpath(path_with_prefix) - if allow_aggregation and ifaces == "/interfaces" or ifaces == "/interfaces-state": + if allow_aggregation and (ifaces == "/interfaces" or ifaces == "/interfaces-state"): + # for now, only interface elements supported for aggregation + map_db = {key: value for key, value in map_db.items() if "if_" in key} if list(db.keys())[0].startswith(ifaces): path_val_list = [ (path_with_prefix, {"interface": list(map_db.values())})] @@ -421,11 +521,12 @@ def get_updates(self, path, prefix, data_type): with self.db_lock: if data_type == gnmi_pb2.GetRequest.DataType.CONFIG or \ data_type == gnmi_pb2.GetRequest.DataType.ALL: - updates = self.get_db_updates_for_path(path, prefix, self.demo_db, + updates = self.get_db_updates_for_path(path, prefix, + GnmiDemoServerAdapter.demo_db, allow_aggregation=True) if data_type != gnmi_pb2.GetRequest.DataType.CONFIG: updates = self.get_db_updates_for_path(path, prefix, - self.demo_state_db, + GnmiDemoServerAdapter.demo_state_db, allow_aggregation=True) log.debug("<== updates=%s", updates) @@ -448,18 +549,19 @@ def get(self, prefix, paths, data_type, use_models): @contextmanager def update_transaction(self, prefix): - yield DemoTransaction(prefix) + yield DemoTransaction(prefix, GnmiDemoServerAdapter.demo_db) class DemoTransaction(UpdateTransaction): - def __init__(self, prefix): + def __init__(self, prefix, demo_db): self.prefix = prefix + self.demo_db = demo_db def set_update(self, path, val): log.info("==> path=%s, val=%s", path, val) path_str = make_xpath_path(path, self.prefix) op = gnmi_pb2.UpdateResult.INVALID - if GnmiDemoServerAdapter._nsless_xpath(path_str) in GnmiDemoServerAdapter.demo_db: + if GnmiDemoServerAdapter._nsless_xpath(path_str) in self.demo_db: if val.string_val: str_val = val.string_val elif val.json_ietf_val: @@ -471,7 +573,7 @@ def set_update(self, path, val): str_val = "{}".format(val) str_val = str_val.replace(GnmiDemoServerAdapter.NS_IANA, "") with GnmiDemoServerAdapter.db_lock: - GnmiDemoServerAdapter.demo_db[path_str] = str_val + self.demo_db[path_str] = str_val op = gnmi_pb2.UpdateResult.UPDATE log.info("==> op=%s", op) diff --git a/tests/client_server_test_base.py b/tests/client_server_test_base.py index c9952e6..ff44925 100644 --- a/tests/client_server_test_base.py +++ b/tests/client_server_test_base.py @@ -1,3 +1,5 @@ +from abc import abstractmethod + import itertools import json import subprocess @@ -11,8 +13,8 @@ import gnmi_pb2 from confd_gnmi_client import ConfDgNMIClient from confd_gnmi_common import make_gnmi_path, datatype_str_to_int, \ - make_formatted_path, get_timestamp_ns -from confd_gnmi_demo_adapter import GnmiDemoServerAdapter + make_formatted_path, get_timestamp_ns, add_path_prefix +from confd_gnmi_demo_adapter import GnmiDemoServerAdapter, ChangeDel, ChangeVal from confd_gnmi_servicer import AdapterType, ConfDgNMIServicer from utils.utils import log, nodeid_to_path @@ -22,12 +24,18 @@ class GrpcBase: NS_IETF_INTERFACES = GnmiDemoServerAdapter.NS_INTERFACES NS_OC_INTERFACES = "openconfig-interfaces:" + NS_GNMI_TOOLS = GnmiDemoServerAdapter.NS_GNMI_TOOLS PREFIX_MAP = { NS_IETF_INTERFACES: "if:", - NS_OC_INTERFACES: "oc-if:" + NS_OC_INTERFACES: "oc-if:", + NS_GNMI_TOOLS: NS_GNMI_TOOLS } + @abstractmethod + def set_adapter_type(self): + raise NotImplementedError + @pytest.fixture def fix_method(self, request): log.debug("==> fixture method setup request={}".format(request)) @@ -37,6 +45,7 @@ def fix_method(self, request): self.set_adapter_type() self.server = ConfDgNMIServicer.serve(adapter_type=self.adapter_type, insecure=True) self.client = ConfDgNMIClient(insecure=True) + GnmiDemoServerAdapter.fill_demo_db() log.debug("<== fixture method setup") yield log.debug("==> fixture method teardown (nodeid %s)" % nodeid_path) @@ -105,7 +114,9 @@ def capability_supported(cap): assert gnmi_pb2.Encoding.JSON_IETF in capabilities.supported_encodings @staticmethod - def assert_update(update, path_val): + def assert_update(update, path_val, + update_prefix=gnmi_pb2.Path(), + pv_prefix=gnmi_pb2.Path()): """ Asserts that the update matches the expected path and value. @@ -125,7 +136,8 @@ def assert_update(update, path_val): # Validate the path if required if check_path: - assert update.path == path_val[0] + assert (add_path_prefix(update.path, update_prefix) == + add_path_prefix(path_val[0], pv_prefix)) # Parse the json_ietf_val attribute of the update object json_value = json.loads(update.val.json_ietf_val) @@ -134,27 +146,39 @@ def assert_update(update, path_val): assert json_value == path_val[1] @staticmethod - def assert_set_response(response, path_op): - assert (response.path == path_op[0]) + def assert_set_response(response, path_op, + response_prefix=gnmi_pb2.Path(), + set_prefix=gnmi_pb2.Path()): + assert (add_path_prefix(response.path, response_prefix) == + add_path_prefix(path_op[0], set_prefix)) assert (response.op == path_op[1]) @staticmethod - def assert_updates(updates, path_vals): + def assert_updates(updates, path_vals, + update_prefix=gnmi_pb2.Path(), + pv_prefix=gnmi_pb2.Path()): assert (len(updates) == len(path_vals)) for i, u in enumerate(updates): - AdapterTests.assert_update(u, path_vals[i]) + AdapterTests.assert_update(u, path_vals[i], update_prefix, pv_prefix) @staticmethod - def assert_one_in_update(updates, pv): - assert any(u.path == pv[0] and json.loads(u.val.json_ietf_val) == pv[1] - for u in updates) + def assert_one_in_update(updates, pv, + update_prefix=gnmi_pb2.Path(), + pv_prefix=gnmi_pb2.Path()): + assert any( + add_path_prefix(u.path, update_prefix) == add_path_prefix(pv[0], pv_prefix) + and json.loads(u.val.json_ietf_val) == pv[1] + for u in updates + ) @staticmethod - def assert_in_updates(updates, path_vals): + def assert_in_updates(updates, path_vals, + update_prefix=gnmi_pb2.Path(), + pv_prefix=gnmi_pb2.Path()): log.debug("==> updates=%s path_vals=%s", updates, path_vals) assert (len(updates) == len(path_vals)) for pv in path_vals: - AdapterTests.assert_one_in_update(updates, pv) + AdapterTests.assert_one_in_update(updates, pv, update_prefix, pv_prefix) log.debug("<==") @@ -174,23 +198,27 @@ def verify_get_response_updates(self, prefix, paths, path_value, if prefix: assert (n.prefix == prefix) assert(time_before <= n.timestamp and n.timestamp <= time_after) - assert_fun(n.update, path_value) - - def verify_sub_sub_response_updates(self, prefix, paths, path_value, - assert_fun=None, - subscription_mode=gnmi_pb2.SubscriptionList.ONCE, - poll_interval=0, - poll_count=0, read_count=-1, - sample_interval = None, - encoding=gnmi_pb2.Encoding.JSON_IETF, - allow_aggregation=True): + assert_fun(n.update, path_value, n.prefix, prefix) + + def subscribe_and_verify_response( + self, prefix, paths, path_value, delete_paths=[], + assert_fun=None, + subscription_mode=gnmi_pb2.SubscriptionList.ONCE, + poll_interval=0, poll_count=0, read_count=-1, + sample_interval = None, + encoding=gnmi_pb2.Encoding.JSON_IETF, + allow_aggregation=True + ): ''' Invoke subscription and verify received updates - :param prefix: gNMI prefix for subscription - :param paths: gNMI path for subscription - :param path_value: array of tuples of expected (path,value) for each response - path is gNMI path, val is response value (in json) - :param assert_fun: function to verify updates according to path_value + :param prefix: gNMI prefix for the subscription + :param paths: gNMI paths for the subscription + :param path_value: one array or array of arrays of tuples + of expected (path, value) in update field + of responses, val is response value (in json) + :param delete_paths: array or array of array of paths in delete field of responses + :param assert_fun: function to verify updates in one response according + to current path_value element :param subscription_mode: :param poll_interval: interval between polls (for gnmi_pb2.SubscriptionList.POLL only) :param poll_count: number of polls (for gnmi_pb2.SubscriptionList.POLL only) @@ -203,29 +231,32 @@ def verify_sub_sub_response_updates(self, prefix, paths, path_value, if assert_fun is None: assert_fun = AdapterTests.assert_updates - stream_mode = gnmi_pb2.SubscriptionMode.ON_CHANGE + stream_mode = gnmi_pb2.SubscriptionMode.ON_CHANGE if sample_interval is not None: assert subscription_mode == gnmi_pb2.SubscriptionList.STREAM stream_mode = gnmi_pb2.SubscriptionMode.SAMPLE - log.debug("paths=%s path_value=%s", paths, path_value) + log.debug("paths=%s path_value=%s delete_paths=%s", + paths, path_value, delete_paths) response_count = 0 - pv_idx = 0 - for pv in path_value: - if not isinstance(pv, list): - pv_idx = -1 - break - log.debug("pv_idx=%s", pv_idx) + + def init_idx(vals): + idx = -1 if not vals or any( + not isinstance(v, list) for v in vals) else 0 + return idx + pv_idx = init_idx(path_value) + del_idx = init_idx(delete_paths) + log.debug("pv_idx=%s del_idx=%s", pv_idx, del_idx) def read_subscribe_responses(responses, read_count=-1): - nonlocal response_count, pv_idx + nonlocal response_count, pv_idx, del_idx prev_response_time_ms = 0 SAMPLE_THRESHOLD = 300 for response in responses: time_after = get_timestamp_ns() log.debug("response=%s response_count=%i time_after=%i", response, response_count, time_after) - response_time_ms = time_after/1000000 + response_time_ms = time_after / 1000000 if response.sync_response: log.debug("sync_response") assert response_count == 1 # sync expected only after first response @@ -234,7 +265,11 @@ def read_subscribe_responses(responses, read_count=-1): assert (time_before <= response.update.timestamp and response.update.timestamp <= time_after) if prefix: - assert (response.update.prefix == prefix) + prefix_str = make_formatted_path(prefix) + prefix_update_str = make_formatted_path(response.update.prefix) + assert (prefix_update_str.startswith(prefix_str) + or prefix_update_str.startswith(prefix_str)) + pv_to_check = path_value if pv_idx != -1: assert pv_idx < len(path_value) @@ -242,6 +277,18 @@ def read_subscribe_responses(responses, read_count=-1): pv_idx += 1 if len(pv_to_check) > 0: # skip empty arrays assert_fun(response.update.update, pv_to_check) + + del_to_check = delete_paths + if del_idx != -1: + assert del_idx < len(delete_paths) + del_to_check = delete_paths[del_idx] + del_idx += 1 + if len(del_to_check) > 0: + assert (len(response.update.delete) == len(del_to_check)) + for i, d in enumerate(response.update.delete): + assert (add_path_prefix(d, response.update.prefix) + == add_path_prefix(del_to_check[i], prefix)) # TODO do we need check one_in_delete + log.debug("response_count=%i pv_idx=%i", response_count, pv_idx) if sample_interval and response_count > 1: assert (response_time_ms > (prev_response_time_ms + sample_interval - SAMPLE_THRESHOLD)) and ( @@ -258,25 +305,30 @@ def read_subscribe_responses(responses, read_count=-1): assert read_count == -1 or read_count == 0 read_fun = read_subscribe_responses - subscription_list = \ - ConfDgNMIClient.make_subscription_list(prefix, - paths, - subscription_mode, - encoding, - stream_mode = stream_mode, - sample_interval_ms=sample_interval, - allow_aggregation=allow_aggregation) + subscription_list = ConfDgNMIClient.make_subscription_list( + prefix, + paths, + subscription_mode, + encoding, + stream_mode = stream_mode, + sample_interval_ms=sample_interval, + allow_aggregation=allow_aggregation + ) time_before = get_timestamp_ns() - responses = self.client.subscribe(subscription_list, - read_fun=read_fun, - poll_interval=poll_interval, - poll_count=poll_count, - read_count=read_count) + responses = self.client.subscribe( + subscription_list, + read_fun=read_fun, + poll_interval=poll_interval, + poll_count=poll_count, + read_count=read_count + ) log.debug("responses=%s", responses) if poll_count: - assert (poll_count + 1 == response_count) + assert poll_count + 1 == response_count + + def _test_get_subscribe(self, is_subscribe=False, datatype=gnmi_pb2.GetRequest.DataType.CONFIG, @@ -289,12 +341,11 @@ def _test_get_subscribe(self, is_subscribe=False, kwargs = {"assert_fun": AdapterTests.assert_updates} if_state_str = prefix_state_str = "" - db = GnmiDemoServerAdapter.get_adapter().demo_db + db = GnmiDemoServerAdapter.demo_db if datatype == gnmi_pb2.GetRequest.DataType.STATE: prefix_state_str = "-state" if_state_str = "state_" db = GnmiDemoServerAdapter.demo_state_db - map_db = GnmiDemoServerAdapter._demo_db_to_key_elem_map(db) prefix = make_gnmi_path("/ietf-interfaces:interfaces{}".format(prefix_state_str)) kwargs["prefix"] = prefix if_id = 8 @@ -311,7 +362,7 @@ def _test_get_subscribe(self, is_subscribe=False, ifname = "{}if_{}".format(if_state_str, if_id) if is_subscribe: - verify_response_updates = self.verify_sub_sub_response_updates + verify_response_updates = self.subscribe_and_verify_response kwargs["subscription_mode"] = subscription_mode kwargs["poll_interval"] = poll_interval kwargs["poll_count"] = poll_count @@ -371,12 +422,15 @@ def _test_get_subscribe(self, is_subscribe=False, kwargs["assert_fun"] = AdapterTests.assert_in_updates verify_response_updates(**kwargs) if allow_aggregation: + map_db = GnmiDemoServerAdapter._demo_db_to_key_elem_map(db) + # remove non interface related entries + map_db = {key: value for key, value in map_db.items() if "if_" in key} kwargs["paths"] = [list_paths[2]] if allow_aggregation: kwargs["path_value"] = [(list_paths[2], {"interface": list(map_db.values())})] kwargs["assert_fun"] = None - kwargs["prefix"] = None + kwargs["prefix"] = gnmi_pb2.Path() verify_response_updates(**kwargs) @pytest.mark.parametrize("data_type", ["CONFIG", "STATE"]) @@ -454,9 +508,16 @@ def confd_cmd_subprocess(confd_cmd): def format_command(c): path = make_gnmi_path(c[0]) - return "mset {} {}".format( - make_formatted_path(path, gnmi_prefix=path_prefix), - c[1].split(":")[-1]) # remove json prefix + if isinstance(c[1], (str, ChangeVal)): + cmd = "mset {} {}".format( + make_formatted_path(path, gnmi_prefix=path_prefix), + c[1].split(":")[-1]) # remove json prefix + elif isinstance(c[1], ChangeDel): + cmd = "mdel {}".format( + make_formatted_path(path, gnmi_prefix=path_prefix)) + else: + raise TypeError(f"Invalid value type: {type(c[1])}") + return cmd for send, changes in itertools.groupby(changes_list, lambda c: c == "send"): if not send: @@ -464,24 +525,33 @@ def format_command(c): sleep(1) @staticmethod - def _changes_list_to_pv(changes_list): + def _changes_list_to_pv_del(changes_list): ''' - Return path_value_list created from changes_list. + Return pv_res and delete_res lists created from changes_list. :param changes_list: - :return: + :return: (pv_res, delete_res) ''' - path_value = [] - pv_idx = 0 + pv_res, del_res = [], [] + pv_candidates, del_candidates = [], [] for c in changes_list: - if isinstance(c, str): + if type(c) is str: if c == "send": - pv_idx += 1 + pv_res.append(pv_candidates) + del_res.append(del_candidates) + pv_candidates, del_candidates = [], [] else: - if len(path_value) < pv_idx + 1: - path_value.append([]) - path_value[pv_idx].append((make_gnmi_path(c[0]), c[1])) - log.debug("path_value=%s", path_value) - return path_value + if isinstance(c[1], (str, int)) or isinstance(c[1], ChangeVal): + pv_candidates.append((make_gnmi_path(c[0]), c[1])) + elif isinstance(c[1], ChangeDel): + if c[1].deleted_paths: + for p in c[1].deleted_paths: + del_candidates.append(make_gnmi_path(p)) + else: + del_candidates.append(make_gnmi_path(c[0])) + else: + raise TypeError(f"Invalid value type: {type(c[1])}") + log.debug("pv_res=%s delete_res=%s", pv_res, del_res) + return pv_res, del_res @staticmethod def _changes_list_to_xml(changes_list, prefix_str): @@ -496,7 +566,12 @@ def _changes_list_to_xml(changes_list, prefix_str): else: ET.SubElement(el, "path").text = "{}/{}".format(prefix_str, c[0]) - ET.SubElement(el, "val").text = c[1] + if isinstance(c[1], str) or isinstance(c[1], ChangeVal): + ET.SubElement(el, "val").text = c[1] + elif isinstance(c[1], ChangeDel): + ET.SubElement(el, "del") + else: + raise TypeError(f"Invalid value type: {type(c[1])}") xml_str = ET.tostring(demo, encoding='unicode') log.debug("xml_str=%s", xml_str) return xml_str @@ -505,7 +580,7 @@ def _changes_list_to_xml(changes_list, prefix_str): @pytest.mark.parametrize("data_type", ["CONFIG", "STATE"]) def test_subscribe_stream(self, request, data_type): log.info("testing subscribe_stream") - if_state_str = prefix_state_str = "" + if_state_str, prefix_state_str = "", "" if data_type == "STATE": prefix_state_str = "-state" if_state_str = "state_" @@ -530,15 +605,38 @@ def test_subscribe_stream(self, request, data_type): self._test_subscribe(prefix_str, self.NS_IETF_INTERFACES, paths, changes_list) + @pytest.mark.usefixtures("reset_cfg") + def test_subscribe_stream_delete(self, request): + log.info("testing subscribe_stream_delete") + + changes_list = [ + ("top-d", ChangeDel(deleted_paths=[ + "/top-d/top-d-list[name=n1]", + "/top-d/top-d-list[name=n2]", + "/top-d/top-d-list[name=n3]", + "/top-d/top-d-list[name=n4]", + ])), + "send", + ] + log.info("change_list=%s", changes_list) + + prefix_str = "{prefix}gnmi-tools" + paths = [make_gnmi_path("top-d")] + self._test_subscribe(prefix_str, "gnmi-tools:", + paths, changes_list) + def _test_subscribe(self, prefix_str, ns_prefix, paths, changes_list): - path_value = [[]] # empty element means no check - path_value.extend(self._changes_list_to_pv(changes_list)) + path_value, delete = [[]], [[]] # empty element means no check - skip first response + pv, de = self._changes_list_to_pv_del(changes_list) + path_value.extend(pv) + delete.extend(de) prefix = make_gnmi_path("/" + prefix_str.format(prefix=ns_prefix)) kwargs = {"assert_fun": AdapterTests.assert_in_updates} kwargs["prefix"] = prefix kwargs["paths"] = paths kwargs["path_value"] = path_value + kwargs["delete_paths"] = delete kwargs["subscription_mode"] = gnmi_pb2.SubscriptionList.STREAM kwargs["read_count"] = len(path_value) kwargs["assert_fun"] = AdapterTests.assert_in_updates @@ -548,13 +646,16 @@ def _test_subscribe(self, prefix_str, ns_prefix, paths, changes_list): GnmiDemoServerAdapter.load_config_string( self._changes_list_to_xml(changes_list, prefix_pfx)) if self.adapter_type == AdapterType.API: - prefix_pfx = prefix_str.format(prefix=self.PREFIX_MAP[ns_prefix]) + if ns_prefix in self.PREFIX_MAP: + prefix_pfx = prefix_str.format(prefix=self.PREFIX_MAP[ns_prefix]) + else: + prefix_pfx = prefix_str.format(prefix='') thr = threading.Thread( target=self._send_change_list_to_confd_thread, args=(prefix_pfx, changes_list,)) thr.start() - self.verify_sub_sub_response_updates(**kwargs) + self.subscribe_and_verify_response(**kwargs) if self.adapter_type == AdapterType.API: thr.join() @@ -581,7 +682,8 @@ def test_set(self, request): assert (time_before <= response.timestamp and response.timestamp <= time_after) assert (response.prefix == prefix) AdapterTests.assert_set_response(response.response[0], - (paths[0], gnmi_pb2.UpdateResult.UPDATE)) + (paths[0], gnmi_pb2.UpdateResult.UPDATE), + response.prefix, prefix) # fetch with get and see value has changed datatype = gnmi_pb2.GetRequest.DataType.CONFIG @@ -590,13 +692,15 @@ def test_set(self, request): for n in get_response.notification: log.debug("n=%s", n) assert (n.prefix == prefix) - AdapterTests.assert_updates(n.update, [(paths[0], "iana-if-type:fastEther")]) + AdapterTests.assert_updates(n.update, [(paths[0], "iana-if-type:fastEther")], + n.prefix, prefix) # put value back vals = [gnmi_pb2.TypedValue(json_ietf_val=b"\"iana-if-type:gigabitEthernet\"")] response = self.client.set(prefix, list(zip(paths, vals))) AdapterTests.assert_set_response(response.response[0], - (paths[0], gnmi_pb2.UpdateResult.UPDATE)) + (paths[0], gnmi_pb2.UpdateResult.UPDATE), + response.prefix, prefix) @pytest.mark.usefixtures("reset_cfg") def test_set_encoding(self, request): @@ -630,4 +734,5 @@ def test_it(encoding): json_ietf_val=b"\"iana-if-type:gigabitEthernet\"")] response = self.client.set(prefix, list(zip(paths, vals))) AdapterTests.assert_set_response(response.response[0], - (paths[0], gnmi_pb2.UpdateResult.UPDATE)) + (paths[0], gnmi_pb2.UpdateResult.UPDATE), + response.prefix, prefix) diff --git a/tests/test_client_server_confd.py b/tests/test_client_server_confd.py index 333942c..1736331 100644 --- a/tests/test_client_server_confd.py +++ b/tests/test_client_server_confd.py @@ -103,7 +103,7 @@ def _test_gnmi_tools_get_subscribe_gnmi_tools(self, is_subscribe=False, self.leaf_paths_str_for_gnmi_tools] if is_subscribe: - verify_response_updates = self.verify_sub_sub_response_updates + verify_response_updates = self.subscribe_and_verify_response kwargs["subscription_mode"] = subscription_mode kwargs["poll_interval"] = poll_interval kwargs["poll_count"] = poll_count @@ -182,7 +182,7 @@ def test_subscribe_stream_on_change_api_state(self, request): "send", ] path_value = [[]] # empty element means no check - path_value.extend(self._changes_list_to_pv(changes_list)) + path_value.extend(self._changes_list_to_pv_del(changes_list)[0]) prefix_str = "" prefix = make_gnmi_path(prefix_str) @@ -208,7 +208,7 @@ def test_subscribe_stream_on_change_api_state(self, request): change_thread.start() try: - self.verify_sub_sub_response_updates(**kwargs) + self.subscribe_and_verify_response(**kwargs) sleep(1) finally: diff --git a/tests/test_client_server_demo.py b/tests/test_client_server_demo.py index f6fa0f5..f98a130 100644 --- a/tests/test_client_server_demo.py +++ b/tests/test_client_server_demo.py @@ -2,6 +2,7 @@ from client_server_test_base import AdapterTests from confd_gnmi_server import AdapterType +from confd_gnmi_demo_adapter import GnmiDemoServerAdapter _confd_DEBUG = 1 @@ -16,3 +17,4 @@ def set_adapter_type(self): def _do_reset_cfg(self): yield + GnmiDemoServerAdapter.reset() From f5853c5dd7437d33a2965d01bbe901e0008f848b Mon Sep 17 00:00:00 2001 From: Michal Novak Date: Tue, 9 Jan 2024 16:25:36 +0100 Subject: [PATCH 2/5] review comments 1 Signed-off-by: Michal Novak --- src/confd_gnmi_adapter.py | 2 +- src/confd_gnmi_api_adapter.py | 2 +- src/confd_gnmi_demo_adapter.py | 5 +++-- tests/client_server_test_base.py | 7 ++----- 4 files changed, 7 insertions(+), 9 deletions(-) diff --git a/src/confd_gnmi_adapter.py b/src/confd_gnmi_adapter.py index aa657cb..e5522d0 100644 --- a/src/confd_gnmi_adapter.py +++ b/src/confd_gnmi_adapter.py @@ -125,7 +125,7 @@ def add_path_for_monitoring(self, path, prefix): pass @abstractmethod - def get_subscription_notifications(self): + def get_subscription_notifications(self) -> list[gnmi_pb2.Notification]: """ Get gNMI subscription updates for changed values :return: gNMI Notification array diff --git a/src/confd_gnmi_api_adapter.py b/src/confd_gnmi_api_adapter.py index 2526aec..f41fdce 100644 --- a/src/confd_gnmi_api_adapter.py +++ b/src/confd_gnmi_api_adapter.py @@ -114,7 +114,7 @@ def __init__(self, adapter, subscription_list): self.stop_pipe = None self.subpoint_paths = {} - def get_subscription_notifications(self): + def get_subscription_notifications(self) -> list[gnmi_pb2.Notification]: return [gnmi_pb2.Notification(timestamp=get_timestamp_ns(), prefix=prefix, update=updates, diff --git a/src/confd_gnmi_demo_adapter.py b/src/confd_gnmi_demo_adapter.py index 0b1f7b4..cabde0d 100644 --- a/src/confd_gnmi_demo_adapter.py +++ b/src/confd_gnmi_demo_adapter.py @@ -228,7 +228,7 @@ def get_sample(self, path, prefix, allow_aggregation=False) -> []: log.debug("<== updates=%s", updates) return updates - def get_subscription_notifications(self) -> []: + def get_subscription_notifications(self) -> list[gnmi_pb2.Notification]: log.debug("==> self.change_db=%s", self.change_db) assert len(self.change_db) > 0 update = [] @@ -341,7 +341,8 @@ def process_changes(self): if add_count == 0: add_count += 1 # sometimes skip add if is first from previous - if randint(0, 1) == 0: continue + if randint(0, 1) == 0: + continue add_count = 0 # reset add count self.change_event_queue.put(self.ChangeEvent.ADD) log.debug("<==") diff --git a/tests/client_server_test_base.py b/tests/client_server_test_base.py index ff44925..6ae36da 100644 --- a/tests/client_server_test_base.py +++ b/tests/client_server_test_base.py @@ -34,7 +34,7 @@ class GrpcBase: @abstractmethod def set_adapter_type(self): - raise NotImplementedError + pass @pytest.fixture def fix_method(self, request): @@ -646,10 +646,7 @@ def _test_subscribe(self, prefix_str, ns_prefix, paths, changes_list): GnmiDemoServerAdapter.load_config_string( self._changes_list_to_xml(changes_list, prefix_pfx)) if self.adapter_type == AdapterType.API: - if ns_prefix in self.PREFIX_MAP: - prefix_pfx = prefix_str.format(prefix=self.PREFIX_MAP[ns_prefix]) - else: - prefix_pfx = prefix_str.format(prefix='') + prefix_pfx = prefix_str.format(prefix=self.PREFIX_MAP.get(ns_prefix, '')) thr = threading.Thread( target=self._send_change_list_to_confd_thread, args=(prefix_pfx, changes_list,)) From 254f6e9f7a44525f5a3b5074b66e787dfb4d0ded Mon Sep 17 00:00:00 2001 From: Michal Novak Date: Wed, 10 Jan 2024 11:52:11 +0100 Subject: [PATCH 3/5] review comments 2 Signed-off-by: Michal Novak --- src/confd_gnmi_demo_adapter.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/src/confd_gnmi_demo_adapter.py b/src/confd_gnmi_demo_adapter.py index cabde0d..26be7fe 100644 --- a/src/confd_gnmi_demo_adapter.py +++ b/src/confd_gnmi_demo_adapter.py @@ -145,16 +145,13 @@ def fill_demo_db(cls, reset = False): cls.demo_state_db[f"{state_path}/type"] = "gigabitEthernet" cls.demo_db[f"{path}/enabled"] = True + path_str = "/gnmi-tools/top-d/top-d-list[name=n{index}]" for i in range(1, 5): - path = f"/gnmi-tools/top-d/top-d-list[name=n{i}]" - cls.demo_db[f"{path}/name"] = f"n{i}" - if i == 2: - cls.demo_db[f"{path}/empty-leaf"] = [None] - if i == 3: - cls.demo_db[f"{path}/pres"] = {} - if i == 4: - cls.demo_db[f"{path}/down/str-leaf"] = "test" - cls.demo_db[f"{path}/down/int-leaf"] = "23" + cls.demo_db[f"{path_str.format(index=i)}/name"] = f"n{i}" + cls.demo_db[f"{path_str.format(index=2)}/empty-leaf"] = [None] + cls.demo_db[f"{path_str.format(index=3)}/pres"] = {} + cls.demo_db[f"{path_str.format(index=4)}/down/str-leaf"] = "test" + cls.demo_db[f"{path_str.format(index=4)}/down/int-leaf"] = "23" log.debug("<== self.demo_db=%s self.demo_state_db=%s", cls.demo_db, cls.demo_state_db) From 354fde9eeb0ee139a84e76e66c8fd4d41cf5e55e Mon Sep 17 00:00:00 2001 From: Michal Novak Date: Wed, 10 Jan 2024 15:42:45 +0100 Subject: [PATCH 4/5] review comments 3 Signed-off-by: Michal Novak --- src/confd_gnmi_demo_adapter.py | 7 +- tests/client_server_test_base.py | 177 +++++++++++++++++-------------- 2 files changed, 104 insertions(+), 80 deletions(-) diff --git a/src/confd_gnmi_demo_adapter.py b/src/confd_gnmi_demo_adapter.py index 26be7fe..345ee45 100644 --- a/src/confd_gnmi_demo_adapter.py +++ b/src/confd_gnmi_demo_adapter.py @@ -16,6 +16,7 @@ log = logging.getLogger('confd_gnmi_demo_adapter') + # Types for streaming telemetry operations class ChangeOp: @abstractmethod @@ -45,10 +46,10 @@ class ChangeDel(ChangeOp): def __init__(self, deleted_paths=[]): self.deleted_paths = deleted_paths def __repr__(self): - return f'ChangeDel()' + return 'ChangeDel' def __str__(self): - return f'ChangeDel' + return 'ChangeDel' class GnmiDemoServerAdapter(GnmiServerAdapter): NS_INTERFACES = "ietf-interfaces:" @@ -395,7 +396,7 @@ def update_demo_db(self, path, ch_op): del db[db_path] if "gnmi-tools/top-d" in db_path: a_path = db_path.split(']')[0] - ch_op.deleted_paths.append(a_path + ']') + ch_op.deleted_paths.append(a_path + ']') log.debug("<== ret=%s", ret) return ret diff --git a/tests/client_server_test_base.py b/tests/client_server_test_base.py index 6ae36da..96f56d3 100644 --- a/tests/client_server_test_base.py +++ b/tests/client_server_test_base.py @@ -1,3 +1,4 @@ +import functools from abc import abstractmethod import itertools @@ -181,7 +182,6 @@ def assert_in_updates(updates, path_vals, AdapterTests.assert_one_in_update(updates, pv, update_prefix, pv_prefix) log.debug("<==") - def verify_get_response_updates(self, prefix, paths, path_value, datatype, encoding, assert_fun=None): if assert_fun is None: @@ -200,6 +200,94 @@ def verify_get_response_updates(self, prefix, paths, path_value, assert(time_before <= n.timestamp and n.timestamp <= time_after) assert_fun(n.update, path_value, n.prefix, prefix) + def read_subscribe_responses(self, responses, read_count, prefix, poll_count, + assert_fun, sample_interval, path_value, + delete_paths, time_before): + """ + Iterates the subscribe responses and performs various assertions and checks + Params: + :param responses: The subscribe responses. + :param read_count (int): The number of responses to read. + :param prefix: The prefix for the response update. + :param poll_count (int): The number of poll counts. + :param assert_fun (function): The assertion function to check the response update. + :param sample_interval (int): The interval between samples. + :param path_value (list): The list of path values to check. + :param delete_paths (list): The list of delete paths to check. + :param time_before (int): The timestamp before the response update. + + Returns: + None + + Raises: + AssertionError: If any assertion fails. + """ + + def init_idx(vals): + idx = -1 if not vals or any( + not isinstance(v, list) for v in vals) else 0 + return idx + + response_count = 0 + pv_idx = init_idx(path_value) + del_idx = init_idx(delete_paths) + log.debug("pv_idx=%s del_idx=%s", pv_idx, del_idx) + prev_response_time_ms = 0 + SAMPLE_THRESHOLD = 1000 + for response in responses: + time_after = get_timestamp_ns() + log.debug("response=%s response_count=%i time_after=%i", response, + response_count, time_after) + response_time_ms = time_after / 1000000 + if response.sync_response: + log.debug("sync_response") + assert response_count == 1 # sync expected only after first response + else: + response_count += 1 + assert (time_before <= response.update.timestamp and + response.update.timestamp <= time_after) + if prefix: + prefix_str = make_formatted_path(prefix) + prefix_update_str = make_formatted_path(response.update.prefix) + assert (prefix_update_str.startswith(prefix_str) + or prefix_update_str.startswith(prefix_str)) + + pv_to_check = path_value + if pv_idx != -1: + assert pv_idx < len(path_value) + pv_to_check = path_value[pv_idx] + pv_idx += 1 + if len(pv_to_check) > 0: # skip empty arrays + assert_fun(response.update.update, pv_to_check) + + del_to_check = delete_paths + if del_idx != -1: + assert del_idx < len(delete_paths) + del_to_check = delete_paths[del_idx] + del_idx += 1 + if len(del_to_check) > 0: + assert (len(response.update.delete) == len(del_to_check)) + for i, d in enumerate(response.update.delete): + assert (add_path_prefix(d, response.update.prefix) + == add_path_prefix(del_to_check[i], prefix)) # TODO do we need check one_in_delete + + log.debug("response_count=%i pv_idx=%i", response_count, pv_idx) + if sample_interval and response_count > 1: + assert (response_time_ms > (prev_response_time_ms + sample_interval - SAMPLE_THRESHOLD)) and ( + response_time_ms < (prev_response_time_ms + sample_interval + SAMPLE_THRESHOLD)) + + if read_count > 0: + read_count -= 1 + if read_count == 0: + log.info("read count reached") + break + prev_response_time_ms = response_time_ms + log.debug("Getting next response. read_count=%s response_count=%s", + read_count, response_count) + assert read_count == -1 or read_count == 0 + if poll_count: + assert poll_count + 1 == response_count + def subscribe_and_verify_response( self, prefix, paths, path_value, delete_paths=[], assert_fun=None, @@ -209,7 +297,7 @@ def subscribe_and_verify_response( encoding=gnmi_pb2.Encoding.JSON_IETF, allow_aggregation=True ): - ''' + """ Invoke subscription and verify received updates :param prefix: gNMI prefix for the subscription :param paths: gNMI paths for the subscription @@ -227,84 +315,18 @@ def subscribe_and_verify_response( :param sample_interval: interval for sample in ms (for gnmi_pb2.SubscriptionList.STREAM only) :param encoding: encoding to use (implemented only JSON_IETF) :param allow_aggregation: allow aggregation of results in updates - ''' + """ if assert_fun is None: assert_fun = AdapterTests.assert_updates stream_mode = gnmi_pb2.SubscriptionMode.ON_CHANGE if sample_interval is not None: - assert subscription_mode == gnmi_pb2.SubscriptionList.STREAM - stream_mode = gnmi_pb2.SubscriptionMode.SAMPLE + assert subscription_mode == gnmi_pb2.SubscriptionList.STREAM + stream_mode = gnmi_pb2.SubscriptionMode.SAMPLE log.debug("paths=%s path_value=%s delete_paths=%s", paths, path_value, delete_paths) - response_count = 0 - def init_idx(vals): - idx = -1 if not vals or any( - not isinstance(v, list) for v in vals) else 0 - return idx - pv_idx = init_idx(path_value) - del_idx = init_idx(delete_paths) - log.debug("pv_idx=%s del_idx=%s", pv_idx, del_idx) - - def read_subscribe_responses(responses, read_count=-1): - nonlocal response_count, pv_idx, del_idx - prev_response_time_ms = 0 - SAMPLE_THRESHOLD = 300 - for response in responses: - time_after = get_timestamp_ns() - log.debug("response=%s response_count=%i time_after=%i", response, - response_count, time_after) - response_time_ms = time_after / 1000000 - if response.sync_response: - log.debug("sync_response") - assert response_count == 1 # sync expected only after first response - else: - response_count += 1 - assert (time_before <= response.update.timestamp and - response.update.timestamp <= time_after) - if prefix: - prefix_str = make_formatted_path(prefix) - prefix_update_str = make_formatted_path(response.update.prefix) - assert (prefix_update_str.startswith(prefix_str) - or prefix_update_str.startswith(prefix_str)) - - pv_to_check = path_value - if pv_idx != -1: - assert pv_idx < len(path_value) - pv_to_check = path_value[pv_idx] - pv_idx += 1 - if len(pv_to_check) > 0: # skip empty arrays - assert_fun(response.update.update, pv_to_check) - - del_to_check = delete_paths - if del_idx != -1: - assert del_idx < len(delete_paths) - del_to_check = delete_paths[del_idx] - del_idx += 1 - if len(del_to_check) > 0: - assert (len(response.update.delete) == len(del_to_check)) - for i, d in enumerate(response.update.delete): - assert (add_path_prefix(d, response.update.prefix) - == add_path_prefix(del_to_check[i], prefix)) # TODO do we need check one_in_delete - - log.debug("response_count=%i pv_idx=%i", response_count, pv_idx) - if sample_interval and response_count > 1: - assert (response_time_ms > (prev_response_time_ms + sample_interval - SAMPLE_THRESHOLD)) and ( - response_time_ms < (prev_response_time_ms + sample_interval + SAMPLE_THRESHOLD)) - - if read_count > 0: - read_count -= 1 - if read_count == 0: - log.info("read count reached") - break - prev_response_time_ms = response_time_ms - log.debug("Getting next response. read_count=%s response_count=%s", - read_count, response_count) - assert read_count == -1 or read_count == 0 - - read_fun = read_subscribe_responses subscription_list = ConfDgNMIClient.make_subscription_list( prefix, paths, @@ -315,20 +337,21 @@ def read_subscribe_responses(responses, read_count=-1): allow_aggregation=allow_aggregation ) - time_before = get_timestamp_ns() responses = self.client.subscribe( subscription_list, - read_fun=read_fun, + read_fun=functools.partial(self.read_subscribe_responses, + prefix=prefix, poll_count=poll_count, + assert_fun=assert_fun, + sample_interval=sample_interval, + path_value=path_value, + delete_paths=delete_paths, + time_before = get_timestamp_ns()), poll_interval=poll_interval, poll_count=poll_count, read_count=read_count ) log.debug("responses=%s", responses) - if poll_count: - assert poll_count + 1 == response_count - - def _test_get_subscribe(self, is_subscribe=False, datatype=gnmi_pb2.GetRequest.DataType.CONFIG, From 3ef4908bf67dd11c7f3b4f211e9effc252c18ec3 Mon Sep 17 00:00:00 2001 From: Michal Novak Date: Fri, 12 Jan 2024 13:00:08 +0100 Subject: [PATCH 5/5] review comments 4 + reuse in gnmi-tools.xml Signed-off-by: Michal Novak --- data/demo.xml | 2 +- gnmi-tools.xml | 20 ++++++++++---------- gnmi-tools.yang | 23 ++++++++++++----------- src/confd_gnmi_demo_adapter.py | 4 ++-- tests/client_server_test_base.py | 12 ++++++------ 5 files changed, 31 insertions(+), 30 deletions(-) diff --git a/data/demo.xml b/data/demo.xml index 9e356df..8b81319 100644 --- a/data/demo.xml +++ b/data/demo.xml @@ -4,7 +4,7 @@ - /gnmi-tools/top-d + /gnmi-tools/top-for-delete diff --git a/gnmi-tools.xml b/gnmi-tools.xml index ce37a14..d5af84d 100644 --- a/gnmi-tools.xml +++ b/gnmi-tools.xml @@ -54,25 +54,25 @@ opticalTransport In-Service - - + + n1 - - + + n2 - - + + n3 - - + + n4 test 23 - - + + diff --git a/gnmi-tools.yang b/gnmi-tools.yang index 2c8eeb2..0d6d372 100644 --- a/gnmi-tools.yang +++ b/gnmi-tools.yang @@ -1,4 +1,5 @@ module gnmi-tools { + namespace "http://cisco.com/ns/yang/gnmi-tools"; prefix gnmi-tools; @@ -17,6 +18,14 @@ module gnmi-tools { } } + grouping top-list { + list top-list { + key name; + leaf name {type string;} + uses combo; + } + } + container gnmi-tools { container top { uses combo; @@ -27,11 +36,7 @@ module gnmi-tools { uses combo; } - list top-list { - key name; - leaf name {type string;} - uses combo; - } + uses top-list; list double-key-list { key "name type"; @@ -40,12 +45,8 @@ module gnmi-tools { leaf admin-state {type string;} } - container top-d { - list top-d-list { - key name; - leaf name {type string;} - uses combo; - } + container top-for-delete { + uses top-list; } } diff --git a/src/confd_gnmi_demo_adapter.py b/src/confd_gnmi_demo_adapter.py index 345ee45..b04ec18 100644 --- a/src/confd_gnmi_demo_adapter.py +++ b/src/confd_gnmi_demo_adapter.py @@ -146,7 +146,7 @@ def fill_demo_db(cls, reset = False): cls.demo_state_db[f"{state_path}/type"] = "gigabitEthernet" cls.demo_db[f"{path}/enabled"] = True - path_str = "/gnmi-tools/top-d/top-d-list[name=n{index}]" + path_str = "/gnmi-tools/top-for-delete/top-list[name=n{index}]" for i in range(1, 5): cls.demo_db[f"{path_str.format(index=i)}/name"] = f"n{i}" cls.demo_db[f"{path_str.format(index=2)}/empty-leaf"] = [None] @@ -394,7 +394,7 @@ def update_demo_db(self, path, ch_op): for db_path in list(db.keys()): if db_path.startswith(path): del db[db_path] - if "gnmi-tools/top-d" in db_path: + if "gnmi-tools/top-for-delete" in db_path: a_path = db_path.split(']')[0] ch_op.deleted_paths.append(a_path + ']') log.debug("<== ret=%s", ret) diff --git a/tests/client_server_test_base.py b/tests/client_server_test_base.py index 96f56d3..7dbd70c 100644 --- a/tests/client_server_test_base.py +++ b/tests/client_server_test_base.py @@ -633,18 +633,18 @@ def test_subscribe_stream_delete(self, request): log.info("testing subscribe_stream_delete") changes_list = [ - ("top-d", ChangeDel(deleted_paths=[ - "/top-d/top-d-list[name=n1]", - "/top-d/top-d-list[name=n2]", - "/top-d/top-d-list[name=n3]", - "/top-d/top-d-list[name=n4]", + ("top-for-delete", ChangeDel(deleted_paths=[ + "/top-for-delete/top-list[name=n1]", + "/top-for-delete/top-list[name=n2]", + "/top-for-delete/top-list[name=n3]", + "/top-for-delete/top-list[name=n4]", ])), "send", ] log.info("change_list=%s", changes_list) prefix_str = "{prefix}gnmi-tools" - paths = [make_gnmi_path("top-d")] + paths = [make_gnmi_path("top-for-delete")] self._test_subscribe(prefix_str, "gnmi-tools:", paths, changes_list)