diff --git a/src/confd_gnmi_demo_adapter.py b/src/confd_gnmi_demo_adapter.py index 26be7fe..345ee45 100644 --- a/src/confd_gnmi_demo_adapter.py +++ b/src/confd_gnmi_demo_adapter.py @@ -16,6 +16,7 @@ log = logging.getLogger('confd_gnmi_demo_adapter') + # Types for streaming telemetry operations class ChangeOp: @abstractmethod @@ -45,10 +46,10 @@ class ChangeDel(ChangeOp): def __init__(self, deleted_paths=[]): self.deleted_paths = deleted_paths def __repr__(self): - return f'ChangeDel()' + return 'ChangeDel' def __str__(self): - return f'ChangeDel' + return 'ChangeDel' class GnmiDemoServerAdapter(GnmiServerAdapter): NS_INTERFACES = "ietf-interfaces:" @@ -395,7 +396,7 @@ def update_demo_db(self, path, ch_op): del db[db_path] if "gnmi-tools/top-d" in db_path: a_path = db_path.split(']')[0] - ch_op.deleted_paths.append(a_path + ']') + ch_op.deleted_paths.append(a_path + ']') log.debug("<== ret=%s", ret) return ret diff --git a/tests/client_server_test_base.py b/tests/client_server_test_base.py index 6ae36da..96f56d3 100644 --- a/tests/client_server_test_base.py +++ b/tests/client_server_test_base.py @@ -1,3 +1,4 @@ +import functools from abc import abstractmethod import itertools @@ -181,7 +182,6 @@ def assert_in_updates(updates, path_vals, AdapterTests.assert_one_in_update(updates, pv, update_prefix, pv_prefix) log.debug("<==") - def verify_get_response_updates(self, prefix, paths, path_value, datatype, encoding, assert_fun=None): if assert_fun is None: @@ -200,6 +200,94 @@ def verify_get_response_updates(self, prefix, paths, path_value, assert(time_before <= n.timestamp and n.timestamp <= time_after) assert_fun(n.update, path_value, n.prefix, prefix) + def read_subscribe_responses(self, responses, read_count, prefix, poll_count, + assert_fun, sample_interval, path_value, + delete_paths, time_before): + """ + Iterates the subscribe responses and performs various assertions and checks + Params: + :param responses: The subscribe responses. + :param read_count (int): The number of responses to read. + :param prefix: The prefix for the response update. + :param poll_count (int): The number of poll counts. + :param assert_fun (function): The assertion function to check the response update. + :param sample_interval (int): The interval between samples. + :param path_value (list): The list of path values to check. + :param delete_paths (list): The list of delete paths to check. + :param time_before (int): The timestamp before the response update. + + Returns: + None + + Raises: + AssertionError: If any assertion fails. + """ + + def init_idx(vals): + idx = -1 if not vals or any( + not isinstance(v, list) for v in vals) else 0 + return idx + + response_count = 0 + pv_idx = init_idx(path_value) + del_idx = init_idx(delete_paths) + log.debug("pv_idx=%s del_idx=%s", pv_idx, del_idx) + prev_response_time_ms = 0 + SAMPLE_THRESHOLD = 1000 + for response in responses: + time_after = get_timestamp_ns() + log.debug("response=%s response_count=%i time_after=%i", response, + response_count, time_after) + response_time_ms = time_after / 1000000 + if response.sync_response: + log.debug("sync_response") + assert response_count == 1 # sync expected only after first response + else: + response_count += 1 + assert (time_before <= response.update.timestamp and + response.update.timestamp <= time_after) + if prefix: + prefix_str = make_formatted_path(prefix) + prefix_update_str = make_formatted_path(response.update.prefix) + assert (prefix_update_str.startswith(prefix_str) + or prefix_update_str.startswith(prefix_str)) + + pv_to_check = path_value + if pv_idx != -1: + assert pv_idx < len(path_value) + pv_to_check = path_value[pv_idx] + pv_idx += 1 + if len(pv_to_check) > 0: # skip empty arrays + assert_fun(response.update.update, pv_to_check) + + del_to_check = delete_paths + if del_idx != -1: + assert del_idx < len(delete_paths) + del_to_check = delete_paths[del_idx] + del_idx += 1 + if len(del_to_check) > 0: + assert (len(response.update.delete) == len(del_to_check)) + for i, d in enumerate(response.update.delete): + assert (add_path_prefix(d, response.update.prefix) + == add_path_prefix(del_to_check[i], prefix)) # TODO do we need check one_in_delete + + log.debug("response_count=%i pv_idx=%i", response_count, pv_idx) + if sample_interval and response_count > 1: + assert (response_time_ms > (prev_response_time_ms + sample_interval - SAMPLE_THRESHOLD)) and ( + response_time_ms < (prev_response_time_ms + sample_interval + SAMPLE_THRESHOLD)) + + if read_count > 0: + read_count -= 1 + if read_count == 0: + log.info("read count reached") + break + prev_response_time_ms = response_time_ms + log.debug("Getting next response. read_count=%s response_count=%s", + read_count, response_count) + assert read_count == -1 or read_count == 0 + if poll_count: + assert poll_count + 1 == response_count + def subscribe_and_verify_response( self, prefix, paths, path_value, delete_paths=[], assert_fun=None, @@ -209,7 +297,7 @@ def subscribe_and_verify_response( encoding=gnmi_pb2.Encoding.JSON_IETF, allow_aggregation=True ): - ''' + """ Invoke subscription and verify received updates :param prefix: gNMI prefix for the subscription :param paths: gNMI paths for the subscription @@ -227,84 +315,18 @@ def subscribe_and_verify_response( :param sample_interval: interval for sample in ms (for gnmi_pb2.SubscriptionList.STREAM only) :param encoding: encoding to use (implemented only JSON_IETF) :param allow_aggregation: allow aggregation of results in updates - ''' + """ if assert_fun is None: assert_fun = AdapterTests.assert_updates stream_mode = gnmi_pb2.SubscriptionMode.ON_CHANGE if sample_interval is not None: - assert subscription_mode == gnmi_pb2.SubscriptionList.STREAM - stream_mode = gnmi_pb2.SubscriptionMode.SAMPLE + assert subscription_mode == gnmi_pb2.SubscriptionList.STREAM + stream_mode = gnmi_pb2.SubscriptionMode.SAMPLE log.debug("paths=%s path_value=%s delete_paths=%s", paths, path_value, delete_paths) - response_count = 0 - def init_idx(vals): - idx = -1 if not vals or any( - not isinstance(v, list) for v in vals) else 0 - return idx - pv_idx = init_idx(path_value) - del_idx = init_idx(delete_paths) - log.debug("pv_idx=%s del_idx=%s", pv_idx, del_idx) - - def read_subscribe_responses(responses, read_count=-1): - nonlocal response_count, pv_idx, del_idx - prev_response_time_ms = 0 - SAMPLE_THRESHOLD = 300 - for response in responses: - time_after = get_timestamp_ns() - log.debug("response=%s response_count=%i time_after=%i", response, - response_count, time_after) - response_time_ms = time_after / 1000000 - if response.sync_response: - log.debug("sync_response") - assert response_count == 1 # sync expected only after first response - else: - response_count += 1 - assert (time_before <= response.update.timestamp and - response.update.timestamp <= time_after) - if prefix: - prefix_str = make_formatted_path(prefix) - prefix_update_str = make_formatted_path(response.update.prefix) - assert (prefix_update_str.startswith(prefix_str) - or prefix_update_str.startswith(prefix_str)) - - pv_to_check = path_value - if pv_idx != -1: - assert pv_idx < len(path_value) - pv_to_check = path_value[pv_idx] - pv_idx += 1 - if len(pv_to_check) > 0: # skip empty arrays - assert_fun(response.update.update, pv_to_check) - - del_to_check = delete_paths - if del_idx != -1: - assert del_idx < len(delete_paths) - del_to_check = delete_paths[del_idx] - del_idx += 1 - if len(del_to_check) > 0: - assert (len(response.update.delete) == len(del_to_check)) - for i, d in enumerate(response.update.delete): - assert (add_path_prefix(d, response.update.prefix) - == add_path_prefix(del_to_check[i], prefix)) # TODO do we need check one_in_delete - - log.debug("response_count=%i pv_idx=%i", response_count, pv_idx) - if sample_interval and response_count > 1: - assert (response_time_ms > (prev_response_time_ms + sample_interval - SAMPLE_THRESHOLD)) and ( - response_time_ms < (prev_response_time_ms + sample_interval + SAMPLE_THRESHOLD)) - - if read_count > 0: - read_count -= 1 - if read_count == 0: - log.info("read count reached") - break - prev_response_time_ms = response_time_ms - log.debug("Getting next response. read_count=%s response_count=%s", - read_count, response_count) - assert read_count == -1 or read_count == 0 - - read_fun = read_subscribe_responses subscription_list = ConfDgNMIClient.make_subscription_list( prefix, paths, @@ -315,20 +337,21 @@ def read_subscribe_responses(responses, read_count=-1): allow_aggregation=allow_aggregation ) - time_before = get_timestamp_ns() responses = self.client.subscribe( subscription_list, - read_fun=read_fun, + read_fun=functools.partial(self.read_subscribe_responses, + prefix=prefix, poll_count=poll_count, + assert_fun=assert_fun, + sample_interval=sample_interval, + path_value=path_value, + delete_paths=delete_paths, + time_before = get_timestamp_ns()), poll_interval=poll_interval, poll_count=poll_count, read_count=read_count ) log.debug("responses=%s", responses) - if poll_count: - assert poll_count + 1 == response_count - - def _test_get_subscribe(self, is_subscribe=False, datatype=gnmi_pb2.GetRequest.DataType.CONFIG,