diff --git a/src/confd_gnmi_adapter.py b/src/confd_gnmi_adapter.py index cf3af06..9d54f06 100644 --- a/src/confd_gnmi_adapter.py +++ b/src/confd_gnmi_adapter.py @@ -8,6 +8,7 @@ from typing import List import gnmi_pb2 +from confd_gnmi_common import get_timestamp_ns log = logging.getLogger('confd_gnmi_adapter') @@ -210,7 +211,6 @@ def sample(self, subscriptions): :param: start_monitoring: if True, the paths will be monitored for future changes TODO `delete` is processed and `delete` array is empty - TODO timestamp is 0 :return: SubscribeResponse with sample """ log.debug("==> subscriptions=%s", subscriptions) @@ -220,7 +220,7 @@ def sample(self, subscriptions): update.extend(self.get_sample(path=s.path, prefix=self.subscription_list.prefix, allow_aggregation=self.subscription_list.allow_aggregation)) - notif = gnmi_pb2.Notification(timestamp=0, + notif = gnmi_pb2.Notification(timestamp=get_timestamp_ns(), prefix=self.subscription_list.prefix, update=update, delete=[], @@ -245,7 +245,6 @@ def changes(self): Get subscription responses for changes (subscribed values). `update` arrays contain changes TODO `delete` is processed and `delete` array is empty - TODO timestamp is 0 :return: SubscribeResponse with changes """ log.debug("==>") @@ -257,7 +256,7 @@ def changes(self): def get_subscription_notifications(self): update = self.get_monitored_changes() - notif = gnmi_pb2.Notification(timestamp=0, + notif = gnmi_pb2.Notification(timestamp=get_timestamp_ns(), prefix=self.subscription_list.prefix, update=update, delete=[], diff --git a/src/confd_gnmi_api_adapter.py b/src/confd_gnmi_api_adapter.py index 0851fb1..1640eeb 100644 --- a/src/confd_gnmi_api_adapter.py +++ b/src/confd_gnmi_api_adapter.py @@ -12,7 +12,8 @@ from confd_gnmi_adapter import GnmiServerAdapter from confd_gnmi_api_adapter_defaults import ApiAdapterDefaults from confd_gnmi_common import make_xpath_path, make_formatted_path, \ - add_path_prefix, remove_path_prefix, make_gnmi_path, parse_instance_path + add_path_prefix, remove_path_prefix, make_gnmi_path, parse_instance_path, \ + get_timestamp_ns import tm _tm = __import__(tm.TM) @@ -111,7 +112,7 @@ def __init__(self, adapter, subscription_list): self.subpoint_paths = {} def get_subscription_notifications(self): - return [gnmi_pb2.Notification(timestamp=0, + return [gnmi_pb2.Notification(timestamp=get_timestamp_ns(), prefix=prefix, update=updates, delete=[], @@ -665,7 +666,7 @@ def get(self, prefix, paths, data_type, use_models): updates = [gnmi_pb2.Update(path=remove_path_prefix(update.path, prefix), val=update.val) for u_list in updates2 for update in u_list] - notif = gnmi_pb2.Notification(timestamp=1, prefix=prefix, + notif = gnmi_pb2.Notification(timestamp=get_timestamp_ns(), prefix=prefix, update=updates, delete=[], atomic=True) diff --git a/src/confd_gnmi_client.py b/src/confd_gnmi_client.py index fc65151..ac2f866 100755 --- a/src/confd_gnmi_client.py +++ b/src/confd_gnmi_client.py @@ -18,7 +18,7 @@ from confd_gnmi_common import HOST, PORT, make_xpath_path, VERSION, \ common_optparse_options, common_optparse_process, make_gnmi_path, \ datatype_str_to_int, subscription_mode_str_to_int, \ - encoding_int_to_str, encoding_str_to_int + encoding_int_to_str, encoding_str_to_int, get_time_string from gnmi_pb2_grpc import gNMIStub @@ -145,8 +145,8 @@ def generate_subscriptions(subscription_list, poll_interval=0.0, @staticmethod def print_notification(n): pfx_str = make_xpath_path(gnmi_prefix=n.prefix) - print("timestamp {} prefix {} atomic {}".format(n.timestamp, pfx_str, - n.atomic)) + print("timestamp {} prefix {} atomic {}".format( + get_time_string(n.timestamp), pfx_str, n.atomic)) print("Updates:") for u in n.update: if u.val.json_val: @@ -403,9 +403,9 @@ def parse_args(args): else: response = client.delete(prefix, paths) print("Set - UpdateResult:") - print("timestamp {} prefix {}".format(response.timestamp, - make_xpath_path( - response.prefix))) + print("timestamp {} prefix {}".format( + get_time_string(response.timestamp), + make_xpath_path(response.prefix))) for r in response.response: print("timestamp {} op {} path {}".format(r.timestamp, r.op, diff --git a/src/confd_gnmi_common.py b/src/confd_gnmi_common.py index d36bac5..4f59761 100644 --- a/src/confd_gnmi_common.py +++ b/src/confd_gnmi_common.py @@ -1,4 +1,4 @@ -from enum import Enum +import time import logging from typing import Tuple, Dict, List, Iterable import re @@ -277,3 +277,23 @@ def stream_mode_str_to_int(mode: str, no_error=False) -> int: return _convert_enum_format(gnmi_pb2.SubscriptionMode.Value, mode, f'Unknown streaming mode! ({mode})', no_error, f'UNKNOWN({mode})') + + +def get_timestamp_ns() -> int: + """ + Get the current timestamp in nanoseconds. + Returns: + int: The current timestamp in nanoseconds. + """ + return int(time.time_ns()) + +def get_time_string(time_ns) -> str: + """ + Get the formatted timestamp string. + Args: + time_ns (int): The timestamp in nanoseconds. + Returns: + str: The formatted timestamp string. + """ + utc = time.gmtime(time_ns // 1000000000) + return f"{utc.tm_year}-{utc.tm_mon}-{utc.tm_mday} {utc.tm_hour}:{utc.tm_min}.{utc.tm_sec} +{time_ns % 1000000000}ns UTC" diff --git a/src/confd_gnmi_demo_adapter.py b/src/confd_gnmi_demo_adapter.py index 3f4d1a5..90cf504 100644 --- a/src/confd_gnmi_demo_adapter.py +++ b/src/confd_gnmi_demo_adapter.py @@ -10,7 +10,7 @@ import gnmi_pb2 from confd_gnmi_adapter import GnmiServerAdapter -from confd_gnmi_common import make_xpath_path, make_gnmi_path +from confd_gnmi_common import make_xpath_path, make_gnmi_path, get_timestamp_ns log = logging.getLogger('confd_gnmi_demo_adapter') @@ -434,7 +434,7 @@ def get(self, prefix, paths, data_type, use_models): update = [] for path in paths: update.extend(self.get_updates(path, prefix, data_type)) - notif = gnmi_pb2.Notification(timestamp=1, prefix=prefix, + notif = gnmi_pb2.Notification(timestamp=get_timestamp_ns(), prefix=prefix, update=update, delete=[], atomic=True) diff --git a/src/confd_gnmi_servicer.py b/src/confd_gnmi_servicer.py index 406c749..0c7ed74 100755 --- a/src/confd_gnmi_servicer.py +++ b/src/confd_gnmi_servicer.py @@ -6,7 +6,7 @@ import grpc import gnmi_pb2 -from confd_gnmi_common import PORT +from confd_gnmi_common import PORT, get_timestamp_ns from gnmi_pb2_grpc import gNMIServicer, add_gNMIServicer_to_server log = logging.getLogger('confd_gnmi_servicer') @@ -170,11 +170,12 @@ def Set(self, request, context): ops = adapter.set(request.prefix, request.update) ops += adapter.delete(request.prefix, request.delete) - results = [gnmi_pb2.UpdateResult(timestamp=0, path=path, op=op) + # Note: UpdateResult timestamp is deprecated, setting to -1 + results = [gnmi_pb2.UpdateResult(timestamp=-1, path=path, op=op) for path, op in ops] response = gnmi_pb2.SetResponse(prefix=request.prefix, - response=results, timestamp=0) + response=results, timestamp=get_timestamp_ns()) log.info("<== response=%s", response) return response diff --git a/tests/client_server_test_base.py b/tests/client_server_test_base.py index 6f52d8a..ad6c0fe 100644 --- a/tests/client_server_test_base.py +++ b/tests/client_server_test_base.py @@ -1,7 +1,6 @@ import json import subprocess import threading -import time import xml.etree.cElementTree as ET from time import sleep @@ -11,7 +10,7 @@ import gnmi_pb2 from confd_gnmi_client import ConfDgNMIClient from confd_gnmi_common import make_gnmi_path, datatype_str_to_int, \ - make_formatted_path + make_formatted_path, get_timestamp_ns from confd_gnmi_demo_adapter import GnmiDemoServerAdapter from confd_gnmi_servicer import AdapterType, ConfDgNMIServicer from utils.utils import log, nodeid_to_path @@ -141,18 +140,23 @@ def assert_in_updates(updates, path_vals): GrpcBase.assert_one_in_update(updates, pv) log.debug("<==") + def verify_get_response_updates(self, prefix, paths, path_value, datatype, encoding, assert_fun=None): if assert_fun is None: assert_fun = GrpcBase.assert_updates log.debug("prefix=%s paths=%s pv_list=%s datatype=%s encoding=%s", prefix, paths, path_value, datatype, encoding) + time_before = get_timestamp_ns() get_response = self.client.get(prefix, paths, datatype, encoding) - log.debug("notification=%s", get_response.notification) + time_after = get_timestamp_ns() + log.debug("notification=%s time_before=%i time_after=%i", + get_response.notification, time_before, time_after) for n in get_response.notification: log.debug("n=%s", n) if prefix: assert (n.prefix == prefix) + assert(time_before <= n.timestamp and n.timestamp <= time_after) assert_fun(n.update, path_value) def verify_sub_sub_response_updates(self, prefix, paths, path_value, @@ -201,14 +205,17 @@ def read_subscribe_responses(responses, read_count=-1): prev_response_time_ms = 0 SAMPLE_THRESHOLD = 300 for response in responses: - log.debug("response=%s response_count=%i", response, - response_count) - response_time_ms = time.time_ns()/1000000 + time_after = get_timestamp_ns() + log.debug("response=%s response_count=%i time_after=%i", response, + response_count, time_after) + response_time_ms = time_after/1000000 if response.sync_response: log.debug("sync_response") assert response_count == 1 # sync expected only after first response else: response_count += 1 + assert (time_before <= response.update.timestamp and + response.update.timestamp <= time_after) if prefix: assert (response.update.prefix == prefix) pv_to_check = path_value @@ -243,6 +250,7 @@ def read_subscribe_responses(responses, read_count=-1): sample_interval_ms=sample_interval, allow_aggregation=allow_aggregation) + time_before = get_timestamp_ns() responses = self.client.subscribe(subscription_list, read_fun=read_fun, poll_interval=poll_interval, @@ -573,7 +581,10 @@ def test_set(self, request): prefix = make_gnmi_path("/ietf-interfaces:interfaces") paths = [GrpcBase.mk_gnmi_if_path(self.leaf_paths_str[1], "", if_id)] vals = [gnmi_pb2.TypedValue(json_ietf_val=b"\"iana-if-type:fastEther\"")] + time_before = get_timestamp_ns() response = self.client.set(prefix, list(zip(paths, vals))) + time_after = get_timestamp_ns() + assert (time_before <= response.timestamp and response.timestamp <= time_after) assert (response.prefix == prefix) GrpcBase.assert_set_response(response.response[0], (paths[0], gnmi_pb2.UpdateResult.UPDATE))