Skip to content

Commit

Permalink
Merge pull request #21 from ConfD-Developer/timestamp
Browse files Browse the repository at this point in the history
implementation of timestamp support + tests
  • Loading branch information
micnovak authored Oct 17, 2023
2 parents fbfee31 + e5eca18 commit 143ede2
Show file tree
Hide file tree
Showing 7 changed files with 57 additions and 25 deletions.
7 changes: 3 additions & 4 deletions src/confd_gnmi_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from typing import List

import gnmi_pb2
from confd_gnmi_common import get_timestamp_ns

log = logging.getLogger('confd_gnmi_adapter')

Expand Down Expand Up @@ -210,7 +211,6 @@ def sample(self, subscriptions):
:param: start_monitoring: if True, the paths will be monitored
for future changes
TODO `delete` is processed and `delete` array is empty
TODO timestamp is 0
:return: SubscribeResponse with sample
"""
log.debug("==> subscriptions=%s", subscriptions)
Expand All @@ -220,7 +220,7 @@ def sample(self, subscriptions):
update.extend(self.get_sample(path=s.path,
prefix=self.subscription_list.prefix,
allow_aggregation=self.subscription_list.allow_aggregation))
notif = gnmi_pb2.Notification(timestamp=0,
notif = gnmi_pb2.Notification(timestamp=get_timestamp_ns(),
prefix=self.subscription_list.prefix,
update=update,
delete=[],
Expand All @@ -245,7 +245,6 @@ def changes(self):
Get subscription responses for changes (subscribed values).
`update` arrays contain changes
TODO `delete` is processed and `delete` array is empty
TODO timestamp is 0
:return: SubscribeResponse with changes
"""
log.debug("==>")
Expand All @@ -257,7 +256,7 @@ def changes(self):

