Skip to content

Commit

Permalink
review comments 3
Browse files Browse the repository at this point in the history
Signed-off-by: Michal Novak <[email protected]>
  • Loading branch information
micnovak committed Jan 10, 2024
1 parent 254f6e9 commit 354fde9
Show file tree
Hide file tree
Showing 2 changed files with 104 additions and 80 deletions.
7 changes: 4 additions & 3 deletions src/confd_gnmi_demo_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

log = logging.getLogger('confd_gnmi_demo_adapter')


# Types for streaming telemetry operations
class ChangeOp:
@abstractmethod
Expand Down Expand Up @@ -45,10 +46,10 @@ class ChangeDel(ChangeOp):
def __init__(self, deleted_paths=[]):
self.deleted_paths = deleted_paths
def __repr__(self):
return f'ChangeDel()'
return 'ChangeDel'

def __str__(self):
return f'ChangeDel'
return 'ChangeDel'

class GnmiDemoServerAdapter(GnmiServerAdapter):
NS_INTERFACES = "ietf-interfaces:"
Expand Down Expand Up @@ -395,7 +396,7 @@ def update_demo_db(self, path, ch_op):
del db[db_path]
if "gnmi-tools/top-d" in db_path:
a_path = db_path.split(']')[0]
ch_op.deleted_paths.append(a_path + ']')
ch_op.deleted_paths.append(a_path + ']')
log.debug("<== ret=%s", ret)
return ret

Expand Down
177 changes: 100 additions & 77 deletions tests/client_server_test_base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import functools
from abc import abstractmethod

