From dfd500c0182457ce4c51726f043e0370b6bac2e5 Mon Sep 17 00:00:00 2001 From: Martin Volf Date: Tue, 12 Dec 2023 16:42:28 +0100 Subject: [PATCH] transactions wrap whole SetRequest processing fixes #28 Signed-off-by: Martin Volf --- src/confd_gnmi_adapter.py | 47 +++++++++++++++----------- src/confd_gnmi_api_adapter.py | 56 ++++++++++++++++--------------- src/confd_gnmi_client.py | 13 +++---- src/confd_gnmi_demo_adapter.py | 36 +++++++++++++------- src/confd_gnmi_servicer.py | 5 ++- tests/test_client_server_confd.py | 21 ++++++++++++ 6 files changed, 109 insertions(+), 69 deletions(-) diff --git a/src/confd_gnmi_adapter.py b/src/confd_gnmi_adapter.py index 9d54f06..fd1b787 100644 --- a/src/confd_gnmi_adapter.py +++ b/src/confd_gnmi_adapter.py @@ -5,7 +5,7 @@ from dataclasses import dataclass from enum import Enum from queue import Queue, Empty -from typing import List +from typing import List, ContextManager import gnmi_pb2 from confd_gnmi_common import get_timestamp_ns @@ -13,6 +13,27 @@ log = logging.getLogger('confd_gnmi_adapter') +class UpdateTransaction(ABC): + @abstractmethod + def update(self, path, value): + """ + Apply given update. + :param path: gNMI path + :param value: TypedValue to be applied to the path + :return: gNMI UpdateResult operation + """ + pass + + @abstractmethod + def delete(self, path): + """ + Delete value(s) for given single path. + :param path: gNMI path to delete + :return: gNMI UpdateResult operation + """ + pass + + class GnmiServerAdapter(ABC): @dataclass class CapabilityModel: @@ -56,27 +77,13 @@ def get(self, prefix, paths, data_type, use_models): pass @abstractmethod - def set(self, prefix, updates): - """ - Apply given updates. - :param prefix: gNMI path prefix - :param updates: gNMI updates (with path and val) to be set - :return: gNMI UpdateResult operation - """ - pass - - @abstractmethod - def delete(self, prefix, paths): + def update_transaction(self, prefix) -> ContextManager[UpdateTransaction]: """ - Delete value(s) for given path - TODO this is simple version for initial implementation - To reflect fully gNMI Set, - we should pass all delete, replace and update lists - :param prefix: gNMI path prefix - :param paths: list of gNMI paths to delete - :return: gNMI UpdateResult operation + A transaction context manager. Start a transaction and + yield a transaction handler to be used for subsequent calls + to update methods. """ - pass + ... class SubscriptionHandler(ABC): diff --git a/src/confd_gnmi_api_adapter.py b/src/confd_gnmi_api_adapter.py index 41b2646..373e4c6 100644 --- a/src/confd_gnmi_api_adapter.py +++ b/src/confd_gnmi_api_adapter.py @@ -1,3 +1,4 @@ +from contextlib import contextmanager import logging import functools import itertools @@ -6,11 +7,12 @@ import select import sys import threading +import typing as t import json from enum import Enum from socket import socket import gnmi_pb2 -from confd_gnmi_adapter import GnmiServerAdapter +from confd_gnmi_adapter import GnmiServerAdapter, UpdateTransaction from confd_gnmi_api_adapter_defaults import ApiAdapterDefaults from confd_gnmi_common import make_xpath_path, make_formatted_path, \ add_path_prefix, remove_path_prefix, make_gnmi_path, parse_instance_path, \ @@ -692,36 +694,20 @@ def get(self, prefix, paths, data_type, use_models): log.info("<== notifications=%s", notifications) return notifications - def set(self, prefix, updates): - log.info("==> prefix=%s, updates=%s", prefix, updates) + @contextmanager + def update_transaction(self, prefix) -> t.Iterator["ApiTransaction"]: + log.info("==> prefix=%s", prefix) context = "netconf" groups = [self.username] with maapi.single_write_trans(self.username, context, groups, - ip=self.addr, port=self.port) as t: - updater = Updater(self, t, prefix) - ops = [(up.path, updater.update(up.path, up.val)) for up in updates] - t.apply() - - log.info("==> ops=%s", ops) - return ops - - def delete(self, prefix, paths): - log.info("==> prefix=%s, paths=%s", prefix, paths) - context = "netconf" - groups = [self.username] - with maapi.single_write_trans(self.username, context, groups, ip=self.addr, - port=self.port) as t: - ops = [] - for path in paths: - t.delete(self.fix_path_prefixes(make_formatted_path(path, prefix))) - ops.append((path, gnmi_pb2.UpdateResult.DELETE)) - t.apply() - - log.info("==> ops=%s", ops) - return ops + ip=self.addr, port=self.port) as trans: + at = ApiTransaction(self, trans, prefix) + yield at + trans.apply() + log.info("<==") -class Updater: +class ApiTransaction(UpdateTransaction): """Update message handler. One instance is suppposed to handle all Update messages in one SetRequest. """ @@ -730,7 +716,7 @@ def __init__(self, adapter, trans, prefix): self.trans = trans self.prefix = prefix - def update(self, path, value): + def apply_update(self, path, value): if value.HasField('json_ietf_val'): obj = json.loads(value.json_ietf_val) elif value.HasField('json_val'): @@ -754,3 +740,19 @@ def build_obj(self, obj, elem): assert isinstance(obj, dict), 'cannot apply keys to a non-container' obj.update(elem.key) return {elem.name: obj} + + def update(self, updates): + log.debug("==> updates=%s", updates) + ops = [(up.path, self.apply_update(up.path, up.val)) for up in updates] + log.debug("<== ops=%s", ops) + return ops + + def delete(self, paths): + def do_delete(path): + gpath = self.adapter.fix_path_prefixes(make_formatted_path(path, self.prefix)) + self.trans.delete(gpath) + return (path, gnmi_pb2.UpdateResult.DELETE) + log.debug("==> paths=%s", paths) + ops = [do_delete(path) for path in paths] + log.debug("<== ops=%s", ops) + return ops diff --git a/src/confd_gnmi_client.py b/src/confd_gnmi_client.py index a17d768..aea6f16 100755 --- a/src/confd_gnmi_client.py +++ b/src/confd_gnmi_client.py @@ -239,20 +239,21 @@ def set(self, prefix, path_vals): for pv in path_vals: up = gnmi_pb2.Update(path=pv[0], val=pv[1]) update.append(up) - request = gnmi_pb2.SetRequest(prefix=prefix, update=update) - response = logged_rpc_call("Set", request, - lambda: self.stub.Set(request, metadata=self.metadata)) + response = self.set_request(prefix, update=update) log.info("<== response=%s", response) return response def delete(self, prefix, paths): log.info("==> prefix=%s paths=%s", prefix, paths) - request = gnmi_pb2.SetRequest(prefix=prefix, delete=paths) - response = logged_rpc_call("(delete) Set", request, - lambda: self.stub.Set(request, metadata=self.metadata)) + response = self.set_request(prefix, delete=paths) log.info("<== response=%s", response) return response + def set_request(self, prefix, delete=[], update=[]): + request = gnmi_pb2.SetRequest(prefix=prefix, delete=delete, update=update) + return logged_rpc_call("Set", request, + lambda: self.stub.Set(request, metadata=self.metadata)) + def parse_args(args): log.debug("==> args=%s", args) diff --git a/src/confd_gnmi_demo_adapter.py b/src/confd_gnmi_demo_adapter.py index 4dbe588..c4b22e6 100644 --- a/src/confd_gnmi_demo_adapter.py +++ b/src/confd_gnmi_demo_adapter.py @@ -1,3 +1,4 @@ +from contextlib import contextmanager import json import logging import random @@ -9,7 +10,7 @@ from random import randint import gnmi_pb2 -from confd_gnmi_adapter import GnmiServerAdapter +from confd_gnmi_adapter import GnmiServerAdapter, UpdateTransaction from confd_gnmi_common import make_xpath_path, make_gnmi_path, get_timestamp_ns log = logging.getLogger('confd_gnmi_demo_adapter') @@ -445,11 +446,20 @@ def get(self, prefix, paths, data_type, use_models): log.debug("<== notifications=%s", notifications) return notifications - def set_update(self, prefix, path, val): - log.info("==> prefix=%s, path=%s, val=%s", prefix, path, val) - path_str = make_xpath_path(path, prefix) + @contextmanager + def update_transaction(self, prefix): + yield DemoTransaction(prefix) + + +class DemoTransaction(UpdateTransaction): + def __init__(self, prefix): + self.prefix = prefix + + def set_update(self, path, val): + log.info("==> path=%s, val=%s", path, val) + path_str = make_xpath_path(path, self.prefix) op = gnmi_pb2.UpdateResult.INVALID - if self._nsless_xpath(path_str) in self.demo_db: + if GnmiDemoServerAdapter._nsless_xpath(path_str) in GnmiDemoServerAdapter.demo_db: if val.string_val: str_val = val.string_val elif val.json_ietf_val: @@ -459,24 +469,24 @@ def set_update(self, prefix, path, val): else: # TODO str_val = "{}".format(val) - str_val = str_val.replace(self.NS_IANA, "") - with self.db_lock: - self.demo_db[path_str] = str_val + str_val = str_val.replace(GnmiDemoServerAdapter.NS_IANA, "") + with GnmiDemoServerAdapter.db_lock: + GnmiDemoServerAdapter.demo_db[path_str] = str_val op = gnmi_pb2.UpdateResult.UPDATE log.info("==> op=%s", op) return op - def set(self, prefix, updates): - log.info("==> prefix=%s, updates=%s", prefix, updates) - ops = [(up.path, self.set_update(prefix, up.path, up.val)) + def update(self, updates): + log.info("==> updates=%s", updates) + ops = [(up.path, self.set_update(up.path, up.val)) for up in updates] log.info("==> ops=%s", ops) return ops - def delete(self, prefix, paths): - log.info("==> prefix=%s, paths=%s", prefix, paths) + def delete(self, paths): + log.info("==> paths=%s", paths) ops = [] # TODO log.info("==> ops=%s", ops) diff --git a/src/confd_gnmi_servicer.py b/src/confd_gnmi_servicer.py index 0c7ed74..cb5c347 100755 --- a/src/confd_gnmi_servicer.py +++ b/src/confd_gnmi_servicer.py @@ -166,9 +166,8 @@ def Set(self, request, context): adapter = self.get_connected_adapter(context) self.verify_updates_encoding_supported(request.update, adapter, context) # TODO for now we do not process replace list - # TODO: changes should be part of one transaction (gNMI spec. 3.4.3) - ops = adapter.set(request.prefix, request.update) - ops += adapter.delete(request.prefix, request.delete) + with adapter.update_transaction(request.prefix) as trans: + ops = trans.delete(request.delete) + trans.update(request.update) # Note: UpdateResult timestamp is deprecated, setting to -1 results = [gnmi_pb2.UpdateResult(timestamp=-1, path=path, op=op) diff --git a/tests/test_client_server_confd.py b/tests/test_client_server_confd.py index d110695..9ddbe67 100644 --- a/tests/test_client_server_confd.py +++ b/tests/test_client_server_confd.py @@ -214,6 +214,27 @@ def test_authentication(self, request): self._assert_auth("No such local user", username="bad", password="bad") self._assert_auth("No such local user", username="bad") + @pytest.mark.confd + def test_set_trans_order(self, request): + base = '/ietf-interfaces:interfaces/interface[name="if_8"]' + path = make_gnmi_path(base + '/enabled') + update = gnmi_pb2.Update(path=path, + val=gnmi_pb2.TypedValue(json_ietf_val=b'false')) + self.client.set_request(None, delete=[path], update=[update]) + datatype = gnmi_pb2.GetRequest.DataType.CONFIG + encoding = gnmi_pb2.Encoding.JSON_IETF + response = self.client.get(None, [path], datatype, encoding) + assert response.notification[0].update[0].val.json_ietf_val == b'false' + bad_update = gnmi_pb2.Update(path=make_gnmi_path(base + '/no-such-leaf'), + val=gnmi_pb2.TypedValue(json_ietf_val=b'42')) + with pytest.raises(grpc.RpcError): + self.client.set_request(None, delete=[path], update=[bad_update]) + response = self.client.get(None, [path], datatype, encoding) + assert response.notification[0].update[0].val.json_ietf_val == b'false' + update = gnmi_pb2.Update(path=path, + val=gnmi_pb2.TypedValue(json_ietf_val=b'true')) + self.client.set_request(None, update=[update]) + class TestGrpcConfDSet(GrpcBase): def set_adapter_type(self):