Skip to content

Commit

Permalink
confdgnmi - (review comments), refactoring of process_changes, other …
Browse files Browse the repository at this point in the history
…minor comments

Signed-off-by: Michal Novak <[email protected]>
  • Loading branch information
micnovak committed May 11, 2021
1 parent 5acad96 commit 4683069
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 77 deletions.
169 changes: 97 additions & 72 deletions confdgnmi/src/confd_gnmi_api_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,10 +140,14 @@ def kp_to_xpath(kp):
class ChangeOp(Enum):
MODIFIED = "mod"

def _append_changes(self, change_list):
log.debug("==> change_list=%s", change_list)
def _append_changes(self, change_tuple):
"""
:param change_tuple: 3 elem tuple (o, xpath, val)
:return:
"""
log.debug("==> change_tuple=%s", change_tuple)
with self.change_db_lock:
for c in change_list:
for c in change_tuple:
assert c[0] == self.ChangeOp.MODIFIED or \
c[0] == self.ChangeOp.MODIFIED.value
self.change_db.append((c[1], c[2]))
Expand All @@ -160,9 +164,9 @@ def cdb_iter(kp, op, oldv, newv, state):
# TODO CREATE not handled for now
if op == _confd.MOP_VALUE_SET:
log.debug("_confd.MOP_VALUE_SET")
self._append_changes([[self.ChangeOp.MODIFIED,
self._append_changes([(self.ChangeOp.MODIFIED,
self.kp_to_xpath(kp),
str(newv)]])
str(newv))])
# TODO MOP_VALUE_SET implement
elif op == _confd.MOP_DELETED:
log.debug("_confd.MOP_DELETED")
Expand All @@ -185,7 +189,6 @@ def process_external_change(self, ext_sock):
log.info("==>")
connection, client_address = ext_sock.accept()
with connection:
connection.setblocking(True)
# TODO make const
msg = connection.recv(1024)
log.debug("msg=%s", msg)
Expand All @@ -198,87 +201,109 @@ def process_external_change(self, ext_sock):
# the size must be smaller then size in recv
data = msg.decode().split('\n')
assert len(data) % 3 == 0
chunks = [data[x:x + 3] for x in range(0, len(data), 3)]
chunks = (data[x:x + 3] for x in range(0, len(data), 3))
self._append_changes(chunks)
self.put_event(self.SubscriptionEvent.SEND_CHANGES)
log.debug("data=%s", data)
log.info("<==")

def process_changes(self, external_changes=False):
def subscribe_monitored_paths_cdb(self, sub_sock):
"""
Subscribe to monitored paths
:param sub_sock:
:return: True, if some path is not in CDB
"""
log.debug("==> sub_sock=%s", sub_sock)
prio = 10
subscribed = has_non_cdb = False
# make subscription for all self.monitored_paths in CDB
for p in self.monitored_paths:
log.debug("subscribing config p=%s", p)
# TODO hash - breaks generic usage
# TODO for now we subscribe path for both, config and oper,
# TODO subscribe only for paths that exist
# it may be more efficient to find out type of path and subscribe
# only for one type
cs_node = _confd.cs_node_cd(None, p)
is_cdb = cs_node.info().flags() & _confd.CS_NODE_IS_CDB
if is_cdb:
subscribed = True
cdb.subscribe2(sub_sock, cdb.SUB_RUNNING, 0, prio, 0, p)
log.debug("subscribing operational p=%s", p)
cdb.subscribe2(sub_sock, cdb.SUB_OPERATIONAL, 0, prio, 0, p)
else:
has_non_cdb = True
if subscribed:
cdb.subscribe_done(sub_sock)
log.debug("<== has_non_cdb=%s", has_non_cdb)
return has_non_cdb

def start_external_change_server(self):
"""
Start external change server
:return: socket to listen for changes
"""
log.debug("==>")
# make subscription for all self.monitored_paths
log.info("Starting external change server!")
ext_server_sock = socket()
# TODO port (host) as const or command line option
ext_server_sock.bind(("localhost",
GnmiConfDApiServerAdapter.external_port))
ext_server_sock.listen(5)
log.debug("<== ext_server_sock=%s", ext_server_sock)
return ext_server_sock