import itertools
Expand Down Expand Up @@ -181,7 +182,6 @@ def assert_in_updates(updates, path_vals,
AdapterTests.assert_one_in_update(updates, pv, update_prefix, pv_prefix)
log.debug("<==")


def verify_get_response_updates(self, prefix, paths, path_value,
datatype, encoding, assert_fun=None):
if assert_fun is None:
Expand All @@ -200,6 +200,94 @@ def verify_get_response_updates(self, prefix, paths, path_value,
assert(time_before <= n.timestamp and n.timestamp <= time_after)
assert_fun(n.update, path_value, n.prefix, prefix)

def read_subscribe_responses(self, responses, read_count, prefix, poll_count,
assert_fun, sample_interval, path_value,
delete_paths, time_before):
"""
Iterates the subscribe responses and performs various assertions and checks
Params:
:param responses: The subscribe responses.
:param read_count (int): The number of responses to read.
:param prefix: The prefix for the response update.
:param poll_count (int): The number of poll counts.
:param assert_fun (function): The assertion function to check the response update.
:param sample_interval (int): The interval between samples.
:param path_value (list): The list of path values to check.
:param delete_paths (list): The list of delete paths to check.
:param time_before (int): The timestamp before the response update.
Returns:
None
Raises:
AssertionError: If any assertion fails.
"""

def init_idx(vals):
idx = -1 if not vals or any(
not isinstance(v, list) for v in vals) else 0
return idx

response_count = 0
pv_idx = init_idx(path_value)
del_idx = init_idx(delete_paths)
log.debug("pv_idx=%s del_idx=%s", pv_idx, del_idx)
prev_response_time_ms = 0
SAMPLE_THRESHOLD = 1000
for response in responses:
time_after = get_timestamp_ns()
log.debug("response=%s response_count=%i time_after=%i", response,
response_count, time_after)
response_time_ms = time_after / 1000000
if response.sync_response:
log.debug("sync_response")
assert response_count == 1 # sync expected only after first response
else:
response_count += 1
assert (time_before <= response.update.timestamp and
response.update.timestamp <= time_after)
if prefix:
prefix_str = make_formatted_path(prefix)
prefix_update_str = make_formatted_path(response.update.prefix)
assert (prefix_update_str.startswith(prefix_str)
or prefix_update_str.startswith(prefix_str))

pv_to_check = path_value
if pv_idx != -1:
assert pv_idx < len(path_value)
pv_to_check = path_value[pv_idx]
pv_idx += 1
if len(pv_to_check) > 0: # skip empty arrays
assert_fun(response.update.update, pv_to_check)

del_to_check = delete_paths
if del_idx != -1:
assert del_idx < len(delete_paths)
del_to_check = delete_paths[del_idx]
del_idx += 1
if len(del_to_check) > 0:
assert (len(response.update.delete) == len(del_to_check))
for i, d in enumerate(response.update.delete):
assert (add_path_prefix(d, response.update.prefix)
== add_path_prefix(del_to_check[i], prefix)) # TODO do we need check one_in_delete

log.debug("response_count=%i pv_idx=%i", response_count, pv_idx)
if sample_interval and response_count > 1:
assert (response_time_ms > (prev_response_time_ms + sample_interval - SAMPLE_THRESHOLD)) and (
response_time_ms < (prev_response_time_ms + sample_interval + SAMPLE_THRESHOLD))

if read_count > 0:
read_count -= 1
if read_count == 0:
log.info("read count reached")
break
prev_response_time_ms = response_time_ms
log.debug("Getting next response. read_count=%s response_count=%s",
read_count, response_count)
assert read_count == -1 or read_count == 0
if poll_count:
assert poll_count + 1 == response_count

def subscribe_and_verify_response(
self, prefix, paths, path_value, delete_paths=[],
assert_fun=None,
Expand All @@ -209,7 +297,7 @@ def subscribe_and_verify_response(
encoding=gnmi_pb2.Encoding.JSON_IETF,
allow_aggregation=True
):
'''
"""
Invoke subscription and verify received updates
:param prefix: gNMI prefix for the subscription
:param paths: gNMI paths for the subscription
Expand All @@ -227,84 +315,18 @@ def subscribe_and_verify_response(
:param sample_interval: interval for sample in ms (for gnmi_pb2.SubscriptionList.STREAM only)
:param encoding: encoding to use (implemented only JSON_IETF)
:param allow_aggregation: allow aggregation of results in updates
'''
"""
if assert_fun is None:
assert_fun = AdapterTests.assert_updates

stream_mode = gnmi_pb2.SubscriptionMode.ON_CHANGE
if sample_interval is not None:
assert subscription_mode == gnmi_pb2.SubscriptionList.STREAM
stream_mode = gnmi_pb2.SubscriptionMode.SAMPLE
assert subscription_mode == gnmi_pb2.SubscriptionList.STREAM
stream_mode = gnmi_pb2.SubscriptionMode.SAMPLE

log.debug("paths=%s path_value=%s delete_paths=%s",
paths, path_value, delete_paths)
response_count = 0

def init_idx(vals):
idx = -1 if not vals or any(
not isinstance(v, list) for v in vals) else 0
return idx
pv_idx = init_idx(path_value)
del_idx = init_idx(delete_paths)
log.debug("pv_idx=%s del_idx=%s", pv_idx, del_idx)

def read_subscribe_responses(responses, read_count=-1):
nonlocal response_count, pv_idx, del_idx
prev_response_time_ms = 0
SAMPLE_THRESHOLD = 300
for response in responses:
time_after = get_timestamp_ns()
log.debug("response=%s response_count=%i time_after=%i", response,
response_count, time_after)
response_time_ms = time_after / 1000000
if response.sync_response:
log.debug("sync_response")
assert response_count == 1 # sync expected only after first response
else:
response_count += 1
assert (time_before <= response.update.timestamp and
response.update.timestamp <= time_after)
if prefix:
prefix_str = make_formatted_path(prefix)
prefix_update_str = make_formatted_path(response.update.prefix)
assert (prefix_update_str.startswith(prefix_str)
or prefix_update_str.startswith(prefix_str))

pv_to_check = path_value
if pv_idx != -1:
assert pv_idx < len(path_value)
pv_to_check = path_value[pv_idx]
pv_idx += 1
if len(pv_to_check) > 0: # skip empty arrays
assert_fun(response.update.update, pv_to_check)

del_to_check = delete_paths
if del_idx != -1:
assert del_idx < len(delete_paths)
del_to_check = delete_paths[del_idx]
del_idx += 1
if len(del_to_check) > 0:
assert (len(response.update.delete) == len(del_to_check))
for i, d in enumerate(response.update.delete):
assert (add_path_prefix(d, response.update.prefix)
== add_path_prefix(del_to_check[i], prefix)) # TODO do we need check one_in_delete

log.debug("response_count=%i pv_idx=%i", response_count, pv_idx)
if sample_interval and response_count > 1:
assert (response_time_ms > (prev_response_time_ms + sample_interval - SAMPLE_THRESHOLD)) and (
response_time_ms < (prev_response_time_ms + sample_interval + SAMPLE_THRESHOLD))

if read_count > 0:
read_count -= 1
if read_count == 0:
log.info("read count reached")
break
prev_response_time_ms = response_time_ms
log.debug("Getting next response. read_count=%s response_count=%s",
read_count, response_count)
assert read_count == -1 or read_count == 0

read_fun = read_subscribe_responses
subscription_list = ConfDgNMIClient.make_subscription_list(
prefix,
paths,
Expand All @@ -315,20 +337,21 @@ def read_subscribe_responses(responses, read_count=-1):
allow_aggregation=allow_aggregation
)

time_before = get_timestamp_ns()
responses = self.client.subscribe(
subscription_list,
read_fun=read_fun,
read_fun=functools.partial(self.read_subscribe_responses,
prefix=prefix, poll_count=poll_count,
assert_fun=assert_fun,
sample_interval=sample_interval,
path_value=path_value,
delete_paths=delete_paths,
time_before = get_timestamp_ns()),
poll_interval=poll_interval,
poll_count=poll_count,
read_count=read_count
)

log.debug("responses=%s", responses)
if poll_count:
assert poll_count + 1 == response_count



def _test_get_subscribe(self, is_subscribe=False,
datatype=gnmi_pb2.GetRequest.DataType.CONFIG,
Expand Down

0 comments on commit 354fde9

Please sign in to comment.