From aac7cb3561da2dd0b595d3516a1dada21a49fe4a Mon Sep 17 00:00:00 2001 From: Michal Novak Date: Tue, 12 Dec 2023 14:31:25 +0100 Subject: [PATCH] docs update minor refactoring stream telemetry delete in demo adapter tests for delete in stream telemetry Signed-off-by: Michal Novak --- data/demo.xml | 10 +- docs/ConfD_gNMI_adapter.adoc | 53 +++--- gnmi-tools.xml | 20 +++ gnmi-tools.yang | 8 + src/confd_gnmi_adapter.py | 14 +- src/confd_gnmi_api_adapter.py | 3 - src/confd_gnmi_client.py | 34 ++-- src/confd_gnmi_demo_adapter.py | 284 ++++++++++++++++++++---------- tests/client_server_test_base.py | 271 +++++++++++++++++++--------- tests/test_client_server_confd.py | 6 +- tests/test_client_server_demo.py | 2 + 11 files changed, 476 insertions(+), 229 deletions(-) diff --git a/data/demo.xml b/data/demo.xml index 93fd39c..9e356df 100644 --- a/data/demo.xml +++ b/data/demo.xml @@ -3,6 +3,10 @@ + + /gnmi-tools/top-d + + /interfaces/interface[name=if_5]/type fastEther @@ -37,8 +41,12 @@ /interfaces-state/interface[name=state_if_6]/type gigabitEthernet + + /interfaces/interface[name=if_6]/type + + send - \ No newline at end of file + diff --git a/docs/ConfD_gNMI_adapter.adoc b/docs/ConfD_gNMI_adapter.adoc index a8a2325..ba9ce61 100644 --- a/docs/ConfD_gNMI_adapter.adoc +++ b/docs/ConfD_gNMI_adapter.adoc @@ -23,8 +23,8 @@ endif::[] :Author: Michal Novák :email: micnovak@cisco.com :URL: https://www.tail-f.com/ -:Date: 2023-01-24 -:Revision: 0.3.0 +:Date: 2023-12-12 +:Revision: 0.4.0 == Version history @@ -32,8 +32,9 @@ endif::[] |====== | Document version | Notes | Date | Author | 0.0.1 | Initial document version. | 2021-02-09 | {author} {email} -| 0.1.0 | Run options updated, added seq. diagrams for Subscribe operation. | 2021-03-23 | {author} {email} +| 0.1.0 | Run options updated, added sequence diagrams for `Subscribe` operation. | 2021-03-23 | {author} {email} | 0.2.0 | External data provider description, fixes, documentation update. | 2021-04-27 | {author} {email} +| 0.3.0 | Update command line options, added Encoding description | 2023-01-24 | {author} {email} | {revision} | Update command line options, added Encoding description | {date} | {author} {email} |====== @@ -41,7 +42,7 @@ toc::[] == Introduction -https://www.tail-f.com/management-agent/[ConfD] is configuration management agent supporting various standard and proprietary northbound interfaces like: +https://www.tail-f.com/management-agent/[ConfD] is a configuration management agent supporting various standard and proprietary northbound interfaces like: * https://tools.ietf.org/html/rfc6241[NETCONF] * https://tools.ietf.org/html/rfc8040[RESTCONF] @@ -51,17 +52,17 @@ https://www.tail-f.com/management-agent/[ConfD] is configuration management agen * CLI https://github.com/openconfig/reference/blob/master/rpc/gnmi/gnmi-specification.md[gNMI] is another popular north bound interface, which is not implemented by ConfD. -In this demo project we will implement gNMI Adapter over existing ConfD interfaces to make (at least partial) gNMI support. +In this project, we will implement gNMI Adapter over existing ConfD interfaces to make (at least partial) gNMI support. In the beginning, we will provide basic functionality, for most common operations, later on we will add more. -This demo focuses on functionality and simplicity, we have chosen Python as implementation language. +The project focuses on functionality and simplicity, we have chosen Python as implementation language. -We use general approach, so the demo can be adapted for other management agents and tools as well. +We use the general approach, so the project can be adapted for other management agents and tools as well. -This demo is still work in progress, see <> section. +This project is still work in progress, see <> section. === Copy/Paste and Output blocks -In this note you can find script and code examples, that can be directly pasted into the shell or CLI terminal. We will use following block style for the copy/paste ready text: +In this note, you can find script and code examples that can be directly pasted into the shell or CLI terminal. We will use the following block style for the copy/paste ready text: [source,shell,role="acopy"] ---- @@ -69,11 +70,11 @@ pip install grpcio-tools ---- NOTE: make sure all commands have executed - confirm last command with kbd:[ENTER], if needed. -If viewed on https://github.com[GitHub], you may find following +If viewed on https://github.com[GitHub], you may find the following browser https://github.com/zenorocha/codecopy[extension] useful (out-of-the-box *copy to clipboard* button). The output of the shell CLI commands or file content will be displayed -with following block style: +with the following block style: .[.small]_output_ [.output] @@ -104,7 +105,7 @@ Options: We expect https://www.python.org/[Python3] to be installed. The `python` and `pip` commands are from Python3 environment. If not, use `python3` or `pip3` instead (or use e.g. `sudo apt-get install python-is-python3`) -TIP: For package installation and development, you may consider creating https://docs.python.org/3/tutorial/venv.html[python virtual environment]. +TIP: For package installation and development, you may consider creating a https://docs.python.org/3/tutorial/venv.html[python virtual environment]. === gRPC Python tools @@ -126,7 +127,7 @@ pip install --upgrade grpcio-tools === Pytest -For automated tests we will use https://www.pytest.org/[pytest] framework. +For automated tests, we will use a https://www.pytest.org/[pytest] framework. If you want to run tests, use `pip` to install it. .Installation @@ -141,7 +142,7 @@ pip install pytest pip install --upgrade pytest ---- -NOTE: `pytest` may be available also as package in your distribution (e.g. `apt-get install python3-pytest`). We still recommend to use `pip` to get the latest version. +NOTE: `pytest` is often available also as package in your distribution (e.g. `apt-get install python3-pytest`). We still recommend to use `pip` to get the latest version. === ConfD @@ -153,7 +154,7 @@ Install https://www.tail-f.com/management-agent/[ConfD Premium] or https://www.t source ${CONFD_DIR}/confdrc ---- -TIP: See https://info.tail-f.com/confd-evaluation-kick-start-guide[ConfD Kick Start Guide] for additional information. +TIP: See https://info.tail-f.com/confd-evaluation-kick-start-guide[ConfD Kick-Start Guide] for additional information. === Build environment @@ -182,7 +183,7 @@ service gNMI { NOTE: The interface itself looks relatively simple, but the `Request` and `Response` messages may be complex. `Subscribe` method has many variants. More details can be found in the https://github.com/openconfig/reference/blob/master/rpc/gnmi/gnmi-specification[gNMI Specification]. -== Building gNMI Adapter demo +== Building the gNMI Adapter === gNMI python binding @@ -206,7 +207,7 @@ https://tools.ietf.org/html/rfc8343[`ietf-interfaces.yang`] and its dependencies NOTE: The used datamodel and initial configuration is used for demonstration in this note. The gNMI Adapter can run against any other ConfD instance with different data model. In this case, paths and values will be different. See examples with ConfD example application <> and <>. -== Running gNMI Adapter demo +== Running the gNMI Adapter Before running the adapter, we need to make sure gNMI python binding is created. @@ -216,11 +217,11 @@ Before running the adapter, we need to make sure gNMI python binding is created. make clean all ---- -The adapter can be run in _demo_ and _api_ mode. +The adapter can be run in _demo_, _confd_ and _nso_ mode. -In the _demo_ mode it does not require running ConfD, it partly emulates `ietf-interfaces.yang` data model and initial configuration. This mode is useful for testing, development, etc. +In the _demo_ mode it does not require running ConfD or NSO, it partly emulates `ietf-interfaces.yang` data model and initial configuration. This mode is useful for testing, development, etc. -In case we want to run adapter against ConfD (_api_ mode), we can use `Makefile` `start` target to start ConfD with initial demo configuration. +In case we want to test adapter against ConfD (_confd_ mode), we can use `Makefile` `start` target to start ConfD with initial configuration. .rebuild everything and start ConfD and load demo configuration [source, shell, role="acopy"] @@ -450,6 +451,14 @@ For operational data: + `confd_cmd -o -fr -c "set /interfaces-state/interface{state_if_8}/type gigabitEthernet"` +NOTE: You can also test `STREAM` subscription in demo mode by passing configuration file. + +Start the server: + +`./src/confd_gnmi_server.py -t demo --logging=info --cfg=./data/demo.xml` + + + +And check with the client: + +`./src/confd_gnmi_client.py -o subscribe -s STREAM --read-count=10 --prefix /ietf-interfaces:interfaces --path interface[name=if_5]/name --path interface[name=if_5]/type` + + .subscribe for `list` entry [source, shell, role="acopy"] ---- @@ -1054,7 +1063,7 @@ TIP: To list-only tests, use `./test.sh --collect-only -q tests/` === Limitations and TODOs -The implementation of the adapter (still in early phase) is demo reference implementation that shows how to add gNMI support to existing ConfD interfaces. +The implementation of the adapter is a reference implementation that shows how to add gNMI support to existing ConfD and NSO interfaces. It can be extended according to deployment requirements. This not all gNMI functionality are currently supported. They may be added in the future. @@ -1094,7 +1103,7 @@ aggregation should not be used for Subscriptions by default) == Conclusion -gNMI Adapter Demo can provide initial gNMI functionality to ConfD. +gNMI Adapter can provide initial gNMI functionality to ConfD and NSO. == References diff --git a/gnmi-tools.xml b/gnmi-tools.xml index f6b3f80..ce37a14 100644 --- a/gnmi-tools.xml +++ b/gnmi-tools.xml @@ -54,5 +54,25 @@ opticalTransport In-Service + + + n1 + + + n2 + + + + n3 + + + + n4 + + test + 23 + + + diff --git a/gnmi-tools.yang b/gnmi-tools.yang index f02e3a1..2c8eeb2 100644 --- a/gnmi-tools.yang +++ b/gnmi-tools.yang @@ -39,6 +39,14 @@ module gnmi-tools { leaf type {type string;} leaf admin-state {type string;} } + + container top-d { + list top-d-list { + key name; + leaf name {type string;} + uses combo; + } + } } } diff --git a/src/confd_gnmi_adapter.py b/src/confd_gnmi_adapter.py index fd1b787..aa657cb 100644 --- a/src/confd_gnmi_adapter.py +++ b/src/confd_gnmi_adapter.py @@ -125,11 +125,10 @@ def add_path_for_monitoring(self, path, prefix): pass @abstractmethod - def get_monitored_changes(self) -> List: + def get_subscription_notifications(self): """ Get gNMI subscription updates for changed values - :return: gNMI update array - #TODO should we also return delete array + :return: gNMI Notification array """ pass @@ -261,15 +260,6 @@ def changes(self): log.debug("<== responses=%s", responses) return responses - def get_subscription_notifications(self): - update = self.get_monitored_changes() - notif = gnmi_pb2.Notification(timestamp=get_timestamp_ns(), - prefix=self.subscription_list.prefix, - update=update, - delete=[], - atomic=False) - return [notif] - def _get_next_sample_interval_and_subscriptions(self, first_sample_time: int): interval = None diff --git a/src/confd_gnmi_api_adapter.py b/src/confd_gnmi_api_adapter.py index b3b9a7b..2526aec 100644 --- a/src/confd_gnmi_api_adapter.py +++ b/src/confd_gnmi_api_adapter.py @@ -146,9 +146,6 @@ def _get_subscription_notifications(self): yield prefix, updates, deletes self.change_db = [] - def get_monitored_changes(self): - raise NotImplementedError - def get_sample(self, path, prefix, allow_aggregation=False, start_change_processing=False): log.debug("==>") diff --git a/src/confd_gnmi_client.py b/src/confd_gnmi_client.py index aea6f16..85f5740 100755 --- a/src/confd_gnmi_client.py +++ b/src/confd_gnmi_client.py @@ -18,7 +18,7 @@ from confd_gnmi_common import HOST, PORT, make_xpath_path, VERSION, \ common_optparse_options, common_optparse_process, make_gnmi_path, \ datatype_str_to_int, subscription_mode_str_to_int, \ - encoding_int_to_str, encoding_str_to_int, get_time_string + encoding_int_to_str, encoding_str_to_int, get_time_string, get_timestamp_ns from gnmi_pb2_grpc import gNMIStub @@ -147,18 +147,21 @@ def print_notification(n): pfx_str = make_xpath_path(gnmi_prefix=n.prefix) print("timestamp {} prefix {} atomic {}".format( get_time_string(n.timestamp), pfx_str, n.atomic)) - print("Updates:") - for u in n.update: - if u.val.json_val: - value = json.loads(u.val.json_val) - elif u.val.json_ietf_val: - value = json.loads(u.val.json_ietf_val) - else: - value = str(u.val) - print("path: {} value {}".format(pfx_str + make_xpath_path(u.path), - value)) - for dpath in n.delete: - print("path deleted: {}".format(pfx_str + make_xpath_path(dpath))) + if n.update: + print("Updates:") + for u in n.update: + if u.val.json_val: + value = json.loads(u.val.json_val) + elif u.val.json_ietf_val: + value = json.loads(u.val.json_ietf_val) + else: + value = str(u.val) + print("path: {} value {}".format(pfx_str + make_xpath_path(u.path), + value)) + if n.delete: + print("Deletes:") + for dpath in n.delete: + print("path deleted: {}".format(pfx_str + make_xpath_path(dpath))) @staticmethod def read_subscribe_responses(responses, read_count=-1): @@ -169,7 +172,10 @@ def read_subscribe_responses(responses, read_count=-1): log.info("******* Subscription received response=%s read_count=%i", response, read_count) print("subscribe - response read_count={}".format(read_count)) - ConfDgNMIClient.print_notification(response.update) + if response.sync_response: + print(f"timestamp {get_time_string(get_timestamp_ns())} Sync_response") + else: + ConfDgNMIClient.print_notification(response.update) num_read += 1 if read_count > 0: read_count -= 1 diff --git a/src/confd_gnmi_demo_adapter.py b/src/confd_gnmi_demo_adapter.py index c4b22e6..0b1f7b4 100644 --- a/src/confd_gnmi_demo_adapter.py +++ b/src/confd_gnmi_demo_adapter.py @@ -5,6 +5,7 @@ import re import threading import xml.etree.ElementTree as ET +from abc import abstractmethod from enum import Enum from queue import Queue, Empty from random import randint @@ -15,9 +16,43 @@ log = logging.getLogger('confd_gnmi_demo_adapter') +# Types for streaming telemetry operations +class ChangeOp: + @abstractmethod + def __repr__(self): + return f'{self.__class__.__name__}()' + + @abstractmethod + def __str__(self): + return f'{self.__class__.__name__}()' + + +class ChangeVal(ChangeOp): + def __init__(self, value): + if not isinstance(value, str): + raise ValueError("Invalid value type") + self.value = value + + def __repr__(self): + return f'ChangeVal({self.value})' + + def __str__(self): + return str(self.value) + + +class ChangeDel(ChangeOp): + + def __init__(self, deleted_paths=[]): + self.deleted_paths = deleted_paths + def __repr__(self): + return f'ChangeDel()' + + def __str__(self): + return f'ChangeDel' class GnmiDemoServerAdapter(GnmiServerAdapter): NS_INTERFACES = "ietf-interfaces:" + NS_GNMI_TOOLS = "gnmi-tools:" NS_IANA = "iana-if-type:" # simple demo database @@ -47,7 +82,7 @@ class GnmiDemoServerAdapter(GnmiServerAdapter): ] def __init__(self): - self._fill_demo_db() + GnmiDemoServerAdapter.fill_demo_db() @staticmethod def load_config_string(xml_cfg): @@ -60,7 +95,7 @@ def load_config_string(xml_cfg): assert root.tag == "demo" changes = root.findall("./subscription/STREAM/changes/element") log.debug("changes=%s", changes) - if len(changes): + if changes: if "changes" not in GnmiDemoServerAdapter.config: GnmiDemoServerAdapter.config["changes"] = [] GnmiDemoServerAdapter.config["changes_idx"] = 0 @@ -68,12 +103,15 @@ def load_config_string(xml_cfg): log.debug("len(el)=%s", len(el)) if len(el): if len(el) == 2: - (path, val) = (el[0], el[1]) - log.debug("path.text=%s val.text=%s", path.text, - val.text) - GnmiDemoServerAdapter.config["changes"].append( - (GnmiDemoServerAdapter._nsless_xpath(path.text), - val.text.replace(GnmiDemoServerAdapter.NS_IANA, ""))) + path = GnmiDemoServerAdapter._nsless_xpath(el[0].text) + if (el[1].tag == "val"): + val = ChangeVal(el[1].text.replace(GnmiDemoServerAdapter.NS_IANA, "")) + elif (el[1].tag == "del"): + val=ChangeDel() + else: + assert False + log.debug("path=%s val=%s", path, val) + GnmiDemoServerAdapter.config["changes"].append((path, val)) elif len(el) == 0: log.debug("el.tag=%s", el.text) GnmiDemoServerAdapter.config["changes"].append(el.text) @@ -86,29 +124,49 @@ def get_adapter(cls): cls._instance = GnmiDemoServerAdapter() return cls._instance - def _fill_demo_db(self): + @classmethod + def fill_demo_db(cls, reset = False): log.debug("==>") - with self.db_lock: + + with cls.db_lock: + if reset: + cls.demo_db, cls.demo_state_db = {}, {} # make interfaces alphabetically sorted - ifs = sorted(str(i+1) for i in range(GnmiDemoServerAdapter.num_of_ifs)) + ifs = sorted( + str(i + 1) for i in range(cls.num_of_ifs)) for if_id in ifs: if_name = f"if_{if_id}" - state_if_name = "state_if_{}".format(if_id) - path = "/interfaces/interface[name={}]".format(if_name) - state_path = "/interfaces-state/interface[name={}]".format( - state_if_name) - self.demo_db["{}/name".format(path)] = if_name - self.demo_state_db["{}/name".format(state_path)] = state_if_name - self.demo_db["{}/type".format(path)] = "gigabitEthernet" - self.demo_state_db[ - "{}/type".format(state_path)] = "gigabitEthernet" - self.demo_db["{}/enabled".format(path)] = True - log.debug("<== self.demo_db=%s self.demo_state_db=%s", self.demo_db, - self.demo_state_db) + state_if_name = f"state_if_{if_id}" + path = f"/interfaces/interface[name={if_name}]" + state_path = f"/interfaces-state/interface[name={state_if_name}]" + cls.demo_db[f"{path}/name"] = if_name + cls.demo_state_db[f"{state_path}/name"] = state_if_name + cls.demo_db[f"{path}/type"] = "gigabitEthernet" + cls.demo_state_db[f"{state_path}/type"] = "gigabitEthernet" + cls.demo_db[f"{path}/enabled"] = True + + for i in range(1, 5): + path = f"/gnmi-tools/top-d/top-d-list[name=n{i}]" + cls.demo_db[f"{path}/name"] = f"n{i}" + if i == 2: + cls.demo_db[f"{path}/empty-leaf"] = [None] + if i == 3: + cls.demo_db[f"{path}/pres"] = {} + if i == 4: + cls.demo_db[f"{path}/down/str-leaf"] = "test" + cls.demo_db[f"{path}/down/int-leaf"] = "23" + + log.debug("<== self.demo_db=%s self.demo_state_db=%s", cls.demo_db, + cls.demo_state_db) + + @classmethod + def reset(cls): + cls.fill_demo_db(reset = True) @staticmethod def _nsless_xpath(xpath: str): - return xpath.replace(GnmiDemoServerAdapter.NS_INTERFACES, "") + return (xpath.replace(GnmiDemoServerAdapter.NS_INTERFACES, "") + .replace(GnmiDemoServerAdapter.NS_GNMI_TOOLS, "")) @staticmethod def _get_key_from_xpath(xpath): @@ -131,11 +189,9 @@ def _demo_db_to_key_elem_map(db): for p, v in db.items(): key = GnmiDemoServerAdapter._get_key_from_xpath(p) elem = GnmiDemoServerAdapter._get_elem_from_xpath(p) - elem_map = {} - if key in map_db: - elem_map = map_db[key] + elem_map = map_db.get(key, {}) if elem == "type": - v = "{}{}".format(GnmiDemoServerAdapter.NS_IANA, v) + v = f'{GnmiDemoServerAdapter.NS_IANA}{v}' elem_map[elem] = v map_db[key] = elem_map log.debug("<== map_db={}".format(map_db)) @@ -161,36 +217,59 @@ def get_sample(self, path, prefix, allow_aggregation=False) -> []: adapter: GnmiDemoServerAdapter = self.adapter with adapter.db_lock: updates = adapter.get_db_updates_for_path(path, prefix, - adapter.demo_db, + GnmiDemoServerAdapter.demo_db, allow_aggregation) # 'if' below is optimization if len(updates) == 0: updates = adapter.get_db_updates_for_path(path, prefix, - adapter.demo_state_db, + GnmiDemoServerAdapter.demo_state_db, allow_aggregation) log.debug("<== updates=%s", updates) return updates - def get_monitored_changes(self) -> []: + def get_subscription_notifications(self) -> []: + log.debug("==> self.change_db=%s", self.change_db) + assert len(self.change_db) > 0 + update = [] + delete = [] + + def remove_prefix(path, prefix): + if path.startswith(prefix): + return path[len(prefix):] + return path + with self.change_db_lock: - log.debug("==> self.change_db=%s", self.change_db) - assert len(self.change_db) > 0 - update = [] for c in self.change_db: prefix_str = self.adapter._nsless_xpath(make_xpath_path( gnmi_prefix=self.subscription_list.prefix)) - p = self.adapter._nsless_xpath(c[0]) - if p.startswith(prefix_str): - p = p[len(prefix_str):] - v = "{}{}".format(GnmiDemoServerAdapter.NS_IANA, c[1]) - json_val = gnmi_pb2.TypedValue( - json_ietf_val=json.dumps(v).encode()) - update.append(gnmi_pb2.Update(path=make_gnmi_path(p), - val=json_val)) + p = remove_prefix(self.adapter._nsless_xpath(c[0]), + prefix_str) + if isinstance(c[1], ChangeVal): + v = "{}{}".format(GnmiDemoServerAdapter.NS_IANA, c[1]) + json_val = gnmi_pb2.TypedValue( + json_ietf_val=json.dumps(v).encode()) + update.append(gnmi_pb2.Update(path=make_gnmi_path(p), + val=json_val)) + elif isinstance(c[1], ChangeDel): + delete.extend( + [make_gnmi_path(remove_prefix(p, prefix_str)) for p + in + [path for i, path in enumerate(c[1].deleted_paths) + if path not in c[1].deleted_paths[:i]]]) + else: + assert False self.change_db = [] - log.debug("<== update=%s", update) - return update + log.debug("<== update=%s delete=%s", update, delete) + + notif = gnmi_pb2.Notification(timestamp=get_timestamp_ns(), + prefix=self.subscription_list.prefix, + update=update, + delete=delete, + atomic=False) + + log.debug("<== notif=%s", notif) + return [notif] def _get_random_changes(self): log.debug("==>") @@ -210,7 +289,7 @@ def _get_random_changes(self): new_val = "fastEther" log.debug("adding change path=%s, new_val=%s", path, new_val) - changes.append((path, new_val)) + changes.append((path, ChangeVal(new_val))) if randint(0, 9) < 8: changes.append("send") else: @@ -240,7 +319,7 @@ def _get_config_changes(): def process_changes(self): log.debug("==>") add_count = 0 - # there may be more changes to same path, we cannot use map + # there may be more changes to the same path, we cannot use map assert self.change_event_queue is not None assert len(self.change_db) == 0 while True: @@ -249,43 +328,8 @@ def process_changes(self): event = self.change_event_queue.get(timeout=1) log.debug("event=%s", event) if event == self.ChangeEvent.ADD: - # generate modifications and add them - with self.change_db_lock, self.adapter.db_lock: - if "changes" in GnmiDemoServerAdapter.config: - changes = self._get_config_changes() - else: - changes = self._get_random_changes() - send = False - for c in changes: - log.debug("processing change c=%s", c) - if isinstance(c, str): - assert c == "send" or c == "break" - if c == "send": - send = True - break - else: - log.info("c=%s self.monitored_paths=%s", - c, self.monitored_paths) - (path, val) = c - if path[0] != '/': - path = '/' + path - if any(path.startswith(elem) for elem in - self.monitored_paths): - log.info("appending (path, val)=%s", (path, val)) - self.change_db.append((path, val)) - if path in self.adapter.demo_db: - self.adapter.demo_db[path] = val - elif path in self.adapter.demo_state_db: - self.adapter.demo_state_db[ - path] = val - else: - assert False - if send: - if len(self.change_db): - self.change_event_queue.put( - self.ChangeEvent.SEND) + self._process_add_event() elif event == self.ChangeEvent.SEND: - # send all modified paths self.put_event(self.SubscriptionEvent.SEND_CHANGES) elif event == self.ChangeEvent.FINISH: break @@ -297,12 +341,66 @@ def process_changes(self): if add_count == 0: add_count += 1 # sometimes skip add if is first from previous - if randint(0, 1) == 0: - continue + if randint(0, 1) == 0: continue add_count = 0 # reset add count self.change_event_queue.put(self.ChangeEvent.ADD) log.debug("<==") + def _process_add_event(self): + log.debug("==>") + with self.change_db_lock, self.adapter.db_lock: + if "changes" in GnmiDemoServerAdapter.config: + changes = self._get_config_changes() + else: + changes = self._get_random_changes() + send = False + for c in changes: + log.debug("processing change c=%s", c) + if isinstance(c, str): + assert c == "send" or c == "break" + if c == "send": + send = True + break + else: + log.info("c=%s self.monitored_paths=%s", + c, self.monitored_paths) + (path, ch_op) = c + if path[0] != '/': + path = '/' + path + if any(path.startswith(elem) for elem in + self.monitored_paths): + log.info("appending (path, val)=%s", (path, ch_op)) + self.change_db.append( + self.update_demo_db(path, ch_op)) + if send: + if len(self.change_db): + self.change_event_queue.put( + self.ChangeEvent.SEND) + log.debug("<==") + + def update_demo_db(self, path, ch_op): + log.debug("==>") + ret = (path, ch_op) + if isinstance(ch_op, ChangeVal): + if path in self.adapter.demo_db: + self.adapter.demo_db[path] = ch_op + elif path in self.adapter.demo_state_db: + self.adapter.demo_state_db[ + path] = ch_op + else: + log.warning("path=%s not found, skipping update (val=%s)", path, ch_op) + elif isinstance(ch_op, ChangeDel): + for db in [self.adapter.demo_db, + self.adapter.demo_state_db]: + for db_path in list(db.keys()): + if db_path.startswith(path): + del db[db_path] + if "gnmi-tools/top-d" in db_path: + a_path = db_path.split(']')[0] + ch_op.deleted_paths.append(a_path + ']') + log.debug("<== ret=%s", ret) + return ret + def add_path_for_monitoring(self, path, prefix): log.debug("==>") @@ -373,7 +471,9 @@ def get_db_updates_for_path(self, path, prefix, db, allow_aggregation=False): path_val_list = [] map_db = self._demo_db_to_key_elem_map(db) ifaces = self._nsless_xpath(path_with_prefix) - if allow_aggregation and ifaces == "/interfaces" or ifaces == "/interfaces-state": + if allow_aggregation and (ifaces == "/interfaces" or ifaces == "/interfaces-state"): + # for now, only interface elements supported for aggregation + map_db = {key: value for key, value in map_db.items() if "if_" in key} if list(db.keys())[0].startswith(ifaces): path_val_list = [ (path_with_prefix, {"interface": list(map_db.values())})] @@ -421,11 +521,12 @@ def get_updates(self, path, prefix, data_type): with self.db_lock: if data_type == gnmi_pb2.GetRequest.DataType.CONFIG or \ data_type == gnmi_pb2.GetRequest.DataType.ALL: - updates = self.get_db_updates_for_path(path, prefix, self.demo_db, + updates = self.get_db_updates_for_path(path, prefix, + GnmiDemoServerAdapter.demo_db, allow_aggregation=True) if data_type != gnmi_pb2.GetRequest.DataType.CONFIG: updates = self.get_db_updates_for_path(path, prefix, - self.demo_state_db, + GnmiDemoServerAdapter.demo_state_db, allow_aggregation=True) log.debug("<== updates=%s", updates) @@ -448,18 +549,19 @@ def get(self, prefix, paths, data_type, use_models): @contextmanager def update_transaction(self, prefix): - yield DemoTransaction(prefix) + yield DemoTransaction(prefix, GnmiDemoServerAdapter.demo_db) class DemoTransaction(UpdateTransaction): - def __init__(self, prefix): + def __init__(self, prefix, demo_db): self.prefix = prefix + self.demo_db = demo_db def set_update(self, path, val): log.info("==> path=%s, val=%s", path, val) path_str = make_xpath_path(path, self.prefix) op = gnmi_pb2.UpdateResult.INVALID - if GnmiDemoServerAdapter._nsless_xpath(path_str) in GnmiDemoServerAdapter.demo_db: + if GnmiDemoServerAdapter._nsless_xpath(path_str) in self.demo_db: if val.string_val: str_val = val.string_val elif val.json_ietf_val: @@ -471,7 +573,7 @@ def set_update(self, path, val): str_val = "{}".format(val) str_val = str_val.replace(GnmiDemoServerAdapter.NS_IANA, "") with GnmiDemoServerAdapter.db_lock: - GnmiDemoServerAdapter.demo_db[path_str] = str_val + self.demo_db[path_str] = str_val op = gnmi_pb2.UpdateResult.UPDATE log.info("==> op=%s", op) diff --git a/tests/client_server_test_base.py b/tests/client_server_test_base.py index c9952e6..ff44925 100644 --- a/tests/client_server_test_base.py +++ b/tests/client_server_test_base.py @@ -1,3 +1,5 @@ +from abc import abstractmethod + import itertools import json import subprocess @@ -11,8 +13,8 @@ import gnmi_pb2 from confd_gnmi_client import ConfDgNMIClient from confd_gnmi_common import make_gnmi_path, datatype_str_to_int, \ - make_formatted_path, get_timestamp_ns -from confd_gnmi_demo_adapter import GnmiDemoServerAdapter + make_formatted_path, get_timestamp_ns, add_path_prefix +from confd_gnmi_demo_adapter import GnmiDemoServerAdapter, ChangeDel, ChangeVal from confd_gnmi_servicer import AdapterType, ConfDgNMIServicer from utils.utils import log, nodeid_to_path @@ -22,12 +24,18 @@ class GrpcBase: NS_IETF_INTERFACES = GnmiDemoServerAdapter.NS_INTERFACES NS_OC_INTERFACES = "openconfig-interfaces:" + NS_GNMI_TOOLS = GnmiDemoServerAdapter.NS_GNMI_TOOLS PREFIX_MAP = { NS_IETF_INTERFACES: "if:", - NS_OC_INTERFACES: "oc-if:" + NS_OC_INTERFACES: "oc-if:", + NS_GNMI_TOOLS: NS_GNMI_TOOLS } + @abstractmethod + def set_adapter_type(self): + raise NotImplementedError + @pytest.fixture def fix_method(self, request): log.debug("==> fixture method setup request={}".format(request)) @@ -37,6 +45,7 @@ def fix_method(self, request): self.set_adapter_type() self.server = ConfDgNMIServicer.serve(adapter_type=self.adapter_type, insecure=True) self.client = ConfDgNMIClient(insecure=True) + GnmiDemoServerAdapter.fill_demo_db() log.debug("<== fixture method setup") yield log.debug("==> fixture method teardown (nodeid %s)" % nodeid_path) @@ -105,7 +114,9 @@ def capability_supported(cap): assert gnmi_pb2.Encoding.JSON_IETF in capabilities.supported_encodings @staticmethod - def assert_update(update, path_val): + def assert_update(update, path_val, + update_prefix=gnmi_pb2.Path(), + pv_prefix=gnmi_pb2.Path()): """ Asserts that the update matches the expected path and value. @@ -125,7 +136,8 @@ def assert_update(update, path_val): # Validate the path if required if check_path: - assert update.path == path_val[0] + assert (add_path_prefix(update.path, update_prefix) == + add_path_prefix(path_val[0], pv_prefix)) # Parse the json_ietf_val attribute of the update object json_value = json.loads(update.val.json_ietf_val) @@ -134,27 +146,39 @@ def assert_update(update, path_val): assert json_value == path_val[1] @staticmethod - def assert_set_response(response, path_op): - assert (response.path == path_op[0]) + def assert_set_response(response, path_op, + response_prefix=gnmi_pb2.Path(), + set_prefix=gnmi_pb2.Path()): + assert (add_path_prefix(response.path, response_prefix) == + add_path_prefix(path_op[0], set_prefix)) assert (response.op == path_op[1]) @staticmethod - def assert_updates(updates, path_vals): + def assert_updates(updates, path_vals, + update_prefix=gnmi_pb2.Path(), + pv_prefix=gnmi_pb2.Path()): assert (len(updates) == len(path_vals)) for i, u in enumerate(updates): - AdapterTests.assert_update(u, path_vals[i]) + AdapterTests.assert_update(u, path_vals[i], update_prefix, pv_prefix) @staticmethod - def assert_one_in_update(updates, pv): - assert any(u.path == pv[0] and json.loads(u.val.json_ietf_val) == pv[1] - for u in updates) + def assert_one_in_update(updates, pv, + update_prefix=gnmi_pb2.Path(), + pv_prefix=gnmi_pb2.Path()): + assert any( + add_path_prefix(u.path, update_prefix) == add_path_prefix(pv[0], pv_prefix) + and json.loads(u.val.json_ietf_val) == pv[1] + for u in updates + ) @staticmethod - def assert_in_updates(updates, path_vals): + def assert_in_updates(updates, path_vals, + update_prefix=gnmi_pb2.Path(), + pv_prefix=gnmi_pb2.Path()): log.debug("==> updates=%s path_vals=%s", updates, path_vals) assert (len(updates) == len(path_vals)) for pv in path_vals: - AdapterTests.assert_one_in_update(updates, pv) + AdapterTests.assert_one_in_update(updates, pv, update_prefix, pv_prefix) log.debug("<==") @@ -174,23 +198,27 @@ def verify_get_response_updates(self, prefix, paths, path_value, if prefix: assert (n.prefix == prefix) assert(time_before <= n.timestamp and n.timestamp <= time_after) - assert_fun(n.update, path_value) - - def verify_sub_sub_response_updates(self, prefix, paths, path_value, - assert_fun=None, - subscription_mode=gnmi_pb2.SubscriptionList.ONCE, - poll_interval=0, - poll_count=0, read_count=-1, - sample_interval = None, - encoding=gnmi_pb2.Encoding.JSON_IETF, - allow_aggregation=True): + assert_fun(n.update, path_value, n.prefix, prefix) + + def subscribe_and_verify_response( + self, prefix, paths, path_value, delete_paths=[], + assert_fun=None, + subscription_mode=gnmi_pb2.SubscriptionList.ONCE, + poll_interval=0, poll_count=0, read_count=-1, + sample_interval = None, + encoding=gnmi_pb2.Encoding.JSON_IETF, + allow_aggregation=True + ): ''' Invoke subscription and verify received updates - :param prefix: gNMI prefix for subscription - :param paths: gNMI path for subscription - :param path_value: array of tuples of expected (path,value) for each response - path is gNMI path, val is response value (in json) - :param assert_fun: function to verify updates according to path_value + :param prefix: gNMI prefix for the subscription + :param paths: gNMI paths for the subscription + :param path_value: one array or array of arrays of tuples + of expected (path, value) in update field + of responses, val is response value (in json) + :param delete_paths: array or array of array of paths in delete field of responses + :param assert_fun: function to verify updates in one response according + to current path_value element :param subscription_mode: :param poll_interval: interval between polls (for gnmi_pb2.SubscriptionList.POLL only) :param poll_count: number of polls (for gnmi_pb2.SubscriptionList.POLL only) @@ -203,29 +231,32 @@ def verify_sub_sub_response_updates(self, prefix, paths, path_value, if assert_fun is None: assert_fun = AdapterTests.assert_updates - stream_mode = gnmi_pb2.SubscriptionMode.ON_CHANGE + stream_mode = gnmi_pb2.SubscriptionMode.ON_CHANGE if sample_interval is not None: assert subscription_mode == gnmi_pb2.SubscriptionList.STREAM stream_mode = gnmi_pb2.SubscriptionMode.SAMPLE - log.debug("paths=%s path_value=%s", paths, path_value) + log.debug("paths=%s path_value=%s delete_paths=%s", + paths, path_value, delete_paths) response_count = 0 - pv_idx = 0 - for pv in path_value: - if not isinstance(pv, list): - pv_idx = -1 - break - log.debug("pv_idx=%s", pv_idx) + + def init_idx(vals): + idx = -1 if not vals or any( + not isinstance(v, list) for v in vals) else 0 + return idx + pv_idx = init_idx(path_value) + del_idx = init_idx(delete_paths) + log.debug("pv_idx=%s del_idx=%s", pv_idx, del_idx) def read_subscribe_responses(responses, read_count=-1): - nonlocal response_count, pv_idx + nonlocal response_count, pv_idx, del_idx prev_response_time_ms = 0 SAMPLE_THRESHOLD = 300 for response in responses: time_after = get_timestamp_ns() log.debug("response=%s response_count=%i time_after=%i", response, response_count, time_after) - response_time_ms = time_after/1000000 + response_time_ms = time_after / 1000000 if response.sync_response: log.debug("sync_response") assert response_count == 1 # sync expected only after first response @@ -234,7 +265,11 @@ def read_subscribe_responses(responses, read_count=-1): assert (time_before <= response.update.timestamp and response.update.timestamp <= time_after) if prefix: - assert (response.update.prefix == prefix) + prefix_str = make_formatted_path(prefix) + prefix_update_str = make_formatted_path(response.update.prefix) + assert (prefix_update_str.startswith(prefix_str) + or prefix_update_str.startswith(prefix_str)) + pv_to_check = path_value if pv_idx != -1: assert pv_idx < len(path_value) @@ -242,6 +277,18 @@ def read_subscribe_responses(responses, read_count=-1): pv_idx += 1 if len(pv_to_check) > 0: # skip empty arrays assert_fun(response.update.update, pv_to_check) + + del_to_check = delete_paths + if del_idx != -1: + assert del_idx < len(delete_paths) + del_to_check = delete_paths[del_idx] + del_idx += 1 + if len(del_to_check) > 0: + assert (len(response.update.delete) == len(del_to_check)) + for i, d in enumerate(response.update.delete): + assert (add_path_prefix(d, response.update.prefix) + == add_path_prefix(del_to_check[i], prefix)) # TODO do we need check one_in_delete + log.debug("response_count=%i pv_idx=%i", response_count, pv_idx) if sample_interval and response_count > 1: assert (response_time_ms > (prev_response_time_ms + sample_interval - SAMPLE_THRESHOLD)) and ( @@ -258,25 +305,30 @@ def read_subscribe_responses(responses, read_count=-1): assert read_count == -1 or read_count == 0 read_fun = read_subscribe_responses - subscription_list = \ - ConfDgNMIClient.make_subscription_list(prefix, - paths, - subscription_mode, - encoding, - stream_mode = stream_mode, - sample_interval_ms=sample_interval, - allow_aggregation=allow_aggregation) + subscription_list = ConfDgNMIClient.make_subscription_list( + prefix, + paths, + subscription_mode, + encoding, + stream_mode = stream_mode, + sample_interval_ms=sample_interval, + allow_aggregation=allow_aggregation + ) time_before = get_timestamp_ns() - responses = self.client.subscribe(subscription_list, - read_fun=read_fun, - poll_interval=poll_interval, - poll_count=poll_count, - read_count=read_count) + responses = self.client.subscribe( + subscription_list, + read_fun=read_fun, + poll_interval=poll_interval, + poll_count=poll_count, + read_count=read_count + ) log.debug("responses=%s", responses) if poll_count: - assert (poll_count + 1 == response_count) + assert poll_count + 1 == response_count + + def _test_get_subscribe(self, is_subscribe=False, datatype=gnmi_pb2.GetRequest.DataType.CONFIG, @@ -289,12 +341,11 @@ def _test_get_subscribe(self, is_subscribe=False, kwargs = {"assert_fun": AdapterTests.assert_updates} if_state_str = prefix_state_str = "" - db = GnmiDemoServerAdapter.get_adapter().demo_db + db = GnmiDemoServerAdapter.demo_db if datatype == gnmi_pb2.GetRequest.DataType.STATE: prefix_state_str = "-state" if_state_str = "state_" db = GnmiDemoServerAdapter.demo_state_db - map_db = GnmiDemoServerAdapter._demo_db_to_key_elem_map(db) prefix = make_gnmi_path("/ietf-interfaces:interfaces{}".format(prefix_state_str)) kwargs["prefix"] = prefix if_id = 8 @@ -311,7 +362,7 @@ def _test_get_subscribe(self, is_subscribe=False, ifname = "{}if_{}".format(if_state_str, if_id) if is_subscribe: - verify_response_updates = self.verify_sub_sub_response_updates + verify_response_updates = self.subscribe_and_verify_response kwargs["subscription_mode"] = subscription_mode kwargs["poll_interval"] = poll_interval kwargs["poll_count"] = poll_count @@ -371,12 +422,15 @@ def _test_get_subscribe(self, is_subscribe=False, kwargs["assert_fun"] = AdapterTests.assert_in_updates verify_response_updates(**kwargs) if allow_aggregation: + map_db = GnmiDemoServerAdapter._demo_db_to_key_elem_map(db) + # remove non interface related entries + map_db = {key: value for key, value in map_db.items() if "if_" in key} kwargs["paths"] = [list_paths[2]] if allow_aggregation: kwargs["path_value"] = [(list_paths[2], {"interface": list(map_db.values())})] kwargs["assert_fun"] = None - kwargs["prefix"] = None + kwargs["prefix"] = gnmi_pb2.Path() verify_response_updates(**kwargs) @pytest.mark.parametrize("data_type", ["CONFIG", "STATE"]) @@ -454,9 +508,16 @@ def confd_cmd_subprocess(confd_cmd): def format_command(c): path = make_gnmi_path(c[0]) - return "mset {} {}".format( - make_formatted_path(path, gnmi_prefix=path_prefix), - c[1].split(":")[-1]) # remove json prefix + if isinstance(c[1], (str, ChangeVal)): + cmd = "mset {} {}".format( + make_formatted_path(path, gnmi_prefix=path_prefix), + c[1].split(":")[-1]) # remove json prefix + elif isinstance(c[1], ChangeDel): + cmd = "mdel {}".format( + make_formatted_path(path, gnmi_prefix=path_prefix)) + else: + raise TypeError(f"Invalid value type: {type(c[1])}") + return cmd for send, changes in itertools.groupby(changes_list, lambda c: c == "send"): if not send: @@ -464,24 +525,33 @@ def format_command(c): sleep(1) @staticmethod - def _changes_list_to_pv(changes_list): + def _changes_list_to_pv_del(changes_list): ''' - Return path_value_list created from changes_list. + Return pv_res and delete_res lists created from changes_list. :param changes_list: - :return: + :return: (pv_res, delete_res) ''' - path_value = [] - pv_idx = 0 + pv_res, del_res = [], [] + pv_candidates, del_candidates = [], [] for c in changes_list: - if isinstance(c, str): + if type(c) is str: if c == "send": - pv_idx += 1 + pv_res.append(pv_candidates) + del_res.append(del_candidates) + pv_candidates, del_candidates = [], [] else: - if len(path_value) < pv_idx + 1: - path_value.append([]) - path_value[pv_idx].append((make_gnmi_path(c[0]), c[1])) - log.debug("path_value=%s", path_value) - return path_value + if isinstance(c[1], (str, int)) or isinstance(c[1], ChangeVal): + pv_candidates.append((make_gnmi_path(c[0]), c[1])) + elif isinstance(c[1], ChangeDel): + if c[1].deleted_paths: + for p in c[1].deleted_paths: + del_candidates.append(make_gnmi_path(p)) + else: + del_candidates.append(make_gnmi_path(c[0])) + else: + raise TypeError(f"Invalid value type: {type(c[1])}") + log.debug("pv_res=%s delete_res=%s", pv_res, del_res) + return pv_res, del_res @staticmethod def _changes_list_to_xml(changes_list, prefix_str): @@ -496,7 +566,12 @@ def _changes_list_to_xml(changes_list, prefix_str): else: ET.SubElement(el, "path").text = "{}/{}".format(prefix_str, c[0]) - ET.SubElement(el, "val").text = c[1] + if isinstance(c[1], str) or isinstance(c[1], ChangeVal): + ET.SubElement(el, "val").text = c[1] + elif isinstance(c[1], ChangeDel): + ET.SubElement(el, "del") + else: + raise TypeError(f"Invalid value type: {type(c[1])}") xml_str = ET.tostring(demo, encoding='unicode') log.debug("xml_str=%s", xml_str) return xml_str @@ -505,7 +580,7 @@ def _changes_list_to_xml(changes_list, prefix_str): @pytest.mark.parametrize("data_type", ["CONFIG", "STATE"]) def test_subscribe_stream(self, request, data_type): log.info("testing subscribe_stream") - if_state_str = prefix_state_str = "" + if_state_str, prefix_state_str = "", "" if data_type == "STATE": prefix_state_str = "-state" if_state_str = "state_" @@ -530,15 +605,38 @@ def test_subscribe_stream(self, request, data_type): self._test_subscribe(prefix_str, self.NS_IETF_INTERFACES, paths, changes_list) + @pytest.mark.usefixtures("reset_cfg") + def test_subscribe_stream_delete(self, request): + log.info("testing subscribe_stream_delete") + + changes_list = [ + ("top-d", ChangeDel(deleted_paths=[ + "/top-d/top-d-list[name=n1]", + "/top-d/top-d-list[name=n2]", + "/top-d/top-d-list[name=n3]", + "/top-d/top-d-list[name=n4]", + ])), + "send", + ] + log.info("change_list=%s", changes_list) + + prefix_str = "{prefix}gnmi-tools" + paths = [make_gnmi_path("top-d")] + self._test_subscribe(prefix_str, "gnmi-tools:", + paths, changes_list) + def _test_subscribe(self, prefix_str, ns_prefix, paths, changes_list): - path_value = [[]] # empty element means no check - path_value.extend(self._changes_list_to_pv(changes_list)) + path_value, delete = [[]], [[]] # empty element means no check - skip first response + pv, de = self._changes_list_to_pv_del(changes_list) + path_value.extend(pv) + delete.extend(de) prefix = make_gnmi_path("/" + prefix_str.format(prefix=ns_prefix)) kwargs = {"assert_fun": AdapterTests.assert_in_updates} kwargs["prefix"] = prefix kwargs["paths"] = paths kwargs["path_value"] = path_value + kwargs["delete_paths"] = delete kwargs["subscription_mode"] = gnmi_pb2.SubscriptionList.STREAM kwargs["read_count"] = len(path_value) kwargs["assert_fun"] = AdapterTests.assert_in_updates @@ -548,13 +646,16 @@ def _test_subscribe(self, prefix_str, ns_prefix, paths, changes_list): GnmiDemoServerAdapter.load_config_string( self._changes_list_to_xml(changes_list, prefix_pfx)) if self.adapter_type == AdapterType.API: - prefix_pfx = prefix_str.format(prefix=self.PREFIX_MAP[ns_prefix]) + if ns_prefix in self.PREFIX_MAP: + prefix_pfx = prefix_str.format(prefix=self.PREFIX_MAP[ns_prefix]) + else: + prefix_pfx = prefix_str.format(prefix='') thr = threading.Thread( target=self._send_change_list_to_confd_thread, args=(prefix_pfx, changes_list,)) thr.start() - self.verify_sub_sub_response_updates(**kwargs) + self.subscribe_and_verify_response(**kwargs) if self.adapter_type == AdapterType.API: thr.join() @@ -581,7 +682,8 @@ def test_set(self, request): assert (time_before <= response.timestamp and response.timestamp <= time_after) assert (response.prefix == prefix) AdapterTests.assert_set_response(response.response[0], - (paths[0], gnmi_pb2.UpdateResult.UPDATE)) + (paths[0], gnmi_pb2.UpdateResult.UPDATE), + response.prefix, prefix) # fetch with get and see value has changed datatype = gnmi_pb2.GetRequest.DataType.CONFIG @@ -590,13 +692,15 @@ def test_set(self, request): for n in get_response.notification: log.debug("n=%s", n) assert (n.prefix == prefix) - AdapterTests.assert_updates(n.update, [(paths[0], "iana-if-type:fastEther")]) + AdapterTests.assert_updates(n.update, [(paths[0], "iana-if-type:fastEther")], + n.prefix, prefix) # put value back vals = [gnmi_pb2.TypedValue(json_ietf_val=b"\"iana-if-type:gigabitEthernet\"")] response = self.client.set(prefix, list(zip(paths, vals))) AdapterTests.assert_set_response(response.response[0], - (paths[0], gnmi_pb2.UpdateResult.UPDATE)) + (paths[0], gnmi_pb2.UpdateResult.UPDATE), + response.prefix, prefix) @pytest.mark.usefixtures("reset_cfg") def test_set_encoding(self, request): @@ -630,4 +734,5 @@ def test_it(encoding): json_ietf_val=b"\"iana-if-type:gigabitEthernet\"")] response = self.client.set(prefix, list(zip(paths, vals))) AdapterTests.assert_set_response(response.response[0], - (paths[0], gnmi_pb2.UpdateResult.UPDATE)) + (paths[0], gnmi_pb2.UpdateResult.UPDATE), + response.prefix, prefix) diff --git a/tests/test_client_server_confd.py b/tests/test_client_server_confd.py index 333942c..1736331 100644 --- a/tests/test_client_server_confd.py +++ b/tests/test_client_server_confd.py @@ -103,7 +103,7 @@ def _test_gnmi_tools_get_subscribe_gnmi_tools(self, is_subscribe=False, self.leaf_paths_str_for_gnmi_tools] if is_subscribe: - verify_response_updates = self.verify_sub_sub_response_updates + verify_response_updates = self.subscribe_and_verify_response kwargs["subscription_mode"] = subscription_mode kwargs["poll_interval"] = poll_interval kwargs["poll_count"] = poll_count @@ -182,7 +182,7 @@ def test_subscribe_stream_on_change_api_state(self, request): "send", ] path_value = [[]] # empty element means no check - path_value.extend(self._changes_list_to_pv(changes_list)) + path_value.extend(self._changes_list_to_pv_del(changes_list)[0]) prefix_str = "" prefix = make_gnmi_path(prefix_str) @@ -208,7 +208,7 @@ def test_subscribe_stream_on_change_api_state(self, request): change_thread.start() try: - self.verify_sub_sub_response_updates(**kwargs) + self.subscribe_and_verify_response(**kwargs) sleep(1) finally: diff --git a/tests/test_client_server_demo.py b/tests/test_client_server_demo.py index f6fa0f5..f98a130 100644 --- a/tests/test_client_server_demo.py +++ b/tests/test_client_server_demo.py @@ -2,6 +2,7 @@ from client_server_test_base import AdapterTests from confd_gnmi_server import AdapterType +from confd_gnmi_demo_adapter import GnmiDemoServerAdapter _confd_DEBUG = 1 @@ -16,3 +17,4 @@ def set_adapter_type(self): def _do_reset_cfg(self): yield + GnmiDemoServerAdapter.reset()