def socket_loop(self, sub_sock, ext_server_sock=None):
log.debug("==> sub_sock=%s ext_server_sock=%s", sub_sock,
ext_server_sock)
rlist = [sub_sock, self.stop_pipe[0]]
if ext_server_sock is not None:
rlist.append(ext_server_sock)
wlist = elist = []
while True:
log.debug("rlist=%s", rlist)
r, w, e = select.select(rlist, wlist, elist)
log.debug("r=%s", r)
if ext_server_sock is not None and ext_server_sock in r:
self.process_external_change(ext_server_sock)
if self.stop_pipe[0] in r:
v = os.read(self.stop_pipe[0], 1)
assert v == b'x'
log.debug("Stopping ConfD loop")
break
if sub_sock in r:
try:
sub_info = cdb.read_subscription_socket2(
sub_sock)
for s in sub_info[2]:
self.process_subscription(sub_sock, s)
cdb.sync_subscription_socket(sub_sock,
cdb.DONE_PRIORITY)
except _confd.error.Error as e:
# Callback error
if e.confd_errno is _confd.ERR_EXTERNAL:
log.exception(e)
else:
raise e
log.debug("<==")

def process_changes(self, external_changes=False):
log.debug("==> external_changes=%s", external_changes)
with socket() as sub_sock:
prio = 10
cdb.connect(sub_sock, cdb.SUBSCRIPTION_SOCKET, '127.0.0.1',
_confd.CONFD_PORT)
found_in_cdb = has_non_cdb = False
for p in self.monitored_paths:
log.debug("subscribing config p=%s", p)
# TODO hash - breaks generic usage
# TODO for now we subscribe path for both, config and oper,
# TODO subscribe only for paths that exist
# it may be more efficient to find out type of path and subscribe
# only for one type
cs_node = _confd.cs_node_cd(None, p)
is_cdb = cs_node.info().flags() & _confd.CS_NODE_IS_CDB
if is_cdb:
found_in_cdb = True
cdb.subscribe2(sub_sock, cdb.SUB_RUNNING, 0, prio,
0, p)
log.debug("subscribing operational p=%s", p)
cdb.subscribe2(sub_sock, cdb.SUB_OPERATIONAL, 0, prio,
0, p)
else:
has_non_cdb = True
if found_in_cdb:
cdb.subscribe_done(sub_sock)
has_non_cdb = self.subscribe_monitored_paths_cdb(sub_sock)
log.debug("subscribe_done")
assert self.stop_pipe is not None
rlist = [sub_sock, self.stop_pipe[0]]
wlist = elist = []
try:
ext_server_sock = None
if external_changes and has_non_cdb:
log.info("Starting external change server!")
ext_server_sock = socket()
ext_server_sock.setblocking(False)
# TODO port (host) as const or command line option
ext_server_sock.bind(("localhost",
GnmiConfDApiServerAdapter.external_port))
ext_server_sock.listen(5)
rlist.append(ext_server_sock)
while True:
log.debug("rlist=%s", rlist)
r, w, e = select.select(rlist, wlist, elist)
log.debug("r=%s", r)
if ext_server_sock is not None and ext_server_sock in r:
self.process_external_change(ext_server_sock)
if self.stop_pipe[0] in r:
v = os.read(self.stop_pipe[0], 1)
assert v == b'x'
log.debug("Stopping ConfD loop")
break
if sub_sock in r:
try:
sub_info = cdb.read_subscription_socket2(
sub_sock)
for s in sub_info[2]:
self.process_subscription(sub_sock, s)
cdb.sync_subscription_socket(sub_sock,
cdb.DONE_PRIORITY)
except _confd.error.Error as e:
# Callback error
if e.confd_errno is _confd.ERR_EXTERNAL:
log.exception(e)
else:
raise e

ext_server_sock = self.start_external_change_server()
with ext_server_sock:
self.socket_loop(sub_sock, ext_server_sock)
else:
self.socket_loop(sub_sock)
except Exception as e:
log.exception(e)
finally:
if ext_server_sock is not None:
ext_server_sock.close()

log.debug("<==")

def start_monitoring(self):
Expand Down
11 changes: 6 additions & 5 deletions confdgnmi/src/confd_gnmi_demo_adapter.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,15 +222,16 @@ def process_changes(self):
else:
log.info("c=%s self.monitored_paths=%s",
c, self.monitored_paths)
if any(c[0].startswith(elem) for elem in
(path, val) = c
if any(path.startswith(elem) for elem in
self.monitored_paths):
log.info("appending c=%s", c)
self.change_db.append(c)
if c[0] in self.adapter.demo_db:
self.adapter.demo_db[c[0]] = c[1]
elif c[0] in self.adapter.demo_state_db:
if path in self.adapter.demo_db:
self.adapter.demo_db[path] = val
elif path in self.adapter.demo_state_db:
self.adapter.demo_state_db[
c[0]] = c[1]
path] = val
else:
assert False
if send:
Expand Down

0 comments on commit 4683069

Please sign in to comment.