def get_subscription_notifications(self):
update = self.get_monitored_changes()
notif = gnmi_pb2.Notification(timestamp=0,
notif = gnmi_pb2.Notification(timestamp=get_timestamp_ns(),
prefix=self.subscription_list.prefix,
update=update,
delete=[],
Expand Down
7 changes: 4 additions & 3 deletions src/confd_gnmi_api_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
from confd_gnmi_adapter import GnmiServerAdapter
from confd_gnmi_api_adapter_defaults import ApiAdapterDefaults
from confd_gnmi_common import make_xpath_path, make_formatted_path, \
add_path_prefix, remove_path_prefix, make_gnmi_path, parse_instance_path
add_path_prefix, remove_path_prefix, make_gnmi_path, parse_instance_path, \
get_timestamp_ns

import tm
_tm = __import__(tm.TM)
Expand Down Expand Up @@ -111,7 +112,7 @@ def __init__(self, adapter, subscription_list):
self.subpoint_paths = {}

def get_subscription_notifications(self):
return [gnmi_pb2.Notification(timestamp=0,
return [gnmi_pb2.Notification(timestamp=get_timestamp_ns(),
prefix=prefix,
update=updates,
delete=[],
Expand Down Expand Up @@ -665,7 +666,7 @@ def get(self, prefix, paths, data_type, use_models):
updates = [gnmi_pb2.Update(path=remove_path_prefix(update.path, prefix), val=update.val)
for u_list in updates2
for update in u_list]
notif = gnmi_pb2.Notification(timestamp=1, prefix=prefix,
notif = gnmi_pb2.Notification(timestamp=get_timestamp_ns(), prefix=prefix,
update=updates,
delete=[],
atomic=True)
Expand Down
12 changes: 6 additions & 6 deletions src/confd_gnmi_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
from confd_gnmi_common import HOST, PORT, make_xpath_path, VERSION, \
common_optparse_options, common_optparse_process, make_gnmi_path, \
datatype_str_to_int, subscription_mode_str_to_int, \
encoding_int_to_str, encoding_str_to_int
encoding_int_to_str, encoding_str_to_int, get_time_string

from gnmi_pb2_grpc import gNMIStub

Expand Down Expand Up @@ -145,8 +145,8 @@ def generate_subscriptions(subscription_list, poll_interval=0.0,
@staticmethod
def print_notification(n):
pfx_str = make_xpath_path(gnmi_prefix=n.prefix)
print("timestamp {} prefix {} atomic {}".format(n.timestamp, pfx_str,
n.atomic))
print("timestamp {} prefix {} atomic {}".format(
get_time_string(n.timestamp), pfx_str, n.atomic))
print("Updates:")
for u in n.update:
if u.val.json_val:
Expand Down Expand Up @@ -403,9 +403,9 @@ def parse_args(args):
else:
response = client.delete(prefix, paths)
print("Set - UpdateResult:")
print("timestamp {} prefix {}".format(response.timestamp,
make_xpath_path(
response.prefix)))
print("timestamp {} prefix {}".format(
get_time_string(response.timestamp),
make_xpath_path(response.prefix)))
for r in response.response:
print("timestamp {} op {} path {}".format(r.timestamp,
r.op,
Expand Down
22 changes: 21 additions & 1 deletion src/confd_gnmi_common.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from enum import Enum
import time
import logging
from typing import Tuple, Dict, List, Iterable
import re
Expand Down Expand Up @@ -277,3 +277,23 @@ def stream_mode_str_to_int(mode: str, no_error=False) -> int:
return _convert_enum_format(gnmi_pb2.SubscriptionMode.Value, mode,
f'Unknown streaming mode! ({mode})',
no_error, f'UNKNOWN({mode})')


def get_timestamp_ns() -> int:
"""
Get the current timestamp in nanoseconds.
Returns:
int: The current timestamp in nanoseconds.
"""
return int(time.time_ns())

def get_time_string(time_ns) -> str:
"""
Get the formatted timestamp string.
Args:
time_ns (int): The timestamp in nanoseconds.
Returns:
str: The formatted timestamp string.
"""
utc = time.gmtime(time_ns // 1000000000)
return f"{utc.tm_year}-{utc.tm_mon}-{utc.tm_mday} {utc.tm_hour}:{utc.tm_min}.{utc.tm_sec} +{time_ns % 1000000000}ns UTC"
4 changes: 2 additions & 2 deletions src/confd_gnmi_demo_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import gnmi_pb2
from confd_gnmi_adapter import GnmiServerAdapter
from confd_gnmi_common import make_xpath_path, make_gnmi_path
from confd_gnmi_common import make_xpath_path, make_gnmi_path, get_timestamp_ns

log = logging.getLogger('confd_gnmi_demo_adapter')

Expand Down Expand Up @@ -434,7 +434,7 @@ def get(self, prefix, paths, data_type, use_models):
update = []
for path in paths:
update.extend(self.get_updates(path, prefix, data_type))
notif = gnmi_pb2.Notification(timestamp=1, prefix=prefix,
notif = gnmi_pb2.Notification(timestamp=get_timestamp_ns(), prefix=prefix,
update=update,
delete=[],
atomic=True)
Expand Down
7 changes: 4 additions & 3 deletions src/confd_gnmi_servicer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import grpc

import gnmi_pb2
from confd_gnmi_common import PORT
from confd_gnmi_common import PORT, get_timestamp_ns
from gnmi_pb2_grpc import gNMIServicer, add_gNMIServicer_to_server

log = logging.getLogger('confd_gnmi_servicer')
Expand Down Expand Up @@ -170,11 +170,12 @@ def Set(self, request, context):
ops = adapter.set(request.prefix, request.update)
ops += adapter.delete(request.prefix, request.delete)

results = [gnmi_pb2.UpdateResult(timestamp=0, path=path, op=op)
# Note: UpdateResult timestamp is deprecated, setting to -1
results = [gnmi_pb2.UpdateResult(timestamp=-1, path=path, op=op)
for path, op in ops]

response = gnmi_pb2.SetResponse(prefix=request.prefix,
response=results, timestamp=0)
response=results, timestamp=get_timestamp_ns())

log.info("<== response=%s", response)
return response
Expand Down
23 changes: 17 additions & 6 deletions tests/client_server_test_base.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import json
import subprocess
import threading
import time
import xml.etree.cElementTree as ET
from time import sleep

Expand All @@ -11,7 +10,7 @@
import gnmi_pb2
from confd_gnmi_client import ConfDgNMIClient
from confd_gnmi_common import make_gnmi_path, datatype_str_to_int, \
make_formatted_path
make_formatted_path, get_timestamp_ns
from confd_gnmi_demo_adapter import GnmiDemoServerAdapter
from confd_gnmi_servicer import AdapterType, ConfDgNMIServicer
from utils.utils import log, nodeid_to_path
Expand Down Expand Up @@ -141,18 +140,23 @@ def assert_in_updates(updates, path_vals):
GrpcBase.assert_one_in_update(updates, pv)
log.debug("<==")


def verify_get_response_updates(self, prefix, paths, path_value,
datatype, encoding, assert_fun=None):
if assert_fun is None:
assert_fun = GrpcBase.assert_updates
log.debug("prefix=%s paths=%s pv_list=%s datatype=%s encoding=%s",
prefix, paths, path_value, datatype, encoding)
time_before = get_timestamp_ns()
get_response = self.client.get(prefix, paths, datatype, encoding)
log.debug("notification=%s", get_response.notification)
time_after = get_timestamp_ns()
log.debug("notification=%s time_before=%i time_after=%i",
get_response.notification, time_before, time_after)
for n in get_response.notification:
log.debug("n=%s", n)
if prefix:
assert (n.prefix == prefix)
assert(time_before <= n.timestamp and n.timestamp <= time_after)
assert_fun(n.update, path_value)

def verify_sub_sub_response_updates(self, prefix, paths, path_value,
Expand Down Expand Up @@ -201,14 +205,17 @@ def read_subscribe_responses(responses, read_count=-1):
prev_response_time_ms = 0
SAMPLE_THRESHOLD = 300
for response in responses:
log.debug("response=%s response_count=%i", response,
response_count)
response_time_ms = time.time_ns()/1000000
time_after = get_timestamp_ns()
log.debug("response=%s response_count=%i time_after=%i", response,
response_count, time_after)
response_time_ms = time_after/1000000
if response.sync_response:
log.debug("sync_response")
assert response_count == 1 # sync expected only after first response
else:
response_count += 1
assert (time_before <= response.update.timestamp and
response.update.timestamp <= time_after)
if prefix:
assert (response.update.prefix == prefix)
pv_to_check = path_value
Expand Down Expand Up @@ -243,6 +250,7 @@ def read_subscribe_responses(responses, read_count=-1):
sample_interval_ms=sample_interval,
allow_aggregation=allow_aggregation)

time_before = get_timestamp_ns()
responses = self.client.subscribe(subscription_list,
read_fun=read_fun,
poll_interval=poll_interval,
Expand Down Expand Up @@ -573,7 +581,10 @@ def test_set(self, request):
prefix = make_gnmi_path("/ietf-interfaces:interfaces")
paths = [GrpcBase.mk_gnmi_if_path(self.leaf_paths_str[1], "", if_id)]
vals = [gnmi_pb2.TypedValue(json_ietf_val=b"\"iana-if-type:fastEther\"")]
time_before = get_timestamp_ns()
response = self.client.set(prefix, list(zip(paths, vals)))
time_after = get_timestamp_ns()
assert (time_before <= response.timestamp and response.timestamp <= time_after)
assert (response.prefix == prefix)
GrpcBase.assert_set_response(response.response[0],
(paths[0], gnmi_pb2.UpdateResult.UPDATE))
Expand Down

0 comments on commit 143ede2

Please sign in to comment.