From 28b2631a17321e73ed4740cdff57fc83422fdbcb Mon Sep 17 00:00:00 2001 From: Philip Feairheller Date: Fri, 21 Jun 2024 07:48:45 -0700 Subject: [PATCH] New and updated kli commands for witnesss, watchers and mailboxes. Delegation fixes (#805) * New kli commands for listing witnesss, watchers and mailboxes. Update to kli command `kli watcher add` to use new BADA-RUN protected data structures for tracking AIDs being observed by a watcher on behalf of a controller New kli command to perform key event adjudication across the set of watchers watching an AID. This command will retrieve updated key state if a threshold satisfying number watchers response with non-duplicitous key state change. Addition to rotate and delegate confirm to all the specification of witness auth code time stamps. Update to delegation processing to propagate the delegator anchor event to witnesses after delegation approval New databases to track observed AIDs (observed by watchers) and delegation propogation escrow. Bug fix in `kli mailbox debug` to account for empty local state. Signed-off-by: pfeairheller * Factoring several classes and methods into a watching package including a new Adjudicator class that can be used outside of the KERIpy command line. Signed-off-by: pfeairheller * Added tests for Adjudicator and diffState Signed-off-by: pfeairheller --------- Signed-off-by: pfeairheller --- src/keri/app/agenting.py | 6 + src/keri/app/cli/commands/delegate/confirm.py | 12 +- src/keri/app/cli/commands/local/watch.py | 44 +--- src/keri/app/cli/commands/mailbox/debug.py | 11 +- src/keri/app/cli/commands/mailbox/list.py | 68 ++++++ src/keri/app/cli/commands/watcher/add.py | 20 +- .../app/cli/commands/watcher/adjudicate.py | 159 +++++++++++++ src/keri/app/cli/commands/watcher/list.py | 68 ++++++ src/keri/app/cli/commands/witness/list.py | 56 +++++ src/keri/app/delegating.py | 37 ++- src/keri/app/querying.py | 4 +- src/keri/app/watching.py | 217 ++++++++++++++++++ src/keri/core/eventing.py | 137 +++++++++-- src/keri/core/routing.py | 3 - src/keri/db/basing.py | 66 +++++- tests/app/test_storing.py | 2 +- tests/app/test_watching.py | 159 +++++++++++++ tests/core/test_reply.py | 163 ++++++++++++- 18 files changed, 1144 insertions(+), 88 deletions(-) create mode 100644 src/keri/app/cli/commands/mailbox/list.py create mode 100644 src/keri/app/cli/commands/watcher/adjudicate.py create mode 100644 src/keri/app/cli/commands/watcher/list.py create mode 100644 src/keri/app/cli/commands/witness/list.py create mode 100644 src/keri/app/watching.py create mode 100644 tests/app/test_watching.py diff --git a/src/keri/app/agenting.py b/src/keri/app/agenting.py index ad164a9b3..456e3f4bc 100644 --- a/src/keri/app/agenting.py +++ b/src/keri/app/agenting.py @@ -581,6 +581,7 @@ def __init__(self, hby, msgs=None, cues=None, **kwa): """ self.hby = hby + self.posted = 0 self.msgs = msgs if msgs is not None else decking.Deck() self.cues = cues if cues is not None else decking.Deck() super(WitnessPublisher, self).__init__(doers=[doing.doify(self.sendDo)], **kwa) @@ -599,6 +600,7 @@ def sendDo(self, tymth=None, tock=0.0, **opts): while True: while self.msgs: evt = self.msgs.popleft() + self.posted += 1 pre = evt["pre"] msg = evt["msg"] @@ -642,6 +644,10 @@ def sent(self, said): return False + @property + def idle(self): + return len(self.msgs) == 0 and self.posted == len(self.cues) + class TCPMessenger(doing.DoDoer): """ Send events to witnesses for receipting using TCP direct connection diff --git a/src/keri/app/cli/commands/delegate/confirm.py b/src/keri/app/cli/commands/delegate/confirm.py index a0ddb7525..25eb27764 100644 --- a/src/keri/app/cli/commands/delegate/confirm.py +++ b/src/keri/app/cli/commands/delegate/confirm.py @@ -37,6 +37,7 @@ action='store_true') parser.add_argument('--code', help=': formatted witness auth codes. Can appear multiple times', default=[], action="append", required=False) +parser.add_argument('--code-time', help='Time the witness codes were captured.', default=None, required=False) def confirm(args): @@ -54,16 +55,18 @@ def confirm(args): auto = args.auto authenticate = args.authenticate codes = args.code + codeTime = args.code_time confirmDoer = ConfirmDoer(name=name, base=base, alias=alias, bran=bran, interact=interact, auto=auto, - authenticate=authenticate, codes=codes) + authenticate=authenticate, codes=codes, codeTime=codeTime) doers = [confirmDoer] return doers class ConfirmDoer(doing.DoDoer): - def __init__(self, name, base, alias, bran, interact=False, auto=False, authenticate=False, codes=None): + def __init__(self, name, base, alias, bran, interact=False, auto=False, authenticate=False, codes=None, + codeTime=None): hby = existing.setupHby(name=name, base=base, bran=bran) self.hbyDoer = habbing.HaberyDoer(habery=hby) # setup doer self.witq = agenting.WitnessInquisitor(hby=hby) @@ -73,6 +76,7 @@ def __init__(self, name, base, alias, bran, interact=False, auto=False, authenti self.mux = grouping.Multiplexor(hby=hby, notifier=self.notifier) self.authenticate = authenticate self.codes = codes if codes is not None else [] + self.codeTime = codeTime exc = exchanging.Exchanger(hby=hby, handlers=[]) delegating.loadHandlers(hby=hby, exc=exc, notifier=self.notifier) @@ -185,9 +189,11 @@ def confirmDo(self, tymth, tock=0.0): auths = {} if self.authenticate: + codeTime = helping.fromIso8601( + self.codeTime) if self.codeTime is not None else helping.nowIso8601() for arg in self.codes: (wit, code) = arg.split(":") - auths[wit] = f"{code}#{helping.nowIso8601()}" + auths[wit] = f"{code}#{codeTime}" for wit in hab.kever.wits: if wit in auths: diff --git a/src/keri/app/cli/commands/local/watch.py b/src/keri/app/cli/commands/local/watch.py index 9e17841a7..b93aeba3c 100644 --- a/src/keri/app/cli/commands/local/watch.py +++ b/src/keri/app/cli/commands/local/watch.py @@ -7,14 +7,13 @@ import random import sys import time -from collections import namedtuple from hio import help from hio.base import doing from keri.app import agenting, indirecting, habbing, forwarding from keri.app.cli.common import existing, terming from keri.app.habbing import GroupHab -from keri.core import coring +from keri.app.watching import States, diffState logger = help.ogler.getLogger() @@ -31,17 +30,6 @@ parser.add_argument('--aeid', help='qualified base64 of non-transferable identifier prefix for authentication ' 'and encryption of secrets in keystore', default=None) -Stateage = namedtuple("Stateage", 'even ahead behind duplicitous') - -States = Stateage(even="even", ahead="ahead", behind="behind", duplicitous="duplicitous") - - -class WitnessState: - wit: str - state: Stateage - sn: int - dig: str - def watch(args): name = args.name @@ -135,7 +123,7 @@ def watchDo(self, tymth, tock=0.0, **opts): mystate = hab.kever.state() witstate = hab.db.ksns.get((saider.qb64,)) - states.append(self.diffState(wit, mystate, witstate)) + states.append(diffState(wit, mystate, witstate)) # First check for any duplicity, if so get out of here dups = [state for state in states if state.state == States.duplicitous] @@ -213,31 +201,3 @@ def cueDo(self, tymth, tock=0.0, **opts): yield self.tock yield self.tock - - @staticmethod - def diffState(wit, preksn, witksn): - - witstate = WitnessState() - witstate.wit = wit - mysn = int(preksn.s, 16) - mydig = preksn.d - witstate.sn = int(witksn.f, 16) - witstate.dig = witksn.d - - # At the same sequence number, check the DIGs - if mysn == witstate.sn: - if mydig == witstate.dig: - witstate.state = States.even - else: - witstate.state = States.duplicitous - - # This witness is behind and will need to be caught up. - elif mysn > witstate.sn: - witstate.state = States.behind - - # mysn < witstate.sn - We are behind this witness (multisig or restore situation). - # Must ensure that controller approves this event or a recovery rotation is needed - else: - witstate.state = States.ahead - - return witstate diff --git a/src/keri/app/cli/commands/mailbox/debug.py b/src/keri/app/cli/commands/mailbox/debug.py index d305a1ad1..873c9ad05 100644 --- a/src/keri/app/cli/commands/mailbox/debug.py +++ b/src/keri/app/cli/commands/mailbox/debug.py @@ -85,7 +85,7 @@ def readDo(self, tymth, tock=0.0): hab = self.hby.habByName(name=self.alias) topics = {"/receipt": 0, "/replay": 0, "/multisig": 0, "/credential": 0, "/delegate": 0, "/challenge": 0, - "/oobi": 0} + "/oobi": 0, "/reply": 0} try: client, clientDoer = agenting.httpClient(hab, self.witness) except kering.MissingEntryError as e: @@ -95,8 +95,11 @@ def readDo(self, tymth, tock=0.0): print("Local Index per Topic") witrec = hab.db.tops.get((hab.pre, self.witness)) - for topic in witrec.topics: - print(f" Topic {topic}: {witrec.topics[topic]}") + if witrec: + for topic in witrec.topics: + print(f" Topic {topic}: {witrec.topics[topic]}") + else: + print("\tNo local index") print() q = dict(pre=hab.pre, topics=topics) @@ -107,7 +110,7 @@ def readDo(self, tymth, tock=0.0): httping.createCESRRequest(msg, client, dest=self.witness) - while client.requests: + while client.requests or (not client.events and not client.requests): yield self.tock yield 1.0 diff --git a/src/keri/app/cli/commands/mailbox/list.py b/src/keri/app/cli/commands/mailbox/list.py new file mode 100644 index 000000000..369e749df --- /dev/null +++ b/src/keri/app/cli/commands/mailbox/list.py @@ -0,0 +1,68 @@ +# -*- encoding: utf-8 -*- +""" +KERI +keri.kli.commands module + +""" +import argparse + +from hio import help +from hio.base import doing + +from keri.app import connecting +from keri.app.cli.common import existing +from keri.kering import ConfigurationError, Roles + +logger = help.ogler.getLogger() + +parser = argparse.ArgumentParser(description='List current mailboxes') +parser.set_defaults(handler=lambda args: handle(args), + transferable=True) +parser.add_argument('--name', '-n', help='keystore name and file location of KERI keystore', required=True) +parser.add_argument('--alias', '-a', help='human readable alias for the identifier to whom the credential was issued', + required=True) +parser.add_argument('--base', '-b', help='additional optional prefix to file location of KERI keystore', + required=False, default="") +parser.add_argument('--passcode', '-p', help='22 character encryption passcode for keystore (is not saved)', + dest="bran", default=None) # passcode => bran + + +def handle(args): + """ Command line handler for adding an aid to a watcher's list of AIds to watch + + Parameters: + args(Namespace): parsed command line arguments + + """ + + kwa = dict(args=args) + return [doing.doify(listMailboxes, **kwa)] + + +def listMailboxes(tymth, tock=0.0, **opts): + """ Command line status handler + + """ + _ = (yield tock) + args = opts["args"] + name = args.name + alias = args.alias + base = args.base + bran = args.bran + + try: + with existing.existingHby(name=name, base=base, bran=bran) as hby: + org = connecting.Organizer(hby=hby) + if alias is None: + alias = existing.aliasInput(hby) + + hab = hby.habByName(alias) + + for (aid, role, eid), ender in hab.db.ends.getItemIter(keys=(hab.pre, Roles.mailbox)): + if ender.allowed: + contact = org.get(eid) + print(f"{contact['alias']}: {eid}") + + except ConfigurationError as e: + print(f"identifier prefix for {name} does not exist, incept must be run first", ) + return -1 diff --git a/src/keri/app/cli/commands/watcher/add.py b/src/keri/app/cli/commands/watcher/add.py index 7f3c8410f..c993f8e90 100644 --- a/src/keri/app/cli/commands/watcher/add.py +++ b/src/keri/app/cli/commands/watcher/add.py @@ -11,7 +11,8 @@ from keri.app import connecting, habbing, forwarding from keri.app.cli.common import existing -from keri.core import eventing, serdering +from keri.core import serdering +from keri.kering import Roles logger = help.ogler.getLogger() @@ -91,7 +92,7 @@ def __init__(self, name, alias, base, bran, watcher, watched): super(AddDoer, self).__init__(doers=doers) def addDo(self, tymth, tock=0.0): - """ Grant credential by creating /ipex/grant exn message + """ Add an AID to a watcher's list of AIDs to watch Parameters: tymth (function): injected function wrapper closure returned by .tymen() of @@ -109,17 +110,28 @@ def addDo(self, tymth, tock=0.0): if isinstance(self.hab, habbing.GroupHab): raise ValueError("watchers for multisig AIDs not currently supported") + ender = self.hab.db.ends.get(keys=(self.hab.pre, Roles.watcher, self.watcher)) + if not ender or not ender.allowed: + msg = self.hab.reply(route="/end/role/add", + data=dict(cid=self.hab.pre, role=Roles.watcher, eid=self.watcher)) + self.hab.psr.parseOne(ims=msg) + postman = forwarding.StreamPoster(hby=self.hby, hab=self.hab, recp=self.watcher, topic="reply") + for msg in self.hab.db.cloneDelegation(self.hab.kever): + serder = serdering.SerderKERI(raw=msg) + postman.send(serder=serder, attachment=msg[serder.size:]) + for msg in self.hab.db.clonePreIter(pre=self.hab.pre): serder = serdering.SerderKERI(raw=msg) postman.send(serder=serder, attachment=msg[serder.size:]) data = dict(cid=self.hab.pre, - wid=self.watched, + oid=self.watched, oobi=self.oobi) - route = "/watcher/aid/add" + route = f"/watcher/{self.watcher}/add" msg = self.hab.reply(route=route, data=data) + self.hab.psr.parseOne(ims=bytes(msg)) rpy = serdering.SerderKERI(raw=msg) postman.send(serder=rpy, attachment=msg[rpy.size:]) diff --git a/src/keri/app/cli/commands/watcher/adjudicate.py b/src/keri/app/cli/commands/watcher/adjudicate.py new file mode 100644 index 000000000..82e368e83 --- /dev/null +++ b/src/keri/app/cli/commands/watcher/adjudicate.py @@ -0,0 +1,159 @@ +# -*- encoding: utf-8 -*- +""" +KERI +keri.kli.commands module + +""" +import argparse +import datetime +import random +import sys + +from hio import help +from hio.base import doing + +from keri.app import connecting, indirecting, querying, watching +from keri.app.cli.common import existing +from keri.app.watching import diffState, States +from keri.help import helping +from keri.kering import ConfigurationError +logger = help.ogler.getLogger() + +parser = argparse.ArgumentParser(description='Perform key event adjudication on any new key state from watchers.') +parser.set_defaults(handler=lambda args: handle(args), + transferable=True) +parser.add_argument('--name', '-n', help='keystore name and file location of KERI keystore', required=True) +parser.add_argument('--alias', '-a', help='human readable alias for the identifier to whom the credential was issued', + required=True) +parser.add_argument('--base', '-b', help='additional optional prefix to file location of KERI keystore', + required=False, default="") +parser.add_argument('--passcode', '-p', help='22 character encryption passcode for keystore (is not saved)', + dest="bran", default=None) # passcode => bran +parser.add_argument('--toad', '-t', default=None, required=False, type=int, + help='int of watcher threshold (threshold of acceptable duplicity)', ) +parser.add_argument("--watched", '-W', help="the watched AID or alias to add", required=True) +parser.add_argument("--poll", "-P", help="Poll mailboxes for any issued credentials", action="store_true") + + +def handle(args): + """ Command line handler for adding an aid to a watcher's list of AIds to watch + + Parameters: + args(Namespace): parsed command line arguments + + """ + + kwa = dict(args=args) + adjudicator = AdjudicationDoer(**kwa) + + return [adjudicator] + + +class AdjudicationDoer(doing.DoDoer): + + def __init__(self, **kwa): + args = kwa["args"] + base = args.base + bran = args.bran + self.name = args.name + self.alias = args.alias + self.watched = args.watched + self.poll = args.poll + self.toad = args.toad + + self.hby = existing.setupHby(name=self.name, base=base, bran=bran) + self.mbx = indirecting.MailboxDirector(hby=self.hby, topics=['/reply', '/replay']) + doers = [doing.doify(self.adjudicate, **kwa), self.mbx] + + super(AdjudicationDoer, self).__init__(**kwa, doers=doers) + + def adjudicate(self, tymth, tock=0.0, **opts): + """ Command line status handler + + """ + _ = (yield tock) + + try: + org = connecting.Organizer(hby=self.hby) + + if self.poll: + end = helping.nowUTC() + datetime.timedelta(seconds=5) + sys.stdout.write(f"Polling mailboxes") + sys.stdout.flush() + while helping.nowUTC() < end: + sys.stdout.write(".") + sys.stdout.flush() + yield 1.0 + print("\n") + + if self.watched in self.hby.kevers: + watd = self.watched + else: + watd = org.find("alias", self.watched) + if len(watd) != 1: + raise ValueError(f"invalid recipient {self.watched}") + watd = watd[0]['id'] + + if not watd: + raise ValueError(f"unknown watched {self.watched}") + + if self.alias is None: + self.alias = existing.aliasInput(self.hby) + + hab = self.hby.habByName(self.alias) + if hab is None: + raise ValueError(f"unknown alias {self.alias}") + + adj = watching.Adjudicator(hby=self.hby, hab=hab) + adjDoer = watching.AdjudicationDoer(adj) + self.extend([adjDoer]) + + adj.msgs.append(dict(oid=self.watched, toad=self.toad)) + + while not adj.cues: + yield self.tock + + cue = adj.cues.pull() + kin = cue['kin'] + + match kin: + case "keyStateConsistent": + states = cue['states'] + wids = cue["wids"] + print(f"Local key state is consistent with the {len(states)} (out of " + f"{len(wids)} total) watchers that responded") + + case "keyStateLagging": + bhds = cue["behinds"] + print("The following watchers are behind the local KEL:") + for state in bhds: + print(f"\tWatcher {state.wit} at seq No. {state.sn} with digest: {state.dig}") + + print(f"Recommend the checking those watchers for access to {self.watched} witnesses") + + case "keyStateUpdate": + ahds = cue["aheads"] + logger.info(f"Threshold ({self.toad}) satisfying number of watchers ({len(ahds)}) are ahead") + for state in ahds: + logger.info(f"\tWatcher {state.wit} at Seq No. {state.sn} with digest: {state.dig}") + + state = random.choice(ahds) + querier = querying.SeqNoQuerier(hby=self.hby, hab=hab, pre=self.watched, sn=state.sn, + wits=[state.wit]) + self.extend([querier]) + + while not querier.done: + yield self.tock + + case "keyStateDuplicitous": + dups = cue["dups"] + print(f"Duplicity detected for AID {self.watched}, local key state remains intact.") + for state in dups: + print(f"\tWatcher {state.wit} at seq No. {state.sn} with digest: {state.dig}") + + self.remove([self.mbx, adjDoer]) + + except ConfigurationError as e: + print(f"identifier prefix for {self.name} does not exist, incept must be run first", ) + return -1 + diff --git a/src/keri/app/cli/commands/watcher/list.py b/src/keri/app/cli/commands/watcher/list.py new file mode 100644 index 000000000..ff54dc0ff --- /dev/null +++ b/src/keri/app/cli/commands/watcher/list.py @@ -0,0 +1,68 @@ +# -*- encoding: utf-8 -*- +""" +KERI +keri.kli.commands module + +""" +import argparse + +from hio import help +from hio.base import doing + +from keri.app import connecting +from keri.app.cli.common import existing +from keri.kering import ConfigurationError, Roles + +logger = help.ogler.getLogger() + +parser = argparse.ArgumentParser(description='List current watchers') +parser.set_defaults(handler=lambda args: handle(args), + transferable=True) +parser.add_argument('--name', '-n', help='keystore name and file location of KERI keystore', required=True) +parser.add_argument('--alias', '-a', help='human readable alias for the identifier to whom the credential was issued', + required=True) +parser.add_argument('--base', '-b', help='additional optional prefix to file location of KERI keystore', + required=False, default="") +parser.add_argument('--passcode', '-p', help='22 character encryption passcode for keystore (is not saved)', + dest="bran", default=None) # passcode => bran + + +def handle(args): + """ Command line handler for adding an aid to a watcher's list of AIds to watch + + Parameters: + args(Namespace): parsed command line arguments + + """ + + kwa = dict(args=args) + return [doing.doify(listWatchers, **kwa)] + + +def listWatchers(tymth, tock=0.0, **opts): + """ Command line status handler + + """ + _ = (yield tock) + args = opts["args"] + name = args.name + alias = args.alias + base = args.base + bran = args.bran + + try: + with existing.existingHby(name=name, base=base, bran=bran) as hby: + org = connecting.Organizer(hby=hby) + if alias is None: + alias = existing.aliasInput(hby) + + hab = hby.habByName(alias) + + for (aid, role, eid), ender in hab.db.ends.getItemIter(keys=(hab.pre, Roles.watcher, )): + if ender.allowed: + contact = org.get(eid) + print(f"{contact['alias']}: {eid}") + + except ConfigurationError as e: + print(f"identifier prefix for {name} does not exist, incept must be run first", ) + return -1 diff --git a/src/keri/app/cli/commands/witness/list.py b/src/keri/app/cli/commands/witness/list.py new file mode 100644 index 000000000..9ab11e6e5 --- /dev/null +++ b/src/keri/app/cli/commands/witness/list.py @@ -0,0 +1,56 @@ +# -*- encoding: utf-8 -*- +""" +KERI +keri.kli.commands module + +""" +import argparse + +from hio import help +from hio.base import doing + +from keri.app.cli.common import displaying, existing +from keri.core import serdering +from keri.kering import ConfigurationError + +logger = help.ogler.getLogger() + +parser = argparse.ArgumentParser(description='List AIDs of witness for the provided AID') +parser.set_defaults(handler=lambda args: handler(args), + transferable=True) +parser.add_argument('--name', '-n', help='keystore name and file location of KERI keystore', required=True) +parser.add_argument('--base', '-b', help='additional optional prefix to file location of KERI keystore', + required=False, default="") +parser.add_argument('--alias', '-a', help='human readable alias for the new identifier prefix', default=None) +parser.add_argument('--passcode', '-p', help='21 character encryption passcode for keystore (is not saved)', + dest="bran", default=None) # passcode => bran + + +def handler(args): + kwa = dict(args=args) + return [doing.doify(listWitnesses, **kwa)] + + +def listWitnesses(tymth, tock=0.0, **opts): + """ Command line status handler + + """ + _ = (yield tock) + args = opts["args"] + name = args.name + alias = args.alias + base = args.base + bran = args.bran + + try: + with existing.existingHby(name=name, base=base, bran=bran) as hby: + if alias is None: + alias = existing.aliasInput(hby) + + hab = hby.habByName(alias) + for idx, wit in enumerate(hab.kever.wits): + print(f'{wit}') + + except ConfigurationError as e: + print(f"identifier prefix for {name} does not exist, incept must be run first", ) + return -1 diff --git a/src/keri/app/delegating.py b/src/keri/app/delegating.py index 8d9e4c194..6b90c53ef 100644 --- a/src/keri/app/delegating.py +++ b/src/keri/app/delegating.py @@ -44,11 +44,11 @@ def __init__(self, hby, proxy=None, auths=None, **kwa): self.postman = forwarding.Poster(hby=hby) self.witq = agenting.WitnessInquisitor(hby=hby) self.witDoer = agenting.Receiptor(hby=self.hby) + self.publishers = dict() self.proxy = proxy self.auths = auths - super(Anchorer, self).__init__(doers=[self.witq, self.witDoer, self.postman, doing.doify(self.escrowDo)], - **kwa) + super(Anchorer, self).__init__(doers=[self.witq, self.witDoer, self.postman, doing.doify(self.escrowDo)], **kwa) def delegation(self, pre, sn=None, proxy=None, auths=None): if pre not in self.hby.habs: @@ -57,6 +57,7 @@ def delegation(self, pre, sn=None, proxy=None, auths=None): if proxy is not None: self.proxy = proxy + self.publishers[pre] = agenting.WitnessPublisher(hby=self.hby) # load the hab of the delegated identifier to anchor hab = self.hby.habs[pre] delpre = hab.kever.delpre # get the delegator identifier @@ -123,6 +124,7 @@ def escrowDo(self, tymth, tock=1.0): def processEscrows(self): self.processPartialWitnessEscrow() self.processUnanchoredEscrow() + self.processWitnessPublication() def processUnanchoredEscrow(self): """ @@ -143,8 +145,9 @@ def processUnanchoredEscrow(self): self.hby.db.setAes(dgkey, couple) # authorizer event seal (delegator/issuer) # Move to escrow waiting for witness receipts - logger.info(f"Delegation approval received, {serder.pre} confirmed") - self.hby.db.cdel.put(keys=(pre, coring.Seqner(sn=serder.sn).qb64), val=coring.Saider(qb64=serder.said)) + logger.info(f"Delegation approval received, {serder.pre} confirmed, publishing to my witnesses") + self.publishDelegator(pre) + self.hby.db.dpub.put(keys=(pre, said), val=serder) self.hby.db.dune.rem(keys=(pre, said)) def processPartialWitnessEscrow(self): @@ -197,6 +200,32 @@ def processPartialWitnessEscrow(self): self.hby.db.dpwe.rem(keys=(pre, said)) self.hby.db.dune.pin(keys=(srdr.pre, srdr.said), val=srdr) + def processWitnessPublication(self): + """ + Process escrow of partially signed multisig group KEL events. Message + processing will send this local controllers signature to all other participants + then this escrow waits for signatures from all other participants + + """ + for (pre, said), serder in self.hby.db.dpub.getItemIter(): # group partial witness escrow + publisher = self.publishers[pre] + + if not publisher.idle: + continue + + self.remove([publisher]) + del self.publishers[pre] + + self.hby.db.dpub.rem(keys=(pre, said)) + self.hby.db.cdel.put(keys=(pre, coring.Seqner(sn=serder.sn).qb64), val=coring.Saider(qb64=serder.said)) + + def publishDelegator(self, pre): + hab = self.hby.habs[pre] + publisher = self.publishers[pre] + self.extend([publisher]) + for msg in hab.db.cloneDelegation(hab.kever): + publisher.msgs.append(dict(pre=hab.pre, msg=bytes(msg))) + def loadHandlers(hby, exc, notifier): """ Load handlers for the peer-to-peer delegation protocols diff --git a/src/keri/app/querying.py b/src/keri/app/querying.py index 2dc4ce17d..11541640f 100644 --- a/src/keri/app/querying.py +++ b/src/keri/app/querying.py @@ -90,13 +90,13 @@ def recur(self, tyme, deeds=None): class SeqNoQuerier(doing.DoDoer): - def __init__(self, hby, hab, pre, sn, **opts): + def __init__(self, hby, hab, pre, sn, wits=None, **opts): self.hby = hby self.hab = hab self.pre = pre self.sn = sn self.witq = agenting.WitnessInquisitor(hby=self.hby) - self.witq.query(src=self.hab.pre, pre=self.pre, sn="{:x}".format(self.sn)) + self.witq.query(src=self.hab.pre, pre=self.pre, sn="{:x}".format(self.sn), wits=wits) super(SeqNoQuerier, self).__init__(doers=[self.witq], **opts) def recur(self, tyme, deeds=None): diff --git a/src/keri/app/watching.py b/src/keri/app/watching.py new file mode 100644 index 000000000..3a720d594 --- /dev/null +++ b/src/keri/app/watching.py @@ -0,0 +1,217 @@ +# -*- encoding: utf-8 -*- +""" +KERI +keri.app.watching module + +""" +import random +from collections import namedtuple +from dataclasses import dataclass + +from hio.base import doing +from hio.help import decking + +from keri import help + +logger = help.ogler.getLogger() + +Stateage = namedtuple("Stateage", 'even ahead behind duplicitous') + +States = Stateage(even="even", ahead="ahead", behind="behind", duplicitous="duplicitous") + + +@dataclass +class DiffState: + """ Difference between a remote KeyStateRecord and local for the same AID. + + Uses Stateage to represent whether the remote KSR is even, ahead, behind or duplicitous + + """ + wit: str # The entity reporting the KSR (non-local) + state: Stateage # The state of the remote KSR relative to local + sn: int # The sequence number of the remote KSR + dig: str # The digest of the latest event of the remote KSR + + +class Adjudicator: + """ The Adjudicator of Key State + + This class performs key state adjudication by checking any key state reported by the watcher set for a given + watched AID and compares the reported values against the local key state for the watched AID and the key state + of all other responding watchers. It uses a per-adjudication threshold to determine what is acceptable duplicity + for each adjudication. + + Cues are sent out for each round of adjudication with the following kins: + + keyStateConsistent - Key state of all queries watchers is consistent with local key state + keyStateLagging - Key state from some watchers is behind local key state and other watchers + keyStateUpdate - A threshold satisfying number of watchers report new key state for watched AID + keyStateDuplicitous - Duplicity has been detected on some set of watchers (provided in the cue) + + Consumers of the Adjudicator's cues are safe to retrieve new key state from one of the Watchers listed in the + cue of `keyStateUpdated` is received. All other kins require controller intervention and should be bubbled up. + + """ + + def __init__(self, hby, hab, msgs=None, cues=None): + """ Create instance of Adjudicator for adjudicating key state + + Parameters: + hby (Habery): database and Habitat environment + hab (Hab): identifier database environment + msgs (Deck): incoming requests to adjudicate key state + cues (Deck): outgoing responses to adjudication of key state + + """ + self.hby = hby + self.hab = hab + self.msgs = msgs if msgs is not None else decking.Deck() + self.cues = cues if cues is not None else decking.Deck() + + def performAdjudications(self): + """ Process loop of existing messages requesting key state adjudication """ + while self.msgs: + msg = self.msgs.pull() + + watched = msg["oid"] + toad = msg["toad"] if "toad" in msg else None + + self.adjudicate(watched, toad) + + def adjudicate(self, watched, toad=None): + """ Perform key state adjudication against the `watched` AID and provided threshold + + If `toad` is not provided, the full set of watchers must come to consensus before `keyStateUpdate` + will be reported. + + Parameters: + watched (str): qb64 AID to adjudicate for key state duplicity + toad (int): threshold of acceptable duplicity amongst available watchers + + + """ + watchers = set() + for (cid, aid, oid), observed in self.hab.db.obvs.getItemIter(keys=(self.hab.pre,)): + if observed.enabled and oid == watched: + watchers.add(aid) + + toad = int(toad) if toad else len(watchers) + if toad > len(watchers): + raise ValueError(f"Threshold of {toad} is greater than number watchers {len(watchers)}") + + states = [] + mystate = self.hab.kever.state() + for watcher in watchers: + saider = self.hab.db.knas.get(keys=(watched, watcher)) + if saider is None: + print(f"No key state from watcher {watcher} for {watched}") + continue + + ksn = self.hab.db.ksns.get(keys=(saider.qb64,)) + states.append(diffState(watcher, mystate, ksn)) + + dups = [state for state in states if state.state == States.duplicitous] + ahds = [state for state in states if state.state == States.ahead] + bhds = [state for state in states if state.state == States.behind] + + if len(dups) > 0: + cue = dict(kin="keyStateDuplicitous", cid=self.hab.pre, oid=watched, wids=watchers, dups=dups) + self.cues.append(cue) + + logger.error(f"Duplicity detected for AID {watched}, local key state remains intact.") + for state in dups: + logger.error(f"\tWatcher {state.wit} at seq No. {state.sn} with digest: {state.dig}") + + elif len(ahds) > 0: + # Only group habs can be behind their watchers + # First check for duplicity among the watchers that are ahead (possible only if toad is below + # super majority) + digs = set([state.dig for state in ahds]) + if len(digs) > 1: # Duplicity across watcher sets + cue = dict(kin="keyStateDuplicitous", cid=self.hab.pre, oid=watched, wids=watchers, dups=ahds) + self.cues.append(cue) + + logger.error(f"There are multiple duplicitous events on watcher for {watched}") + for state in ahds: + logger.error(f"\tWatcher {state.wit} at seq No. {state.sn} with digest: {state.dig}") + + elif len(ahds) >= toad: # all witnesses that are ahead agree on the event + logger.info(f"Threshold ({toad}) satisfying number of watchers ({len(ahds)}) are ahead") + for state in ahds: + logger.info(f"\tWatcher {state.wit} at Seq No. {state.sn} with digest: {state.dig}") + + state = random.choice(ahds) + cue = dict(kin="keyStateUpdate", cid=self.hab.pre, oid=watched, wids=watchers, sn=state.sn, aheads=ahds) + self.cues.append(cue) + + elif len(bhds) > 0: + cue = dict(kin="keyStateLagging", cid=self.hab.pre, oid=watched, wids=watchers, behind=bhds) + self.cues.append(cue) + + logger.info("The following watchers are behind the local KEL:") + for state in bhds: + logger.info(f"\tWatcher {state.wit} at seq No. {state.sn} with digest: {state.dig}") + + logger.info(f"Recommend the checking those watchers for access to {watched} witnesses") + + else: + cue = dict(kin="keyStateConsistent", cid=self.hab.pre, oid=watched, wids=watchers, states=states) + self.cues.append(cue) + logger.info(f"Local key state is consistent with the {len(states)} (out of " + f"{len(watchers)} total) watchers that responded") + + +class AdjudicationDoer(doing.Doer): + """ Doer class responsible for process adjudication requests in an Adjudicator's msgs """ + + def __init__(self, adjudicator): + """ Create instance of Doer for performing key state adjudications """ + self.adjudicator = adjudicator + super(AdjudicationDoer, self).__init__() + + def recur(self, tyme): + """ Perform one pass over all adjudication requests + + Parameters: + tyme (float): relative cycle time + + Returns: + + """ + self.adjudicator.performAdjudications() + + +def diffState(wit, preksn, witksn): + """ Return a record of the differences between the states provided by `wit` and local state + + Parameters: + wit (str): qb64 AID of entity reporting key state + preksn (KeyStateRecord): Local key state of AID + witksn (KeyStateRecord): Key state of AID as provided by `wit` + + Returns: + state (WitnessState): record indicating the differenced between the two provided KSN records + + """ + mysn = int(preksn.s, 16) + mydig = preksn.d + sn = int(witksn.s, 16) + dig = witksn.d + + # At the same sequence number, check the DIGs + if mysn == sn: + if mydig == dig: + state = States.even + else: + state = States.duplicitous + + # This witness is behind and will need to be caught up. + elif mysn > sn: + state = States.behind + + # mysn < witstate.sn - We are behind this witness (multisig or restore situation). + # Must ensure that controller approves this event or a recovery rotation is needed + else: + state = States.ahead + + return DiffState(wit, state, sn, dig) diff --git a/src/keri/core/eventing.py b/src/keri/core/eventing.py index dd9cb2244..7b552b072 100644 --- a/src/keri/core/eventing.py +++ b/src/keri/core/eventing.py @@ -7,6 +7,7 @@ import json import logging from collections import namedtuple +from dataclasses import asdict from urllib.parse import urlsplit from math import ceil from ordered_set import OrderedSet as oset @@ -39,7 +40,7 @@ from . import serdering from ..db import basing, dbing -from ..db.basing import KeyStateRecord, StateEERecord +from ..db.basing import KeyStateRecord, StateEERecord, OobiRecord from ..db.dbing import dgKey, snKey, fnKey, splitKeySN, splitKey @@ -2098,8 +2099,6 @@ def rotate(self, serder): raise ValidationError(f"Invalid sith = {serder.tholder} for keys = " f"{keys} for evt = {ked}.") - - # compute wits from existing .wits with new cuts and adds from event # use ordered set math ops to verify and ensure strict ordering of wits # cuts and add to ensure that indexed signatures on indexed witness @@ -2118,7 +2117,6 @@ def rotate(self, serder): return tholder, toader, wits, cuts, adds - def deriveBacks(self, serder): """Derives and return tuple of (wits, cuts, adds) for backers given current set and any changes provided by serder. @@ -2243,10 +2241,10 @@ def valSigsWigsDel(self, serder, sigers, verfers, tholder, (self.locallyOwned() or self.locallyWitnessed(wits=wits))): self.escrowMFEvent(serder=serder, sigers=sigers, wigers=wigers, - seqner=delseqner, saider=delsaider, local=local) + seqner=delseqner, saider=delsaider, local=local) raise MisfitEventSourceError(f"Nonlocal source for locally owned" - f"or locally witnessed event" - f" = {serder.ked}.") + f" or locally witnessed event" + f" = {serder.ked}, {wits}, {self.prefixes}") werfers = [Verfer(qb64=wit) for wit in wits] # get witness public key verifiers # get unique verified wigers and windices lists from wigers list @@ -2627,19 +2625,15 @@ def validateDelegation(self, serder, sigers, wigers, wits, local=True, # misfit escrow first. Mistfit escrow must first # promote to local and reprocess event before we get to here self.escrowDelegableEvent(serder=serder, sigers=sigers, - wigers=wigers,local=local) + wigers=wigers, local=local) raise MissingDelegableApprovalError(f"Missing approval for " f" delegation by {delpre} of" - f"event = {serder.ked}.") - - #self.cues.push(dict(kin="approveDelegation", - #delegator=kever.delpre, - #serder=serder)) + f"event = {serder.ked}.") else: # not local delegator so escrow self.escrowPSEvent(serder=serder, sigers=sigers, wigers=wigers, local=local) raise MissingDelegationError(f"No delegation seal for delegator " - "{delpre} of evt = {serder.ked}.") + f"{delpre} of evt = {serder.ked}.") ssn = Number(num=delseqner.sn).validate(inceptive=False).sn # ToDo XXXX need to replace Seqners with Numbers @@ -2949,8 +2943,7 @@ def escrowDelegableEvent(self, serder, sigers, wigers=None, local=True): self.db.delegables.add(snKey(serder.preb, serder.sn), serder.saidb) # log escrowed logger.debug("Kever state: escrowed delegable event=\n%s\n", - json.dumps(serder.ked, indent=1)) - + json.dumps(serder.ked, indent=1)) def escrowPSEvent(self, serder, sigers, wigers=None, local=True): """ @@ -2985,9 +2978,9 @@ def escrowPSEvent(self, serder, sigers, wigers=None, local=True): self.db.esrs.put(keys=dgkey, val=esr) snkey = snKey(serder.preb, serder.sn) - self.db.addPse(snkey, serder.saidb) # b'EOWwyMU3XA7RtWdelFt-6waurOTH_aW_Z9VTaU-CshGk.00000000000000000000000000000001' + self.db.addPse(snkey, serder.saidb) logger.debug("Kever state: Escrowed partially signed or delegated " - "event = %s\n", serder.ked) + "event = %s\n", serder.ked) def escrowPACouple(self, serder, seqner, saider, local=True): @@ -4087,6 +4080,7 @@ def registerReplyRoutes(self, router): router.addRoute("/end/role/{action}", self, suffix="EndRole") router.addRoute("/loc/scheme", self, suffix="LocScheme") router.addRoute("/ksn/{aid}", self, suffix="KeyStateNotice") + router.addRoute("/watcher/{aid}/{action}", self, suffix="AddWatched") def processReplyEndRole(self, *, serder, saider, route, cigars=None, tsgs=None, **kwargs): """ @@ -4406,7 +4400,7 @@ def processReplyKeyStateNotice(self, *, serder, saider, route, ksaider = coring.Saider(qb64=diger.qb64) self.updateKeyState(aid=aid, ksr=ksr, saider=ksaider, dater=dater) - self.cues.push(dict(kin="keyStateSaved", ksn=ksr._asdict())) + self.cues.push(dict(kin="keyStateSaved", ksn=asdict(ksr))) def updateEnd(self, keys, saider, allowed=None): """ @@ -4470,6 +4464,100 @@ def removeKeyState(self, saider): self.db.ksns.rem(keys=keys) self.db.kdts.rem(keys=keys) + def processReplyAddWatched(self, *, serder, saider, route, + cigars=None, tsgs=None, **kwargs): + """ Process one reply message for adding an AID for a watcher to watch + + Process one reply message for adding an AID for a watcher to watch = /watcher/{aid}/add + with either attached nontrans receipt couples in cigars or attached trans + indexed sig groups in tsgs. + Assumes already validated saider, dater, and route from serder.ked + + Parameters: + serder (SerderKERI): instance of reply msg (SAD) + saider (Saider): instance from said in serder (SAD) + route (str): reply route + cigars (list): of Cigar instances that contain nontrans signing couple + signature in .raw and public key in .verfer + tsgs (list): tuples (quadruples) of form + (prefixer, seqner, diger, [sigers]) where: + prefixer is pre of trans endorser + seqner is sequence number of trans endorser's est evt for keys for sigs + diger is digest of trans endorser's est evt for keys for sigs + [sigers] is list of indexed sigs from trans endorser's keys from est evt + + Reply Message: + { + "v" : "KERI10JSON00011c_", + "t" : "rpy", + "d": "EZ-i0d8JZAoTNZH3ULaU6JR2nmwyvYAfSVPzhzS6b5CM", + "dt": "2020-08-22T17:50:12.988921+00:00", + "r" : "/watcher/BrHLayDN-mXKv62DAjFLX1_Y5yEUe0vA9YPe_ihiKYHE/add", + "a" : + { + "cid": "EyX-zd8JZAoTNZH3ULaU6JR2nmwyvYAfSVPzhzS6b5CM" + "oid": "EM0-i05TNZJZAoH3UR2nmLaU6JwyvPzhzS6YAfSVbMC5" + "oobi": "http://example.com/oobi/EyX-zd8JZAoTNZH3ULaU6JR2nmwyvYAfSVPzhzS6b5CM" + } + } + + """ + aid = kwargs["aid"] + action = kwargs["action"] + # reply specific logic + if not route.startswith("/watcher"): + raise ValidationError(f"Usupported route={route} in {Ilks.rpy} " + f"msg={serder.ked}.") + + # reply specific logic + if action == "add": + enabled = True + elif action == "cut": + enabled = False + else: # unsupported route + raise ValidationError(f"Usupported route={route} in {Ilks.rpy} " + f"msg={serder.ked}.") + route = f"/watcher/{aid}" # escrow based on route base + cigars = cigars if cigars is not None else [] + tsgs = tsgs if tsgs is not None else [] + + data = serder.ked["a"] + cid = data["cid"] + oid = data["oid"] + oobi = data["oobi"] if "oobi" in data else None + + keys = (cid, aid, oid) + + osaider = self.db.wwas.get(keys=keys) # get old said if any + + # BADA Logic + accepted = self.rvy.acceptReply(serder=serder, saider=saider, route=route, + aid=cid, osaider=osaider, cigars=cigars, + tsgs=tsgs) + if not accepted: + raise UnverifiedReplyError(f"Unverified watcher add reply. {serder.ked}") + + if oobi: + self.db.oobis.pin(keys=(oobi,), val=OobiRecord(date=help.nowIso8601())) + self.updateWatched(keys=keys, saider=saider, enabled=enabled) + + def updateWatched(self, keys, saider, enabled=None): + """ + Update loc auth database .lans and loc database .locs. + + Parameters: + keys (tuple): of key strs for databases (eid, scheme) + saider (Saider): instance from said in reply serder (SAD) + enabled (bool): True means add observed to watcher, False means remove (cut) + """ + self.db.wwas.pin(keys=keys, val=saider) # overwrite + if observed := self.db.obvs.get(keys=keys): # preexisting record + observed.enabled = enabled # update preexisting record + else: # no preexisting record + observed = basing.ObservedRecord(enabled=enabled, datetime=helping.nowIso8601()) # create new record + + self.db.obvs.pin(keys=keys, val=observed) # overwrite + def processQuery(self, serder, source=None, sigers=None, cigars=None): """ Process query mode replay message for collective or single element query. @@ -5175,6 +5263,14 @@ def processEscrowPartialSigs(self): raise ValidationError("Missing escrowed evt sigs at " "dig = {}.".format(bytes(edig))) + wigs = self.db.getWigs(dgKey(pre, bytes(edig))) # list of wigs + if not wigs: # empty list + # wigs maybe empty while waiting for first witness signature + # which may not arrive until some time after event is fully signed + # so just log for debugging but do not unescrow by raising + # ValidationError + logger.debug("Kevery unescrow wigs: No event wigs yet at." + "dig = %s", bytes(edig)) # seal source (delegator issuer if any) delseqner = delsaider = None @@ -5197,7 +5293,8 @@ def processEscrowPartialSigs(self): # process event sigers = [Siger(qb64b=bytes(sig)) for sig in sigs] - self.processEvent(serder=eserder, sigers=sigers, + wigers = [Siger(qb64b=bytes(wig)) for wig in wigs] + self.processEvent(serder=eserder, sigers=sigers, wigers=wigers, delseqner=delseqner, delsaider=delsaider, local=esr.local) # If process does NOT validate sigs or delegation seal (when delegated), diff --git a/src/keri/core/routing.py b/src/keri/core/routing.py index 8d119a27b..3601529f0 100644 --- a/src/keri/core/routing.py +++ b/src/keri/core/routing.py @@ -198,7 +198,6 @@ def processReply(self, serder, cigars=None, tsgs=None): self.rtr.dispatch(serder=serder, saider=saider, cigars=cigars, tsgs=tsgs) - def acceptReply(self, serder, saider, route, aid, osaider=None, cigars=None, tsgs=None): """ Applies Best Available Data Acceptance policy to reply and signatures @@ -494,8 +493,6 @@ def processEscrowReply(self): # still waiting on missing prior event to validate if logger.isEnabledFor(logging.DEBUG): logger.exception("Kevery unescrow attempt failed: %s", ex.args[0]) - else: - logger.error("Kevery unescrow attempt failed: %s", ex.args[0]) except Exception as ex: # other error so remove from reply escrow self.db.rpes.rem(keys=(route, ), val=saider) # remove escrow only diff --git a/src/keri/db/basing.py b/src/keri/db/basing.py index cf7fbd30d..601e77d35 100644 --- a/src/keri/db/basing.py +++ b/src/keri/db/basing.py @@ -347,7 +347,6 @@ class OobiRecord: urls: list = None - @dataclass class EndpointRecord: # baser.ends """ @@ -499,6 +498,56 @@ def __iter__(self): return iter(asdict(self)) +@dataclass +class ObservedRecord: # baser.obvs + """ + Watched Record with fields and keys to manage OIDs (Observed IDs) being watched by a watcher, keyed by + cid (controller ID), aid (watcher ID), and oid (observed ID). + + The namespace is a tree of branches with each leaf at a + specific (cid, aid, oid). Retrieval by branch returns groups of leaves as + appropriate for a cid braanch or cid.aid branch. + Database Keys are (cid, aid, oid) where cid is attributable controller identifier + (qb64 prefix). + + Attributes: + enabled (bool): AuthZ via expose message + True means oid is enabled as being observed + False means eid is disenabled being observed + None means eid is neither enabled or disenabled + name (str): user friendly name for eid in role + datetime (str): Date time this record was last observed + + + A watcher end reply message is required from which the field values + for this record are extracted. A routes of /watcher/{aid}/add /watcher/{aid}/cut + Uses add-cut model with allowed field + enabled==True oid is allowed (add) as being observed + enabled==False oid is disallowed (cut) as being observed + + { + "v" : "KERI10JSON00011c_", + "t" : "rpy", + "d": "EZ-i0d8JZAoTNZH3ULaU6JR2nmwyvYAfSVPzhzS6b5CM", + "dt": "2020-08-22T17:50:12.988921+00:00", + "r" : "/watcher/BrHLayDN-mXKv62DAjFLX1_Y5yEUe0vA9YPe_ihiKYHE/add", + "a" : + { + "cid": "EaU6JR2nmwyZ-i0d8JZAoTNZH3ULvYAfSVPzhzS6b5CM", + "oid": "EZ-i0d8JZAoTNZH3ULaU6JR2nmwyvYAfSVPzhzS6b5CM", + "oobi": "http://example.com/oobi/EyX-zd8JZAoTNZH3ULaU6JR2nmwyvYAfSVPzhzS6b5CM", + } + } + + """ + enabled: bool = None # True eid enabled (add), False eid disenabled (cut), None neither + name: str = "" # optional user friendly name of endpoint + datetime: str = None + + def __iter__(self): + return iter(asdict(self)) + + @dataclass class WellKnownAuthN: """ @@ -1015,6 +1064,11 @@ def reopen(self, **kwa): self.locs = koming.Komer(db=self, subkey='locs.', schema=LocationRecord, ) + # observed oids by watcher by cid.aid.oid (endpoint identifier) + # data extracted from reply loc + self.obvs = koming.Komer(db=self, + subkey='obvs.', + schema=ObservedRecord, ) # index of last retrieved message from witness mailbox # TODO: clean @@ -1095,6 +1149,11 @@ def reopen(self, **kwa): # TODO: clean self.knas = subing.CesrSuber(db=self, subkey='knas.', klas=coring.Saider) + # Watcher watched SAID database for successfully saved watched AIDs for a watcher + # maps key=(cid, aid, oid) to val=said of rpy message + # TODO: clean + self.wwas = subing.CesrSuber(db=self, subkey='wwas.', klas=coring.Saider) + # config loaded oobis to be processed asynchronously, keyed by oobi URL # TODO: clean self.oobis = koming.Komer(db=self, @@ -1182,6 +1241,9 @@ def reopen(self, **kwa): # delegated unanchored escrow self.dune = subing.SerderSuber(db=self, subkey='dune.') + # delegate publication escrow for sending delegator info to my witnesses + self.dpub = subing.SerderSuber(db=self, subkey='dpub.') + # completed group delegated AIDs # TODO: clean self.cdel = subing.CesrSuber(db=self, subkey='cdel.', @@ -1540,7 +1602,6 @@ def cloneEvtMsg(self, pre, fn, dig): msg.extend(atc) return msg - def cloneDelegation(self, kever): """ Recursively clone delegation chain from AID of Kever if one exits. @@ -1556,7 +1617,6 @@ def cloneDelegation(self, kever): for dmsg in self.clonePreIter(pre=kever.delpre, fn=0): yield dmsg - def findAnchoringSealEvent(self, pre, seal, sn=0): """ Search through a KEL for the event that contains a specific anchored diff --git a/tests/app/test_storing.py b/tests/app/test_storing.py index 82bf39352..d37eeae3c 100644 --- a/tests/app/test_storing.py +++ b/tests/app/test_storing.py @@ -1,6 +1,6 @@ # -*- encoding: utf-8 -*- """ -tests.peer.mailboxing +tests.app.storing """ import os diff --git a/tests/app/test_watching.py b/tests/app/test_watching.py new file mode 100644 index 000000000..4c7a49702 --- /dev/null +++ b/tests/app/test_watching.py @@ -0,0 +1,159 @@ +# -*- encoding: utf-8 -*- +""" +tests.app.watching + +""" +from dataclasses import asdict + +import pytest + +from keri import core +from keri.app import watching, habbing +from keri.app.watching import DiffState +from keri.core import coring +from keri.db.basing import KeyStateRecord, ObservedRecord + + +def test_diffstate(): + d0 = {'vn': [1, 0], + 'i': 'EZ-i0d8JZAoTNZH3ULaU6JR2nmwyvYAfSVPzhzS6b5CM', + 's': '0', + 'p': 'ElsHFkbZQjRb7xHnuE-wyiarIZ9j-1CEQ89I0E3WevcE', + 'd': 'EBiIFxr_o1b4x1YR21PblAFpFG61qDghqFBDyVSOXYW0', + 'f': '0', + 'dt': '2021-06-09T17:35:54.169967+00:00', + 'et': '2021-06-09T17:35:54.169967+00:00', + 'kt': '1', + 'k': ["D-HwiqmaETxls3vAVSh0xpXYTs94NUJX6juupWj_EgsA"], + 'nt': '1', + 'n': ["ED6lKZwg-BWl_jlCrjosQkOEhqKD4BJnlqYqWmhqPhaU"], + 'bt': '0', + 'b': [], + 'c': [], + 'ee': { + 's': '0', + 'd': 'EBiIFxr_o1b4x1YR21PblAFpFG61qDghqFBDyVSOXYW0', + 'br': [], + 'ba': [] + }, + 'di': ''} + + ksr0 = KeyStateRecord(**d0) + d1 = {'vn': [1, 0], + 'i': 'EZ-i0d8JZAoTNZH3ULaU6JR2nmwyvYAfSVPzhzS6b5CM', + 's': '0', + 'p': 'ElsHFkbZQjRb7xHnuE-wyiarIZ9j-1CEQ89I0E3WevcE', + 'd': 'Ey2pXEnaoQVwxA4jB6k0QH5G2Us-0juFL5hOAHAwIEkc', + 'f': '0', + 'dt': '2021-06-09T17:35:54.169967+00:00', + 'et': '2021-06-09T17:35:54.169967+00:00', + 'kt': '1', + 'k': ["DxVTxls3vAwiqmaEXYTs94NUJX6juVSh0xpupEgsAWj_"], + 'nt': '1', + 'n': ["ED6lKZwg-BWl_jlCrjosQkOEhqKD4BJnlqYqWmhqPhaU"], + 'bt': '0', + 'b': [], + 'c': [], + 'ee': { + 's': '0', + 'd': 'EBiIFxr_o1b4x1YR21PblAFpFG61qDghqFBDyVSOXYW0', + 'br': [], + 'ba': [] + }, + 'di': ''} + ksr1 = KeyStateRecord(**d1) + + wat = "BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s" + diffstate = watching.diffState(wat, ksr0, ksr1) + + # Sequence numbers are the same, digest different == duplicitous + assert asdict(diffstate) == {'wit': 'BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s', + 'state': 'duplicitous', + 'sn': 0, 'dig': 'Ey2pXEnaoQVwxA4jB6k0QH5G2Us-0juFL5hOAHAwIEkc'} + + # Same state == event + diffstate = watching.diffState(wat, ksr0, ksr0) + assert asdict(diffstate) == {'dig': 'EBiIFxr_o1b4x1YR21PblAFpFG61qDghqFBDyVSOXYW0', + 'sn': 0, + 'state': 'even', + 'wit': 'BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s'} + + ksr1.s = "2" + diffstate = watching.diffState(wat, ksr0, ksr1) + + # Sequence numbers are the same, digest different == duplicitous + assert asdict(diffstate) == {'dig': 'Ey2pXEnaoQVwxA4jB6k0QH5G2Us-0juFL5hOAHAwIEkc', + 'sn': 2, + 'state': 'ahead', + 'wit': 'BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s'} + + ksr0.s = "3" + diffstate = watching.diffState(wat, ksr0, ksr1) + + # Sequence numbers are the same, digest different == duplicitous + assert asdict(diffstate) == {'dig': 'Ey2pXEnaoQVwxA4jB6k0QH5G2Us-0juFL5hOAHAwIEkc', + 'sn': 2, + 'state': 'behind', + 'wit': 'BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s'} + + +def test_adjudicator(): + default_salt = core.Salter(raw=b'0123456789abcdef').qb64 + with habbing.openHby(name="test", base="test", salt=default_salt) as hby: + hab = hby.makeHab("test") + assert hab.pre == "EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3" + wat = "BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s" + saider = coring.Saider(qb64b=b'EClqKVJREM3MWKBqR2j712s3Z6rPxhqO-h-p8Ls6_9hQ') + + ksr = hab.kever.state() + ksr0 = KeyStateRecord(**asdict(ksr)) + + hab.db.knas.pin(keys=(hab.pre, wat), val=saider) + hab.db.ksns.pin(keys=(saider.qb64, ), val=ksr0) + hab.db.obvs.pin(keys=(hab.pre, wat, hab.pre), val=ObservedRecord(enabled=True)) + + adj = watching.Adjudicator(hby=hby, hab=hab) + + adj.adjudicate(hab.pre, 1) + assert len(adj.cues) == 1 + cue = adj.cues.pull() + + assert cue == {'cid': 'EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3', + 'kin': 'keyStateConsistent', + 'oid': 'EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3', + 'states': [DiffState(wit='BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s', + state='even', + sn=0, + dig='EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3')], + 'wids': {'BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s'}} + + hab.rotate() + + adj.adjudicate(hab.pre, 1) + assert len(adj.cues) == 1 + cue = adj.cues.pull() + assert cue == {'behind': [DiffState(wit='BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s', + state='behind', + sn=0, + dig='EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3')], + 'cid': 'EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3', + 'kin': 'keyStateLagging', + 'oid': 'EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3', + 'wids': {'BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s'}} + + ksr0.s = '1' + hab.db.ksns.pin(keys=(saider.qb64, ), val=ksr0) + adj.adjudicate(hab.pre, 1) + assert len(adj.cues) == 1 + cue = adj.cues.pull() + assert cue == {'cid': 'EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3', + 'dups': [DiffState(wit='BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s', + state='duplicitous', + sn=1, + dig='EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3')], + 'kin': 'keyStateDuplicitous', + 'oid': 'EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3', + 'wids': {'BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s'}} + + with pytest.raises(ValueError): + adj.adjudicate(hab.pre, 2) diff --git a/tests/core/test_reply.py b/tests/core/test_reply.py index 4705065eb..383ab74ec 100644 --- a/tests/core/test_reply.py +++ b/tests/core/test_reply.py @@ -22,8 +22,7 @@ from keri.db import basing from keri.app import habbing, keeping - - +from keri.kering import Roles logger = help.ogler.getLogger() @@ -1283,5 +1282,165 @@ def test_reply(mockHelpingNowUTC): """Done Test""" +def test_watcher_add_cut(): + salt = core.Salter(raw=b'abcdef0123456789').qb64 + + with habbing.openHby(name="con", base="test", salt=salt) as conHby, \ + habbing.openHby(name="wat0", base="test", salt=salt) as wat0hby, \ + habbing.openHby(name="wat1", base="test", salt=salt) as wat1hby, \ + habbing.openHby(name="wat2", base="test", salt=salt) as wat2hby, \ + habbing.openHby(name="obv0", base="test", salt=salt) as obv0hby, \ + habbing.openHby(name="obv1", base="test", salt=salt) as obv1hby, \ + habbing.openHby(name="obv2", base="test", salt=salt) as obv2hby: + + conHab = conHby.makeHab(name="con", isith="1", icount=1, transferable=True) + assert conHab.kever.prefixer.transferable + conKvy = eventing.Kevery(db=conHab.db, lax=False, local=False) + + wat0hab = wat0hby.makeHab(name='wat0', isith="1", icount=1, transferable=False) + assert not wat0hab.kever.prefixer.transferable + # create non-local kevery for Wes to process nonlocal msgs + wat0kvy = eventing.Kevery(db=wat0hab.db, lax=False, local=False) + + wat1hab = wat1hby.makeHab(name='wat1', isith="1", icount=1, transferable=False) + assert not wat1hab.kever.prefixer.transferable + # create non-local kevery for Wes to process nonlocal msgs + wat1kvy = eventing.Kevery(db=wat1hab.db, lax=False, local=False) + + wat2hab = wat2hby.makeHab(name='wat2', isith="1", icount=1, transferable=False) + assert not wat2hab.kever.prefixer.transferable + # create non-local kevery for Wes to process nonlocal msgs + wat2kvy = eventing.Kevery(db=wat2hab.db, lax=False, local=False) + + obv0hab = obv0hby.makeHab(name='obv0', isith="1", icount=1, transferable=True) + assert obv0hab.kever.prefixer.transferable + # create non-local kevery for Wes to process nonlocal msgs + obv0kvy = eventing.Kevery(db=obv0hab.db, lax=False, local=False) + + obv1hab = obv1hby.makeHab(name='obv1', isith="1", icount=1, transferable=True) + assert obv1hab.kever.prefixer.transferable + # create non-local kevery for Wes to process nonlocal msgs + obv1kvy = eventing.Kevery(db=obv1hab.db, lax=False, local=False) + + obv2hab = obv2hby.makeHab(name='obv2', isith="1", icount=1, transferable=True) + assert obv2hab.kever.prefixer.transferable + # create non-local kevery for Wes to process nonlocal msgs + obv2kvy = eventing.Kevery(db=obv2hab.db, lax=False, local=False) + + for hab in [wat0hab, wat1hab, wat2hab, obv0hab, obv1hab, obv2hab]: + msg = hab.makeOwnInception() + parsing.Parser().parseOne(ims=msg, kvy=conKvy) + + conIcp = conHab.makeOwnInception() + for kvy in [wat0kvy, wat1kvy, wat2kvy, obv0kvy, obv1kvy, obv2kvy]: + parsing.Parser().parseOne(ims=bytes(conIcp), kvy=kvy) # make copy so we don't clobber it + + assert wat0hab.pre in conHab.kevers + assert wat1hab.pre in conHab.kevers + assert wat2hab.pre in conHab.kevers + assert obv0hab.pre in conHab.kevers + assert obv1hab.pre in conHab.kevers + assert obv2hab.pre in conHab.kevers + + data = dict(cid=conHab.pre, role=Roles.watcher, eid=wat0hab.pre) + rpy = conHab.reply(route="/end/role/add", data=data) + conHab.psr.parseOne(ims=bytes(rpy)) # make copy so we don't clobber it + wat0hab.psr.parseOne(ims=rpy) + + ender = conHab.db.ends.get(keys=(conHab.pre, Roles.watcher, wat0hab.pre)) + assert ender.allowed is True + ender = wat0hab.db.ends.get(keys=(conHab.pre, Roles.watcher, wat0hab.pre)) + assert ender.allowed is True + + for hab in [obv0hab, obv1hab, obv2hab]: + icp = hab.makeOwnInception() + conHab.psr.parseOne(ims=bytes(icp)) + wat0hab.psr.parseOne(ims=bytes(icp)) + wat1hab.psr.parseOne(ims=bytes(icp)) + wat2hab.psr.parseOne(ims=bytes(icp)) + + assert obv0hab.pre in conHab.kevers + assert obv1hab.pre in conHab.kevers + assert obv2hab.pre in conHab.kevers + assert obv2hab.pre in wat0hab.kevers + + route = f"/watcher/{wat0hab.pre}/add" + data = dict(cid=conHab.pre, oid=obv0hab.pre) + rpy = conHab.reply(route=route, data=data) + conHab.psr.parseOne(ims=bytes(rpy)) # make copy so we don't clobber it + wat0hab.psr.parseOne(ims=rpy) + + observed = conHab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv0hab.pre)) + assert observed.enabled is True + observed = wat0hab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv0hab.pre)) + assert observed.enabled is True + + route = f"/watcher/{wat0hab.pre}/add" + data = dict(cid=conHab.pre, oid=obv1hab.pre) + rpy = conHab.reply(route=route, data=data) + conHab.psr.parseOne(ims=bytes(rpy)) # make copy so we don't clobber it + wat0hab.psr.parseOne(ims=rpy) + + observed = conHab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv1hab.pre)) + assert observed.enabled is True + observed = wat0hab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv1hab.pre)) + assert observed.enabled is True + + route = f"/watcher/{wat0hab.pre}/add" + data = dict(cid=conHab.pre, oid=obv2hab.pre) + rpy = conHab.reply(route=route, data=data) + conHab.psr.parseOne(ims=bytes(rpy)) # make copy so we don't clobber it + wat0hab.psr.parseOne(ims=rpy) + + observed = conHab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv2hab.pre)) + assert observed.enabled is True + observed = wat0hab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv2hab.pre)) + assert observed.enabled is True + + route = f"/watcher/{wat0hab.pre}/cut" + data = dict(cid=conHab.pre, oid=obv1hab.pre) + rpy = conHab.reply(route=route, data=data) + conHab.psr.parseOne(ims=bytes(rpy)) # make copy so we don't clobber it + wat0hab.psr.parseOne(ims=rpy) + + observed = conHab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv1hab.pre)) + assert observed.enabled is False + observed = wat0hab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv1hab.pre)) + assert observed.enabled is False + + route = f"/watcher/{wat1hab.pre}/add" + data = dict(cid=conHab.pre, oid=obv0hab.pre) + rpy = conHab.reply(route=route, data=data) + conHab.psr.parseOne(ims=bytes(rpy)) # make copy so we don't clobber it + wat1hab.psr.parseOne(ims=rpy) + + observed = conHab.db.obvs.get(keys=(conHab.pre, wat1hab.pre, obv0hab.pre)) + assert observed.enabled is True + observed = wat1hab.db.obvs.get(keys=(conHab.pre, wat1hab.pre, obv0hab.pre)) + assert observed.enabled is True + + route = f"/watcher/{wat1hab.pre}/add" + data = dict(cid=conHab.pre, oid=obv2hab.pre, oobi=f"http://example.com/oobi/{obv2hab.pre}") + rpy = conHab.reply(route=route, data=data) + conHab.psr.parseOne(ims=bytes(rpy)) # make copy so we don't clobber it + wat1hab.psr.parseOne(ims=rpy) + + observed = conHab.db.obvs.get(keys=(conHab.pre, wat1hab.pre, obv0hab.pre)) + assert observed.enabled is True + assert conHab.db.oobis.get(keys=(f"http://example.com/oobi/{obv2hab.pre}",)) is not None + observed = wat1hab.db.obvs.get(keys=(conHab.pre, wat1hab.pre, obv2hab.pre)) + assert observed.enabled is True + observed = wat1hab.db.obvs.get(keys=(conHab.pre, wat1hab.pre, obv1hab.pre)) + assert observed is None + + # Make sure nothing was changed for Wat0 + observed = conHab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv0hab.pre)) + assert observed.enabled is True + observed = conHab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv1hab.pre)) + assert observed.enabled is False + observed = conHab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv2hab.pre)) + assert observed.enabled is True + + if __name__ == "__main__": pytest.main(['-vv', 'test_reply.py::test_reply'])