diff --git a/src/keri/app/agenting.py b/src/keri/app/agenting.py index ad164a9b3..456e3f4bc 100644 --- a/src/keri/app/agenting.py +++ b/src/keri/app/agenting.py @@ -581,6 +581,7 @@ def __init__(self, hby, msgs=None, cues=None, **kwa): """ self.hby = hby + self.posted = 0 self.msgs = msgs if msgs is not None else decking.Deck() self.cues = cues if cues is not None else decking.Deck() super(WitnessPublisher, self).__init__(doers=[doing.doify(self.sendDo)], **kwa) @@ -599,6 +600,7 @@ def sendDo(self, tymth=None, tock=0.0, **opts): while True: while self.msgs: evt = self.msgs.popleft() + self.posted += 1 pre = evt["pre"] msg = evt["msg"] @@ -642,6 +644,10 @@ def sent(self, said): return False + @property + def idle(self): + return len(self.msgs) == 0 and self.posted == len(self.cues) + class TCPMessenger(doing.DoDoer): """ Send events to witnesses for receipting using TCP direct connection diff --git a/src/keri/app/cli/commands/delegate/confirm.py b/src/keri/app/cli/commands/delegate/confirm.py index a0ddb7525..25eb27764 100644 --- a/src/keri/app/cli/commands/delegate/confirm.py +++ b/src/keri/app/cli/commands/delegate/confirm.py @@ -37,6 +37,7 @@ action='store_true') parser.add_argument('--code', help=': formatted witness auth codes. Can appear multiple times', default=[], action="append", required=False) +parser.add_argument('--code-time', help='Time the witness codes were captured.', default=None, required=False) def confirm(args): @@ -54,16 +55,18 @@ def confirm(args): auto = args.auto authenticate = args.authenticate codes = args.code + codeTime = args.code_time confirmDoer = ConfirmDoer(name=name, base=base, alias=alias, bran=bran, interact=interact, auto=auto, - authenticate=authenticate, codes=codes) + authenticate=authenticate, codes=codes, codeTime=codeTime) doers = [confirmDoer] return doers class ConfirmDoer(doing.DoDoer): - def __init__(self, name, base, alias, bran, interact=False, auto=False, authenticate=False, codes=None): + def __init__(self, name, base, alias, bran, interact=False, auto=False, authenticate=False, codes=None, + codeTime=None): hby = existing.setupHby(name=name, base=base, bran=bran) self.hbyDoer = habbing.HaberyDoer(habery=hby) # setup doer self.witq = agenting.WitnessInquisitor(hby=hby) @@ -73,6 +76,7 @@ def __init__(self, name, base, alias, bran, interact=False, auto=False, authenti self.mux = grouping.Multiplexor(hby=hby, notifier=self.notifier) self.authenticate = authenticate self.codes = codes if codes is not None else [] + self.codeTime = codeTime exc = exchanging.Exchanger(hby=hby, handlers=[]) delegating.loadHandlers(hby=hby, exc=exc, notifier=self.notifier) @@ -185,9 +189,11 @@ def confirmDo(self, tymth, tock=0.0): auths = {} if self.authenticate: + codeTime = helping.fromIso8601( + self.codeTime) if self.codeTime is not None else helping.nowIso8601() for arg in self.codes: (wit, code) = arg.split(":") - auths[wit] = f"{code}#{helping.nowIso8601()}" + auths[wit] = f"{code}#{codeTime}" for wit in hab.kever.wits: if wit in auths: diff --git a/src/keri/app/cli/commands/local/watch.py b/src/keri/app/cli/commands/local/watch.py index 9e17841a7..b93aeba3c 100644 --- a/src/keri/app/cli/commands/local/watch.py +++ b/src/keri/app/cli/commands/local/watch.py @@ -7,14 +7,13 @@ import random import sys import time -from collections import namedtuple from hio import help from hio.base import doing from keri.app import agenting, indirecting, habbing, forwarding from keri.app.cli.common import existing, terming from keri.app.habbing import GroupHab -from keri.core import coring +from keri.app.watching import States, diffState logger = help.ogler.getLogger() @@ -31,17 +30,6 @@ parser.add_argument('--aeid', help='qualified base64 of non-transferable identifier prefix for authentication ' 'and encryption of secrets in keystore', default=None) -Stateage = namedtuple("Stateage", 'even ahead behind duplicitous') - -States = Stateage(even="even", ahead="ahead", behind="behind", duplicitous="duplicitous") - - -class WitnessState: - wit: str - state: Stateage - sn: int - dig: str - def watch(args): name = args.name @@ -135,7 +123,7 @@ def watchDo(self, tymth, tock=0.0, **opts): mystate = hab.kever.state() witstate = hab.db.ksns.get((saider.qb64,)) - states.append(self.diffState(wit, mystate, witstate)) + states.append(diffState(wit, mystate, witstate)) # First check for any duplicity, if so get out of here dups = [state for state in states if state.state == States.duplicitous] @@ -213,31 +201,3 @@ def cueDo(self, tymth, tock=0.0, **opts): yield self.tock yield self.tock - - @staticmethod - def diffState(wit, preksn, witksn): - - witstate = WitnessState() - witstate.wit = wit - mysn = int(preksn.s, 16) - mydig = preksn.d - witstate.sn = int(witksn.f, 16) - witstate.dig = witksn.d - - # At the same sequence number, check the DIGs - if mysn == witstate.sn: - if mydig == witstate.dig: - witstate.state = States.even - else: - witstate.state = States.duplicitous - - # This witness is behind and will need to be caught up. - elif mysn > witstate.sn: - witstate.state = States.behind - - # mysn < witstate.sn - We are behind this witness (multisig or restore situation). - # Must ensure that controller approves this event or a recovery rotation is needed - else: - witstate.state = States.ahead - - return witstate diff --git a/src/keri/app/cli/commands/mailbox/debug.py b/src/keri/app/cli/commands/mailbox/debug.py index d305a1ad1..873c9ad05 100644 --- a/src/keri/app/cli/commands/mailbox/debug.py +++ b/src/keri/app/cli/commands/mailbox/debug.py @@ -85,7 +85,7 @@ def readDo(self, tymth, tock=0.0): hab = self.hby.habByName(name=self.alias) topics = {"/receipt": 0, "/replay": 0, "/multisig": 0, "/credential": 0, "/delegate": 0, "/challenge": 0, - "/oobi": 0} + "/oobi": 0, "/reply": 0} try: client, clientDoer = agenting.httpClient(hab, self.witness) except kering.MissingEntryError as e: @@ -95,8 +95,11 @@ def readDo(self, tymth, tock=0.0): print("Local Index per Topic") witrec = hab.db.tops.get((hab.pre, self.witness)) - for topic in witrec.topics: - print(f" Topic {topic}: {witrec.topics[topic]}") + if witrec: + for topic in witrec.topics: + print(f" Topic {topic}: {witrec.topics[topic]}") + else: + print("\tNo local index") print() q = dict(pre=hab.pre, topics=topics) @@ -107,7 +110,7 @@ def readDo(self, tymth, tock=0.0): httping.createCESRRequest(msg, client, dest=self.witness) - while client.requests: + while client.requests or (not client.events and not client.requests): yield self.tock yield 1.0 diff --git a/src/keri/app/cli/commands/mailbox/list.py b/src/keri/app/cli/commands/mailbox/list.py new file mode 100644 index 000000000..369e749df --- /dev/null +++ b/src/keri/app/cli/commands/mailbox/list.py @@ -0,0 +1,68 @@ +# -*- encoding: utf-8 -*- +""" +KERI +keri.kli.commands module + +""" +import argparse + +from hio import help +from hio.base import doing + +from keri.app import connecting +from keri.app.cli.common import existing +from keri.kering import ConfigurationError, Roles + +logger = help.ogler.getLogger() + +parser = argparse.ArgumentParser(description='List current mailboxes') +parser.set_defaults(handler=lambda args: handle(args), + transferable=True) +parser.add_argument('--name', '-n', help='keystore name and file location of KERI keystore', required=True) +parser.add_argument('--alias', '-a', help='human readable alias for the identifier to whom the credential was issued', + required=True) +parser.add_argument('--base', '-b', help='additional optional prefix to file location of KERI keystore', + required=False, default="") +parser.add_argument('--passcode', '-p', help='22 character encryption passcode for keystore (is not saved)', + dest="bran", default=None) # passcode => bran + + +def handle(args): + """ Command line handler for adding an aid to a watcher's list of AIds to watch + + Parameters: + args(Namespace): parsed command line arguments + + """ + + kwa = dict(args=args) + return [doing.doify(listMailboxes, **kwa)] + + +def listMailboxes(tymth, tock=0.0, **opts): + """ Command line status handler + + """ + _ = (yield tock) + args = opts["args"] + name = args.name + alias = args.alias + base = args.base + bran = args.bran + + try: + with existing.existingHby(name=name, base=base, bran=bran) as hby: + org = connecting.Organizer(hby=hby) + if alias is None: + alias = existing.aliasInput(hby) + + hab = hby.habByName(alias) + + for (aid, role, eid), ender in hab.db.ends.getItemIter(keys=(hab.pre, Roles.mailbox)): + if ender.allowed: + contact = org.get(eid) + print(f"{contact['alias']}: {eid}") + + except ConfigurationError as e: + print(f"identifier prefix for {name} does not exist, incept must be run first", ) + return -1 diff --git a/src/keri/app/cli/commands/watcher/add.py b/src/keri/app/cli/commands/watcher/add.py index 7f3c8410f..c993f8e90 100644 --- a/src/keri/app/cli/commands/watcher/add.py +++ b/src/keri/app/cli/commands/watcher/add.py @@ -11,7 +11,8 @@ from keri.app import connecting, habbing, forwarding from keri.app.cli.common import existing -from keri.core import eventing, serdering +from keri.core import serdering +from keri.kering import Roles logger = help.ogler.getLogger() @@ -91,7 +92,7 @@ def __init__(self, name, alias, base, bran, watcher, watched): super(AddDoer, self).__init__(doers=doers) def addDo(self, tymth, tock=0.0): - """ Grant credential by creating /ipex/grant exn message + """ Add an AID to a watcher's list of AIDs to watch Parameters: tymth (function): injected function wrapper closure returned by .tymen() of @@ -109,17 +110,28 @@ def addDo(self, tymth, tock=0.0): if isinstance(self.hab, habbing.GroupHab): raise ValueError("watchers for multisig AIDs not currently supported") + ender = self.hab.db.ends.get(keys=(self.hab.pre, Roles.watcher, self.watcher)) + if not ender or not ender.allowed: + msg = self.hab.reply(route="/end/role/add", + data=dict(cid=self.hab.pre, role=Roles.watcher, eid=self.watcher)) + self.hab.psr.parseOne(ims=msg) + postman = forwarding.StreamPoster(hby=self.hby, hab=self.hab, recp=self.watcher, topic="reply") + for msg in self.hab.db.cloneDelegation(self.hab.kever): + serder = serdering.SerderKERI(raw=msg) + postman.send(serder=serder, attachment=msg[serder.size:]) + for msg in self.hab.db.clonePreIter(pre=self.hab.pre): serder = serdering.SerderKERI(raw=msg) postman.send(serder=serder, attachment=msg[serder.size:]) data = dict(cid=self.hab.pre, - wid=self.watched, + oid=self.watched, oobi=self.oobi) - route = "/watcher/aid/add" + route = f"/watcher/{self.watcher}/add" msg = self.hab.reply(route=route, data=data) + self.hab.psr.parseOne(ims=bytes(msg)) rpy = serdering.SerderKERI(raw=msg) postman.send(serder=rpy, attachment=msg[rpy.size:]) diff --git a/src/keri/app/cli/commands/watcher/adjudicate.py b/src/keri/app/cli/commands/watcher/adjudicate.py new file mode 100644 index 000000000..82e368e83 --- /dev/null +++ b/src/keri/app/cli/commands/watcher/adjudicate.py @@ -0,0 +1,159 @@ +# -*- encoding: utf-8 -*- +""" +KERI +keri.kli.commands module + +""" +import argparse +import datetime +import random +import sys + +from hio import help +from hio.base import doing + +from keri.app import connecting, indirecting, querying, watching +from keri.app.cli.common import existing +from keri.app.watching import diffState, States +from keri.help import helping +from keri.kering import ConfigurationError +logger = help.ogler.getLogger() + +parser = argparse.ArgumentParser(description='Perform key event adjudication on any new key state from watchers.') +parser.set_defaults(handler=lambda args: handle(args), + transferable=True) +parser.add_argument('--name', '-n', help='keystore name and file location of KERI keystore', required=True) +parser.add_argument('--alias', '-a', help='human readable alias for the identifier to whom the credential was issued', + required=True) +parser.add_argument('--base', '-b', help='additional optional prefix to file location of KERI keystore', + required=False, default="") +parser.add_argument('--passcode', '-p', help='22 character encryption passcode for keystore (is not saved)', + dest="bran", default=None) # passcode => bran +parser.add_argument('--toad', '-t', default=None, required=False, type=int, + help='int of watcher threshold (threshold of acceptable duplicity)', ) +parser.add_argument("--watched", '-W', help="the watched AID or alias to add", required=True) +parser.add_argument("--poll", "-P", help="Poll mailboxes for any issued credentials", action="store_true") + + +def handle(args): + """ Command line handler for adding an aid to a watcher's list of AIds to watch + + Parameters: + args(Namespace): parsed command line arguments + + """ + + kwa = dict(args=args) + adjudicator = AdjudicationDoer(**kwa) + + return [adjudicator] + + +class AdjudicationDoer(doing.DoDoer): + + def __init__(self, **kwa): + args = kwa["args"] + base = args.base + bran = args.bran + self.name = args.name + self.alias = args.alias + self.watched = args.watched + self.poll = args.poll + self.toad = args.toad + + self.hby = existing.setupHby(name=self.name, base=base, bran=bran) + self.mbx = indirecting.MailboxDirector(hby=self.hby, topics=['/reply', '/replay']) + doers = [doing.doify(self.adjudicate, **kwa), self.mbx] + + super(AdjudicationDoer, self).__init__(**kwa, doers=doers) + + def adjudicate(self, tymth, tock=0.0, **opts): + """ Command line status handler + + """ + _ = (yield tock) + + try: + org = connecting.Organizer(hby=self.hby) + + if self.poll: + end = helping.nowUTC() + datetime.timedelta(seconds=5) + sys.stdout.write(f"Polling mailboxes") + sys.stdout.flush() + while helping.nowUTC() < end: + sys.stdout.write(".") + sys.stdout.flush() + yield 1.0 + print("\n") + + if self.watched in self.hby.kevers: + watd = self.watched + else: + watd = org.find("alias", self.watched) + if len(watd) != 1: + raise ValueError(f"invalid recipient {self.watched}") + watd = watd[0]['id'] + + if not watd: + raise ValueError(f"unknown watched {self.watched}") + + if self.alias is None: + self.alias = existing.aliasInput(self.hby) + + hab = self.hby.habByName(self.alias) + if hab is None: + raise ValueError(f"unknown alias {self.alias}") + + adj = watching.Adjudicator(hby=self.hby, hab=hab) + adjDoer = watching.AdjudicationDoer(adj) + self.extend([adjDoer]) + + adj.msgs.append(dict(oid=self.watched, toad=self.toad)) + + while not adj.cues: + yield self.tock + + cue = adj.cues.pull() + kin = cue['kin'] + + match kin: + case "keyStateConsistent": + states = cue['states'] + wids = cue["wids"] + print(f"Local key state is consistent with the {len(states)} (out of " + f"{len(wids)} total) watchers that responded") + + case "keyStateLagging": + bhds = cue["behinds"] + print("The following watchers are behind the local KEL:") + for state in bhds: + print(f"\tWatcher {state.wit} at seq No. {state.sn} with digest: {state.dig}") + + print(f"Recommend the checking those watchers for access to {self.watched} witnesses") + + case "keyStateUpdate": + ahds = cue["aheads"] + logger.info(f"Threshold ({self.toad}) satisfying number of watchers ({len(ahds)}) are ahead") + for state in ahds: + logger.info(f"\tWatcher {state.wit} at Seq No. {state.sn} with digest: {state.dig}") + + state = random.choice(ahds) + querier = querying.SeqNoQuerier(hby=self.hby, hab=hab, pre=self.watched, sn=state.sn, + wits=[state.wit]) + self.extend([querier]) + + while not querier.done: + yield self.tock + + case "keyStateDuplicitous": + dups = cue["dups"] + print(f"Duplicity detected for AID {self.watched}, local key state remains intact.") + for state in dups: + print(f"\tWatcher {state.wit} at seq No. {state.sn} with digest: {state.dig}") + + self.remove([self.mbx, adjDoer]) + + except ConfigurationError as e: + print(f"identifier prefix for {self.name} does not exist, incept must be run first", ) + return -1 + diff --git a/src/keri/app/cli/commands/watcher/list.py b/src/keri/app/cli/commands/watcher/list.py new file mode 100644 index 000000000..ff54dc0ff --- /dev/null +++ b/src/keri/app/cli/commands/watcher/list.py @@ -0,0 +1,68 @@ +# -*- encoding: utf-8 -*- +""" +KERI +keri.kli.commands module + +""" +import argparse + +from hio import help +from hio.base import doing + +from keri.app import connecting +from keri.app.cli.common import existing +from keri.kering import ConfigurationError, Roles + +logger = help.ogler.getLogger() + +parser = argparse.ArgumentParser(description='List current watchers') +parser.set_defaults(handler=lambda args: handle(args), + transferable=True) +parser.add_argument('--name', '-n', help='keystore name and file location of KERI keystore', required=True) +parser.add_argument('--alias', '-a', help='human readable alias for the identifier to whom the credential was issued', + required=True) +parser.add_argument('--base', '-b', help='additional optional prefix to file location of KERI keystore', + required=False, default="") +parser.add_argument('--passcode', '-p', help='22 character encryption passcode for keystore (is not saved)', + dest="bran", default=None) # passcode => bran + + +def handle(args): + """ Command line handler for adding an aid to a watcher's list of AIds to watch + + Parameters: + args(Namespace): parsed command line arguments + + """ + + kwa = dict(args=args) + return [doing.doify(listWatchers, **kwa)] + + +def listWatchers(tymth, tock=0.0, **opts): + """ Command line status handler + + """ + _ = (yield tock) + args = opts["args"] + name = args.name + alias = args.alias + base = args.base + bran = args.bran + + try: + with existing.existingHby(name=name, base=base, bran=bran) as hby: + org = connecting.Organizer(hby=hby) + if alias is None: + alias = existing.aliasInput(hby) + + hab = hby.habByName(alias) + + for (aid, role, eid), ender in hab.db.ends.getItemIter(keys=(hab.pre, Roles.watcher, )): + if ender.allowed: + contact = org.get(eid) + print(f"{contact['alias']}: {eid}") + + except ConfigurationError as e: + print(f"identifier prefix for {name} does not exist, incept must be run first", ) + return -1 diff --git a/src/keri/app/cli/commands/witness/list.py b/src/keri/app/cli/commands/witness/list.py new file mode 100644 index 000000000..9ab11e6e5 --- /dev/null +++ b/src/keri/app/cli/commands/witness/list.py @@ -0,0 +1,56 @@ +# -*- encoding: utf-8 -*- +""" +KERI +keri.kli.commands module + +""" +import argparse + +from hio import help +from hio.base import doing + +from keri.app.cli.common import displaying, existing +from keri.core import serdering +from keri.kering import ConfigurationError + +logger = help.ogler.getLogger() + +parser = argparse.ArgumentParser(description='List AIDs of witness for the provided AID') +parser.set_defaults(handler=lambda args: handler(args), + transferable=True) +parser.add_argument('--name', '-n', help='keystore name and file location of KERI keystore', required=True) +parser.add_argument('--base', '-b', help='additional optional prefix to file location of KERI keystore', + required=False, default="") +parser.add_argument('--alias', '-a', help='human readable alias for the new identifier prefix', default=None) +parser.add_argument('--passcode', '-p', help='21 character encryption passcode for keystore (is not saved)', + dest="bran", default=None) # passcode => bran + + +def handler(args): + kwa = dict(args=args) + return [doing.doify(listWitnesses, **kwa)] + + +def listWitnesses(tymth, tock=0.0, **opts): + """ Command line status handler + + """ + _ = (yield tock) + args = opts["args"] + name = args.name + alias = args.alias + base = args.base + bran = args.bran + + try: + with existing.existingHby(name=name, base=base, bran=bran) as hby: + if alias is None: + alias = existing.aliasInput(hby) + + hab = hby.habByName(alias) + for idx, wit in enumerate(hab.kever.wits): + print(f'{wit}') + + except ConfigurationError as e: + print(f"identifier prefix for {name} does not exist, incept must be run first", ) + return -1 diff --git a/src/keri/app/delegating.py b/src/keri/app/delegating.py index 8d9e4c194..6b90c53ef 100644 --- a/src/keri/app/delegating.py +++ b/src/keri/app/delegating.py @@ -44,11 +44,11 @@ def __init__(self, hby, proxy=None, auths=None, **kwa): self.postman = forwarding.Poster(hby=hby) self.witq = agenting.WitnessInquisitor(hby=hby) self.witDoer = agenting.Receiptor(hby=self.hby) + self.publishers = dict() self.proxy = proxy self.auths = auths - super(Anchorer, self).__init__(doers=[self.witq, self.witDoer, self.postman, doing.doify(self.escrowDo)], - **kwa) + super(Anchorer, self).__init__(doers=[self.witq, self.witDoer, self.postman, doing.doify(self.escrowDo)], **kwa) def delegation(self, pre, sn=None, proxy=None, auths=None): if pre not in self.hby.habs: @@ -57,6 +57,7 @@ def delegation(self, pre, sn=None, proxy=None, auths=None): if proxy is not None: self.proxy = proxy + self.publishers[pre] = agenting.WitnessPublisher(hby=self.hby) # load the hab of the delegated identifier to anchor hab = self.hby.habs[pre] delpre = hab.kever.delpre # get the delegator identifier @@ -123,6 +124,7 @@ def escrowDo(self, tymth, tock=1.0): def processEscrows(self): self.processPartialWitnessEscrow() self.processUnanchoredEscrow() + self.processWitnessPublication() def processUnanchoredEscrow(self): """ @@ -143,8 +145,9 @@ def processUnanchoredEscrow(self): self.hby.db.setAes(dgkey, couple) # authorizer event seal (delegator/issuer) # Move to escrow waiting for witness receipts - logger.info(f"Delegation approval received, {serder.pre} confirmed") - self.hby.db.cdel.put(keys=(pre, coring.Seqner(sn=serder.sn).qb64), val=coring.Saider(qb64=serder.said)) + logger.info(f"Delegation approval received, {serder.pre} confirmed, publishing to my witnesses") + self.publishDelegator(pre) + self.hby.db.dpub.put(keys=(pre, said), val=serder) self.hby.db.dune.rem(keys=(pre, said)) def processPartialWitnessEscrow(self): @@ -197,6 +200,32 @@ def processPartialWitnessEscrow(self): self.hby.db.dpwe.rem(keys=(pre, said)) self.hby.db.dune.pin(keys=(srdr.pre, srdr.said), val=srdr) + def processWitnessPublication(self): + """ + Process escrow of partially signed multisig group KEL events. Message + processing will send this local controllers signature to all other participants + then this escrow waits for signatures from all other participants + + """ + for (pre, said), serder in self.hby.db.dpub.getItemIter(): # group partial witness escrow + publisher = self.publishers[pre] + + if not publisher.idle: + continue + + self.remove([publisher]) + del self.publishers[pre] + + self.hby.db.dpub.rem(keys=(pre, said)) + self.hby.db.cdel.put(keys=(pre, coring.Seqner(sn=serder.sn).qb64), val=coring.Saider(qb64=serder.said)) + + def publishDelegator(self, pre): + hab = self.hby.habs[pre] + publisher = self.publishers[pre] + self.extend([publisher]) + for msg in hab.db.cloneDelegation(hab.kever): + publisher.msgs.append(dict(pre=hab.pre, msg=bytes(msg))) + def loadHandlers(hby, exc, notifier): """ Load handlers for the peer-to-peer delegation protocols diff --git a/src/keri/app/querying.py b/src/keri/app/querying.py index 2dc4ce17d..11541640f 100644 --- a/src/keri/app/querying.py +++ b/src/keri/app/querying.py @@ -90,13 +90,13 @@ def recur(self, tyme, deeds=None): class SeqNoQuerier(doing.DoDoer): - def __init__(self, hby, hab, pre, sn, **opts): + def __init__(self, hby, hab, pre, sn, wits=None, **opts): self.hby = hby self.hab = hab self.pre = pre self.sn = sn self.witq = agenting.WitnessInquisitor(hby=self.hby) - self.witq.query(src=self.hab.pre, pre=self.pre, sn="{:x}".format(self.sn)) + self.witq.query(src=self.hab.pre, pre=self.pre, sn="{:x}".format(self.sn), wits=wits) super(SeqNoQuerier, self).__init__(doers=[self.witq], **opts) def recur(self, tyme, deeds=None): diff --git a/src/keri/app/watching.py b/src/keri/app/watching.py new file mode 100644 index 000000000..3a720d594 --- /dev/null +++ b/src/keri/app/watching.py @@ -0,0 +1,217 @@ +# -*- encoding: utf-8 -*- +""" +KERI +keri.app.watching module + +""" +import random +from collections import namedtuple +from dataclasses import dataclass + +from hio.base import doing +from hio.help import decking + +from keri import help + +logger = help.ogler.getLogger() + +Stateage = namedtuple("Stateage", 'even ahead behind duplicitous') + +States = Stateage(even="even", ahead="ahead", behind="behind", duplicitous="duplicitous") + + +@dataclass +class DiffState: + """ Difference between a remote KeyStateRecord and local for the same AID. + + Uses Stateage to represent whether the remote KSR is even, ahead, behind or duplicitous + + """ + wit: str # The entity reporting the KSR (non-local) + state: Stateage # The state of the remote KSR relative to local + sn: int # The sequence number of the remote KSR + dig: str # The digest of the latest event of the remote KSR + + +class Adjudicator: + """ The Adjudicator of Key State + + This class performs key state adjudication by checking any key state reported by the watcher set for a given + watched AID and compares the reported values against the local key state for the watched AID and the key state + of all other responding watchers. It uses a per-adjudication threshold to determine what is acceptable duplicity + for each adjudication. + + Cues are sent out for each round of adjudication with the following kins: + + keyStateConsistent - Key state of all queries watchers is consistent with local key state + keyStateLagging - Key state from some watchers is behind local key state and other watchers + keyStateUpdate - A threshold satisfying number of watchers report new key state for watched AID + keyStateDuplicitous - Duplicity has been detected on some set of watchers (provided in the cue) + + Consumers of the Adjudicator's cues are safe to retrieve new key state from one of the Watchers listed in the + cue of `keyStateUpdated` is received. All other kins require controller intervention and should be bubbled up. + + """ + + def __init__(self, hby, hab, msgs=None, cues=None): + """ Create instance of Adjudicator for adjudicating key state + + Parameters: + hby (Habery): database and Habitat environment + hab (Hab): identifier database environment + msgs (Deck): incoming requests to adjudicate key state + cues (Deck): outgoing responses to adjudication of key state + + """ + self.hby = hby + self.hab = hab + self.msgs = msgs if msgs is not None else decking.Deck() + self.cues = cues if cues is not None else decking.Deck() + + def performAdjudications(self): + """ Process loop of existing messages requesting key state adjudication """ + while self.msgs: + msg = self.msgs.pull() + + watched = msg["oid"] + toad = msg["toad"] if "toad" in msg else None + + self.adjudicate(watched, toad) + + def adjudicate(self, watched, toad=None): + """ Perform key state adjudication against the `watched` AID and provided threshold + + If `toad` is not provided, the full set of watchers must come to consensus before `keyStateUpdate` + will be reported. + + Parameters: + watched (str): qb64 AID to adjudicate for key state duplicity + toad (int): threshold of acceptable duplicity amongst available watchers + + + """ + watchers = set() + for (cid, aid, oid), observed in self.hab.db.obvs.getItemIter(keys=(self.hab.pre,)): + if observed.enabled and oid == watched: + watchers.add(aid) + + toad = int(toad) if toad else len(watchers) + if toad > len(watchers): + raise ValueError(f"Threshold of {toad} is greater than number watchers {len(watchers)}") + + states = [] + mystate = self.hab.kever.state() + for watcher in watchers: + saider = self.hab.db.knas.get(keys=(watched, watcher)) + if saider is None: + print(f"No key state from watcher {watcher} for {watched}") + continue + + ksn = self.hab.db.ksns.get(keys=(saider.qb64,)) + states.append(diffState(watcher, mystate, ksn)) + + dups = [state for state in states if state.state == States.duplicitous] + ahds = [state for state in states if state.state == States.ahead] + bhds = [state for state in states if state.state == States.behind] + + if len(dups) > 0: + cue = dict(kin="keyStateDuplicitous", cid=self.hab.pre, oid=watched, wids=watchers, dups=dups) + self.cues.append(cue) + + logger.error(f"Duplicity detected for AID {watched}, local key state remains intact.") + for state in dups: + logger.error(f"\tWatcher {state.wit} at seq No. {state.sn} with digest: {state.dig}") + + elif len(ahds) > 0: + # Only group habs can be behind their watchers + # First check for duplicity among the watchers that are ahead (possible only if toad is below + # super majority) + digs = set([state.dig for state in ahds]) + if len(digs) > 1: # Duplicity across watcher sets + cue = dict(kin="keyStateDuplicitous", cid=self.hab.pre, oid=watched, wids=watchers, dups=ahds) + self.cues.append(cue) + + logger.error(f"There are multiple duplicitous events on watcher for {watched}") + for state in ahds: + logger.error(f"\tWatcher {state.wit} at seq No. {state.sn} with digest: {state.dig}") + + elif len(ahds) >= toad: # all witnesses that are ahead agree on the event + logger.info(f"Threshold ({toad}) satisfying number of watchers ({len(ahds)}) are ahead") + for state in ahds: + logger.info(f"\tWatcher {state.wit} at Seq No. {state.sn} with digest: {state.dig}") + + state = random.choice(ahds) + cue = dict(kin="keyStateUpdate", cid=self.hab.pre, oid=watched, wids=watchers, sn=state.sn, aheads=ahds) + self.cues.append(cue) + + elif len(bhds) > 0: + cue = dict(kin="keyStateLagging", cid=self.hab.pre, oid=watched, wids=watchers, behind=bhds) + self.cues.append(cue) + + logger.info("The following watchers are behind the local KEL:") + for state in bhds: + logger.info(f"\tWatcher {state.wit} at seq No. {state.sn} with digest: {state.dig}") + + logger.info(f"Recommend the checking those watchers for access to {watched} witnesses") + + else: + cue = dict(kin="keyStateConsistent", cid=self.hab.pre, oid=watched, wids=watchers, states=states) + self.cues.append(cue) + logger.info(f"Local key state is consistent with the {len(states)} (out of " + f"{len(watchers)} total) watchers that responded") + + +class AdjudicationDoer(doing.Doer): + """ Doer class responsible for process adjudication requests in an Adjudicator's msgs """ + + def __init__(self, adjudicator): + """ Create instance of Doer for performing key state adjudications """ + self.adjudicator = adjudicator + super(AdjudicationDoer, self).__init__() + + def recur(self, tyme): + """ Perform one pass over all adjudication requests + + Parameters: + tyme (float): relative cycle time + + Returns: + + """ + self.adjudicator.performAdjudications() + + +def diffState(wit, preksn, witksn): + """ Return a record of the differences between the states provided by `wit` and local state + + Parameters: + wit (str): qb64 AID of entity reporting key state + preksn (KeyStateRecord): Local key state of AID + witksn (KeyStateRecord): Key state of AID as provided by `wit` + + Returns: + state (WitnessState): record indicating the differenced between the two provided KSN records + + """ + mysn = int(preksn.s, 16) + mydig = preksn.d + sn = int(witksn.s, 16) + dig = witksn.d + + # At the same sequence number, check the DIGs + if mysn == sn: + if mydig == dig: + state = States.even + else: + state = States.duplicitous + + # This witness is behind and will need to be caught up. + elif mysn > sn: + state = States.behind + + # mysn < witstate.sn - We are behind this witness (multisig or restore situation). + # Must ensure that controller approves this event or a recovery rotation is needed + else: + state = States.ahead + + return DiffState(wit, state, sn, dig) diff --git a/src/keri/core/eventing.py b/src/keri/core/eventing.py index dd9cb2244..7b552b072 100644 --- a/src/keri/core/eventing.py +++ b/src/keri/core/eventing.py @@ -7,6 +7,7 @@ import json import logging from collections import namedtuple +from dataclasses import asdict from urllib.parse import urlsplit from math import ceil from ordered_set import OrderedSet as oset @@ -39,7 +40,7 @@ from . import serdering from ..db import basing, dbing -from ..db.basing import KeyStateRecord, StateEERecord +from ..db.basing import KeyStateRecord, StateEERecord, OobiRecord from ..db.dbing import dgKey, snKey, fnKey, splitKeySN, splitKey @@ -2098,8 +2099,6 @@ def rotate(self, serder): raise ValidationError(f"Invalid sith = {serder.tholder} for keys = " f"{keys} for evt = {ked}.") - - # compute wits from existing .wits with new cuts and adds from event # use ordered set math ops to verify and ensure strict ordering of wits # cuts and add to ensure that indexed signatures on indexed witness @@ -2118,7 +2117,6 @@ def rotate(self, serder): return tholder, toader, wits, cuts, adds - def deriveBacks(self, serder): """Derives and return tuple of (wits, cuts, adds) for backers given current set and any changes provided by serder. @@ -2243,10 +2241,10 @@ def valSigsWigsDel(self, serder, sigers, verfers, tholder, (self.locallyOwned() or self.locallyWitnessed(wits=wits))): self.escrowMFEvent(serder=serder, sigers=sigers, wigers=wigers, - seqner=delseqner, saider=delsaider, local=local) + seqner=delseqner, saider=delsaider, local=local) raise MisfitEventSourceError(f"Nonlocal source for locally owned" - f"or locally witnessed event" - f" = {serder.ked}.") + f" or locally witnessed event" + f" = {serder.ked}, {wits}, {self.prefixes}") werfers = [Verfer(qb64=wit) for wit in wits] # get witness public key verifiers # get unique verified wigers and windices lists from wigers list @@ -2627,19 +2625,15 @@ def validateDelegation(self, serder, sigers, wigers, wits, local=True, # misfit escrow first. Mistfit escrow must first # promote to local and reprocess event before we get to here self.escrowDelegableEvent(serder=serder, sigers=sigers, - wigers=wigers,local=local) + wigers=wigers, local=local) raise MissingDelegableApprovalError(f"Missing approval for " f" delegation by {delpre} of" - f"event = {serder.ked}.") - - #self.cues.push(dict(kin="approveDelegation", - #delegator=kever.delpre, - #serder=serder)) + f"event = {serder.ked}.") else: # not local delegator so escrow self.escrowPSEvent(serder=serder, sigers=sigers, wigers=wigers, local=local) raise MissingDelegationError(f"No delegation seal for delegator " - "{delpre} of evt = {serder.ked}.") + f"{delpre} of evt = {serder.ked}.") ssn = Number(num=delseqner.sn).validate(inceptive=False).sn # ToDo XXXX need to replace Seqners with Numbers @@ -2949,8 +2943,7 @@ def escrowDelegableEvent(self, serder, sigers, wigers=None, local=True): self.db.delegables.add(snKey(serder.preb, serder.sn), serder.saidb) # log escrowed logger.debug("Kever state: escrowed delegable event=\n%s\n", - json.dumps(serder.ked, indent=1)) - + json.dumps(serder.ked, indent=1)) def escrowPSEvent(self, serder, sigers, wigers=None, local=True): """ @@ -2985,9 +2978,9 @@ def escrowPSEvent(self, serder, sigers, wigers=None, local=True): self.db.esrs.put(keys=dgkey, val=esr) snkey = snKey(serder.preb, serder.sn) - self.db.addPse(snkey, serder.saidb) # b'EOWwyMU3XA7RtWdelFt-6waurOTH_aW_Z9VTaU-CshGk.00000000000000000000000000000001' + self.db.addPse(snkey, serder.saidb) logger.debug("Kever state: Escrowed partially signed or delegated " - "event = %s\n", serder.ked) + "event = %s\n", serder.ked) def escrowPACouple(self, serder, seqner, saider, local=True): @@ -4087,6 +4080,7 @@ def registerReplyRoutes(self, router): router.addRoute("/end/role/{action}", self, suffix="EndRole") router.addRoute("/loc/scheme", self, suffix="LocScheme") router.addRoute("/ksn/{aid}", self, suffix="KeyStateNotice") + router.addRoute("/watcher/{aid}/{action}", self, suffix="AddWatched") def processReplyEndRole(self, *, serder, saider, route, cigars=None, tsgs=None, **kwargs): """ @@ -4406,7 +4400,7 @@ def processReplyKeyStateNotice(self, *, serder, saider, route, ksaider = coring.Saider(qb64=diger.qb64) self.updateKeyState(aid=aid, ksr=ksr, saider=ksaider, dater=dater) - self.cues.push(dict(kin="keyStateSaved", ksn=ksr._asdict())) + self.cues.push(dict(kin="keyStateSaved", ksn=asdict(ksr))) def updateEnd(self, keys, saider, allowed=None): """ @@ -4470,6 +4464,100 @@ def removeKeyState(self, saider): self.db.ksns.rem(keys=keys) self.db.kdts.rem(keys=keys) + def processReplyAddWatched(self, *, serder, saider, route, + cigars=None, tsgs=None, **kwargs): + """ Process one reply message for adding an AID for a watcher to watch + + Process one reply message for adding an AID for a watcher to watch = /watcher/{aid}/add + with either attached nontrans receipt couples in cigars or attached trans + indexed sig groups in tsgs. + Assumes already validated saider, dater, and route from serder.ked + + Parameters: + serder (SerderKERI): instance of reply msg (SAD) + saider (Saider): instance from said in serder (SAD) + route (str): reply route + cigars (list): of Cigar instances that contain nontrans signing couple + signature in .raw and public key in .verfer + tsgs (list): tuples (quadruples) of form + (prefixer, seqner, diger, [sigers]) where: + prefixer is pre of trans endorser + seqner is sequence number of trans endorser's est evt for keys for sigs + diger is digest of trans endorser's est evt for keys for sigs + [sigers] is list of indexed sigs from trans endorser's keys from est evt + + Reply Message: + { + "v" : "KERI10JSON00011c_", + "t" : "rpy", + "d": "EZ-i0d8JZAoTNZH3ULaU6JR2nmwyvYAfSVPzhzS6b5CM", + "dt": "2020-08-22T17:50:12.988921+00:00", + "r" : "/watcher/BrHLayDN-mXKv62DAjFLX1_Y5yEUe0vA9YPe_ihiKYHE/add", + "a" : + { + "cid": "EyX-zd8JZAoTNZH3ULaU6JR2nmwyvYAfSVPzhzS6b5CM" + "oid": "EM0-i05TNZJZAoH3UR2nmLaU6JwyvPzhzS6YAfSVbMC5" + "oobi": "http://example.com/oobi/EyX-zd8JZAoTNZH3ULaU6JR2nmwyvYAfSVPzhzS6b5CM" + } + } + + """ + aid = kwargs["aid"] + action = kwargs["action"] + # reply specific logic + if not route.startswith("/watcher"): + raise ValidationError(f"Usupported route={route} in {Ilks.rpy} " + f"msg={serder.ked}.") + + # reply specific logic + if action == "add": + enabled = True + elif action == "cut": + enabled = False + else: # unsupported route + raise ValidationError(f"Usupported route={route} in {Ilks.rpy} " + f"msg={serder.ked}.") + route = f"/watcher/{aid}" # escrow based on route base + cigars = cigars if cigars is not None else [] + tsgs = tsgs if tsgs is not None else [] + + data = serder.ked["a"] + cid = data["cid"] + oid = data["oid"] + oobi = data["oobi"] if "oobi" in data else None + + keys = (cid, aid, oid) + + osaider = self.db.wwas.get(keys=keys) # get old said if any + + # BADA Logic + accepted = self.rvy.acceptReply(serder=serder, saider=saider, route=route, + aid=cid, osaider=osaider, cigars=cigars, + tsgs=tsgs) + if not accepted: + raise UnverifiedReplyError(f"Unverified watcher add reply. {serder.ked}") + + if oobi: + self.db.oobis.pin(keys=(oobi,), val=OobiRecord(date=help.nowIso8601())) + self.updateWatched(keys=keys, saider=saider, enabled=enabled) + + def updateWatched(self, keys, saider, enabled=None): + """ + Update loc auth database .lans and loc database .locs. + + Parameters: + keys (tuple): of key strs for databases (eid, scheme) + saider (Saider): instance from said in reply serder (SAD) + enabled (bool): True means add observed to watcher, False means remove (cut) + """ + self.db.wwas.pin(keys=keys, val=saider) # overwrite + if observed := self.db.obvs.get(keys=keys): # preexisting record + observed.enabled = enabled # update preexisting record + else: # no preexisting record + observed = basing.ObservedRecord(enabled=enabled, datetime=helping.nowIso8601()) # create new record + + self.db.obvs.pin(keys=keys, val=observed) # overwrite + def processQuery(self, serder, source=None, sigers=None, cigars=None): """ Process query mode replay message for collective or single element query. @@ -5175,6 +5263,14 @@ def processEscrowPartialSigs(self): raise ValidationError("Missing escrowed evt sigs at " "dig = {}.".format(bytes(edig))) + wigs = self.db.getWigs(dgKey(pre, bytes(edig))) # list of wigs + if not wigs: # empty list + # wigs maybe empty while waiting for first witness signature + # which may not arrive until some time after event is fully signed + # so just log for debugging but do not unescrow by raising + # ValidationError + logger.debug("Kevery unescrow wigs: No event wigs yet at." + "dig = %s", bytes(edig)) # seal source (delegator issuer if any) delseqner = delsaider = None @@ -5197,7 +5293,8 @@ def processEscrowPartialSigs(self): # process event sigers = [Siger(qb64b=bytes(sig)) for sig in sigs] - self.processEvent(serder=eserder, sigers=sigers, + wigers = [Siger(qb64b=bytes(wig)) for wig in wigs] + self.processEvent(serder=eserder, sigers=sigers, wigers=wigers, delseqner=delseqner, delsaider=delsaider, local=esr.local) # If process does NOT validate sigs or delegation seal (when delegated), diff --git a/src/keri/core/routing.py b/src/keri/core/routing.py index 8d119a27b..3601529f0 100644 --- a/src/keri/core/routing.py +++ b/src/keri/core/routing.py @@ -198,7 +198,6 @@ def processReply(self, serder, cigars=None, tsgs=None): self.rtr.dispatch(serder=serder, saider=saider, cigars=cigars, tsgs=tsgs) - def acceptReply(self, serder, saider, route, aid, osaider=None, cigars=None, tsgs=None): """ Applies Best Available Data Acceptance policy to reply and signatures @@ -494,8 +493,6 @@ def processEscrowReply(self): # still waiting on missing prior event to validate if logger.isEnabledFor(logging.DEBUG): logger.exception("Kevery unescrow attempt failed: %s", ex.args[0]) - else: - logger.error("Kevery unescrow attempt failed: %s", ex.args[0]) except Exception as ex: # other error so remove from reply escrow self.db.rpes.rem(keys=(route, ), val=saider) # remove escrow only diff --git a/src/keri/db/basing.py b/src/keri/db/basing.py index cf7fbd30d..601e77d35 100644 --- a/src/keri/db/basing.py +++ b/src/keri/db/basing.py @@ -347,7 +347,6 @@ class OobiRecord: urls: list = None - @dataclass class EndpointRecord: # baser.ends """ @@ -499,6 +498,56 @@ def __iter__(self): return iter(asdict(self)) +@dataclass +class ObservedRecord: # baser.obvs + """ + Watched Record with fields and keys to manage OIDs (Observed IDs) being watched by a watcher, keyed by + cid (controller ID), aid (watcher ID), and oid (observed ID). + + The namespace is a tree of branches with each leaf at a + specific (cid, aid, oid). Retrieval by branch returns groups of leaves as + appropriate for a cid braanch or cid.aid branch. + Database Keys are (cid, aid, oid) where cid is attributable controller identifier + (qb64 prefix). + + Attributes: + enabled (bool): AuthZ via expose message + True means oid is enabled as being observed + False means eid is disenabled being observed + None means eid is neither enabled or disenabled + name (str): user friendly name for eid in role + datetime (str): Date time this record was last observed + + + A watcher end reply message is required from which the field values + for this record are extracted. A routes of /watcher/{aid}/add /watcher/{aid}/cut + Uses add-cut model with allowed field + enabled==True oid is allowed (add) as being observed + enabled==False oid is disallowed (cut) as being observed + + { + "v" : "KERI10JSON00011c_", + "t" : "rpy", + "d": "EZ-i0d8JZAoTNZH3ULaU6JR2nmwyvYAfSVPzhzS6b5CM", + "dt": "2020-08-22T17:50:12.988921+00:00", + "r" : "/watcher/BrHLayDN-mXKv62DAjFLX1_Y5yEUe0vA9YPe_ihiKYHE/add", + "a" : + { + "cid": "EaU6JR2nmwyZ-i0d8JZAoTNZH3ULvYAfSVPzhzS6b5CM", + "oid": "EZ-i0d8JZAoTNZH3ULaU6JR2nmwyvYAfSVPzhzS6b5CM", + "oobi": "http://example.com/oobi/EyX-zd8JZAoTNZH3ULaU6JR2nmwyvYAfSVPzhzS6b5CM", + } + } + + """ + enabled: bool = None # True eid enabled (add), False eid disenabled (cut), None neither + name: str = "" # optional user friendly name of endpoint + datetime: str = None + + def __iter__(self): + return iter(asdict(self)) + + @dataclass class WellKnownAuthN: """ @@ -1015,6 +1064,11 @@ def reopen(self, **kwa): self.locs = koming.Komer(db=self, subkey='locs.', schema=LocationRecord, ) + # observed oids by watcher by cid.aid.oid (endpoint identifier) + # data extracted from reply loc + self.obvs = koming.Komer(db=self, + subkey='obvs.', + schema=ObservedRecord, ) # index of last retrieved message from witness mailbox # TODO: clean @@ -1095,6 +1149,11 @@ def reopen(self, **kwa): # TODO: clean self.knas = subing.CesrSuber(db=self, subkey='knas.', klas=coring.Saider) + # Watcher watched SAID database for successfully saved watched AIDs for a watcher + # maps key=(cid, aid, oid) to val=said of rpy message + # TODO: clean + self.wwas = subing.CesrSuber(db=self, subkey='wwas.', klas=coring.Saider) + # config loaded oobis to be processed asynchronously, keyed by oobi URL # TODO: clean self.oobis = koming.Komer(db=self, @@ -1182,6 +1241,9 @@ def reopen(self, **kwa): # delegated unanchored escrow self.dune = subing.SerderSuber(db=self, subkey='dune.') + # delegate publication escrow for sending delegator info to my witnesses + self.dpub = subing.SerderSuber(db=self, subkey='dpub.') + # completed group delegated AIDs # TODO: clean self.cdel = subing.CesrSuber(db=self, subkey='cdel.', @@ -1540,7 +1602,6 @@ def cloneEvtMsg(self, pre, fn, dig): msg.extend(atc) return msg - def cloneDelegation(self, kever): """ Recursively clone delegation chain from AID of Kever if one exits. @@ -1556,7 +1617,6 @@ def cloneDelegation(self, kever): for dmsg in self.clonePreIter(pre=kever.delpre, fn=0): yield dmsg - def findAnchoringSealEvent(self, pre, seal, sn=0): """ Search through a KEL for the event that contains a specific anchored diff --git a/tests/app/test_storing.py b/tests/app/test_storing.py index 82bf39352..d37eeae3c 100644 --- a/tests/app/test_storing.py +++ b/tests/app/test_storing.py @@ -1,6 +1,6 @@ # -*- encoding: utf-8 -*- """ -tests.peer.mailboxing +tests.app.storing """ import os diff --git a/tests/app/test_watching.py b/tests/app/test_watching.py new file mode 100644 index 000000000..4c7a49702 --- /dev/null +++ b/tests/app/test_watching.py @@ -0,0 +1,159 @@ +# -*- encoding: utf-8 -*- +""" +tests.app.watching + +""" +from dataclasses import asdict + +import pytest + +from keri import core +from keri.app import watching, habbing +from keri.app.watching import DiffState +from keri.core import coring +from keri.db.basing import KeyStateRecord, ObservedRecord + + +def test_diffstate(): + d0 = {'vn': [1, 0], + 'i': 'EZ-i0d8JZAoTNZH3ULaU6JR2nmwyvYAfSVPzhzS6b5CM', + 's': '0', + 'p': 'ElsHFkbZQjRb7xHnuE-wyiarIZ9j-1CEQ89I0E3WevcE', + 'd': 'EBiIFxr_o1b4x1YR21PblAFpFG61qDghqFBDyVSOXYW0', + 'f': '0', + 'dt': '2021-06-09T17:35:54.169967+00:00', + 'et': '2021-06-09T17:35:54.169967+00:00', + 'kt': '1', + 'k': ["D-HwiqmaETxls3vAVSh0xpXYTs94NUJX6juupWj_EgsA"], + 'nt': '1', + 'n': ["ED6lKZwg-BWl_jlCrjosQkOEhqKD4BJnlqYqWmhqPhaU"], + 'bt': '0', + 'b': [], + 'c': [], + 'ee': { + 's': '0', + 'd': 'EBiIFxr_o1b4x1YR21PblAFpFG61qDghqFBDyVSOXYW0', + 'br': [], + 'ba': [] + }, + 'di': ''} + + ksr0 = KeyStateRecord(**d0) + d1 = {'vn': [1, 0], + 'i': 'EZ-i0d8JZAoTNZH3ULaU6JR2nmwyvYAfSVPzhzS6b5CM', + 's': '0', + 'p': 'ElsHFkbZQjRb7xHnuE-wyiarIZ9j-1CEQ89I0E3WevcE', + 'd': 'Ey2pXEnaoQVwxA4jB6k0QH5G2Us-0juFL5hOAHAwIEkc', + 'f': '0', + 'dt': '2021-06-09T17:35:54.169967+00:00', + 'et': '2021-06-09T17:35:54.169967+00:00', + 'kt': '1', + 'k': ["DxVTxls3vAwiqmaEXYTs94NUJX6juVSh0xpupEgsAWj_"], + 'nt': '1', + 'n': ["ED6lKZwg-BWl_jlCrjosQkOEhqKD4BJnlqYqWmhqPhaU"], + 'bt': '0', + 'b': [], + 'c': [], + 'ee': { + 's': '0', + 'd': 'EBiIFxr_o1b4x1YR21PblAFpFG61qDghqFBDyVSOXYW0', + 'br': [], + 'ba': [] + }, + 'di': ''} + ksr1 = KeyStateRecord(**d1) + + wat = "BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s" + diffstate = watching.diffState(wat, ksr0, ksr1) + + # Sequence numbers are the same, digest different == duplicitous + assert asdict(diffstate) == {'wit': 'BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s', + 'state': 'duplicitous', + 'sn': 0, 'dig': 'Ey2pXEnaoQVwxA4jB6k0QH5G2Us-0juFL5hOAHAwIEkc'} + + # Same state == event + diffstate = watching.diffState(wat, ksr0, ksr0) + assert asdict(diffstate) == {'dig': 'EBiIFxr_o1b4x1YR21PblAFpFG61qDghqFBDyVSOXYW0', + 'sn': 0, + 'state': 'even', + 'wit': 'BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s'} + + ksr1.s = "2" + diffstate = watching.diffState(wat, ksr0, ksr1) + + # Sequence numbers are the same, digest different == duplicitous + assert asdict(diffstate) == {'dig': 'Ey2pXEnaoQVwxA4jB6k0QH5G2Us-0juFL5hOAHAwIEkc', + 'sn': 2, + 'state': 'ahead', + 'wit': 'BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s'} + + ksr0.s = "3" + diffstate = watching.diffState(wat, ksr0, ksr1) + + # Sequence numbers are the same, digest different == duplicitous + assert asdict(diffstate) == {'dig': 'Ey2pXEnaoQVwxA4jB6k0QH5G2Us-0juFL5hOAHAwIEkc', + 'sn': 2, + 'state': 'behind', + 'wit': 'BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s'} + + +def test_adjudicator(): + default_salt = core.Salter(raw=b'0123456789abcdef').qb64 + with habbing.openHby(name="test", base="test", salt=default_salt) as hby: + hab = hby.makeHab("test") + assert hab.pre == "EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3" + wat = "BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s" + saider = coring.Saider(qb64b=b'EClqKVJREM3MWKBqR2j712s3Z6rPxhqO-h-p8Ls6_9hQ') + + ksr = hab.kever.state() + ksr0 = KeyStateRecord(**asdict(ksr)) + + hab.db.knas.pin(keys=(hab.pre, wat), val=saider) + hab.db.ksns.pin(keys=(saider.qb64, ), val=ksr0) + hab.db.obvs.pin(keys=(hab.pre, wat, hab.pre), val=ObservedRecord(enabled=True)) + + adj = watching.Adjudicator(hby=hby, hab=hab) + + adj.adjudicate(hab.pre, 1) + assert len(adj.cues) == 1 + cue = adj.cues.pull() + + assert cue == {'cid': 'EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3', + 'kin': 'keyStateConsistent', + 'oid': 'EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3', + 'states': [DiffState(wit='BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s', + state='even', + sn=0, + dig='EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3')], + 'wids': {'BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s'}} + + hab.rotate() + + adj.adjudicate(hab.pre, 1) + assert len(adj.cues) == 1 + cue = adj.cues.pull() + assert cue == {'behind': [DiffState(wit='BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s', + state='behind', + sn=0, + dig='EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3')], + 'cid': 'EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3', + 'kin': 'keyStateLagging', + 'oid': 'EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3', + 'wids': {'BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s'}} + + ksr0.s = '1' + hab.db.ksns.pin(keys=(saider.qb64, ), val=ksr0) + adj.adjudicate(hab.pre, 1) + assert len(adj.cues) == 1 + cue = adj.cues.pull() + assert cue == {'cid': 'EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3', + 'dups': [DiffState(wit='BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s', + state='duplicitous', + sn=1, + dig='EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3')], + 'kin': 'keyStateDuplicitous', + 'oid': 'EIaGMMWJFPmtXznY1IIiKDIrg-vIyge6mBl2QV8dDjI3', + 'wids': {'BbIg_3-11d3PYxSInLN-Q9_T2axD6kkXd3XRgbGZTm6s'}} + + with pytest.raises(ValueError): + adj.adjudicate(hab.pre, 2) diff --git a/tests/core/test_reply.py b/tests/core/test_reply.py index 4705065eb..383ab74ec 100644 --- a/tests/core/test_reply.py +++ b/tests/core/test_reply.py @@ -22,8 +22,7 @@ from keri.db import basing from keri.app import habbing, keeping - - +from keri.kering import Roles logger = help.ogler.getLogger() @@ -1283,5 +1282,165 @@ def test_reply(mockHelpingNowUTC): """Done Test""" +def test_watcher_add_cut(): + salt = core.Salter(raw=b'abcdef0123456789').qb64 + + with habbing.openHby(name="con", base="test", salt=salt) as conHby, \ + habbing.openHby(name="wat0", base="test", salt=salt) as wat0hby, \ + habbing.openHby(name="wat1", base="test", salt=salt) as wat1hby, \ + habbing.openHby(name="wat2", base="test", salt=salt) as wat2hby, \ + habbing.openHby(name="obv0", base="test", salt=salt) as obv0hby, \ + habbing.openHby(name="obv1", base="test", salt=salt) as obv1hby, \ + habbing.openHby(name="obv2", base="test", salt=salt) as obv2hby: + + conHab = conHby.makeHab(name="con", isith="1", icount=1, transferable=True) + assert conHab.kever.prefixer.transferable + conKvy = eventing.Kevery(db=conHab.db, lax=False, local=False) + + wat0hab = wat0hby.makeHab(name='wat0', isith="1", icount=1, transferable=False) + assert not wat0hab.kever.prefixer.transferable + # create non-local kevery for Wes to process nonlocal msgs + wat0kvy = eventing.Kevery(db=wat0hab.db, lax=False, local=False) + + wat1hab = wat1hby.makeHab(name='wat1', isith="1", icount=1, transferable=False) + assert not wat1hab.kever.prefixer.transferable + # create non-local kevery for Wes to process nonlocal msgs + wat1kvy = eventing.Kevery(db=wat1hab.db, lax=False, local=False) + + wat2hab = wat2hby.makeHab(name='wat2', isith="1", icount=1, transferable=False) + assert not wat2hab.kever.prefixer.transferable + # create non-local kevery for Wes to process nonlocal msgs + wat2kvy = eventing.Kevery(db=wat2hab.db, lax=False, local=False) + + obv0hab = obv0hby.makeHab(name='obv0', isith="1", icount=1, transferable=True) + assert obv0hab.kever.prefixer.transferable + # create non-local kevery for Wes to process nonlocal msgs + obv0kvy = eventing.Kevery(db=obv0hab.db, lax=False, local=False) + + obv1hab = obv1hby.makeHab(name='obv1', isith="1", icount=1, transferable=True) + assert obv1hab.kever.prefixer.transferable + # create non-local kevery for Wes to process nonlocal msgs + obv1kvy = eventing.Kevery(db=obv1hab.db, lax=False, local=False) + + obv2hab = obv2hby.makeHab(name='obv2', isith="1", icount=1, transferable=True) + assert obv2hab.kever.prefixer.transferable + # create non-local kevery for Wes to process nonlocal msgs + obv2kvy = eventing.Kevery(db=obv2hab.db, lax=False, local=False) + + for hab in [wat0hab, wat1hab, wat2hab, obv0hab, obv1hab, obv2hab]: + msg = hab.makeOwnInception() + parsing.Parser().parseOne(ims=msg, kvy=conKvy) + + conIcp = conHab.makeOwnInception() + for kvy in [wat0kvy, wat1kvy, wat2kvy, obv0kvy, obv1kvy, obv2kvy]: + parsing.Parser().parseOne(ims=bytes(conIcp), kvy=kvy) # make copy so we don't clobber it + + assert wat0hab.pre in conHab.kevers + assert wat1hab.pre in conHab.kevers + assert wat2hab.pre in conHab.kevers + assert obv0hab.pre in conHab.kevers + assert obv1hab.pre in conHab.kevers + assert obv2hab.pre in conHab.kevers + + data = dict(cid=conHab.pre, role=Roles.watcher, eid=wat0hab.pre) + rpy = conHab.reply(route="/end/role/add", data=data) + conHab.psr.parseOne(ims=bytes(rpy)) # make copy so we don't clobber it + wat0hab.psr.parseOne(ims=rpy) + + ender = conHab.db.ends.get(keys=(conHab.pre, Roles.watcher, wat0hab.pre)) + assert ender.allowed is True + ender = wat0hab.db.ends.get(keys=(conHab.pre, Roles.watcher, wat0hab.pre)) + assert ender.allowed is True + + for hab in [obv0hab, obv1hab, obv2hab]: + icp = hab.makeOwnInception() + conHab.psr.parseOne(ims=bytes(icp)) + wat0hab.psr.parseOne(ims=bytes(icp)) + wat1hab.psr.parseOne(ims=bytes(icp)) + wat2hab.psr.parseOne(ims=bytes(icp)) + + assert obv0hab.pre in conHab.kevers + assert obv1hab.pre in conHab.kevers + assert obv2hab.pre in conHab.kevers + assert obv2hab.pre in wat0hab.kevers + + route = f"/watcher/{wat0hab.pre}/add" + data = dict(cid=conHab.pre, oid=obv0hab.pre) + rpy = conHab.reply(route=route, data=data) + conHab.psr.parseOne(ims=bytes(rpy)) # make copy so we don't clobber it + wat0hab.psr.parseOne(ims=rpy) + + observed = conHab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv0hab.pre)) + assert observed.enabled is True + observed = wat0hab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv0hab.pre)) + assert observed.enabled is True + + route = f"/watcher/{wat0hab.pre}/add" + data = dict(cid=conHab.pre, oid=obv1hab.pre) + rpy = conHab.reply(route=route, data=data) + conHab.psr.parseOne(ims=bytes(rpy)) # make copy so we don't clobber it + wat0hab.psr.parseOne(ims=rpy) + + observed = conHab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv1hab.pre)) + assert observed.enabled is True + observed = wat0hab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv1hab.pre)) + assert observed.enabled is True + + route = f"/watcher/{wat0hab.pre}/add" + data = dict(cid=conHab.pre, oid=obv2hab.pre) + rpy = conHab.reply(route=route, data=data) + conHab.psr.parseOne(ims=bytes(rpy)) # make copy so we don't clobber it + wat0hab.psr.parseOne(ims=rpy) + + observed = conHab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv2hab.pre)) + assert observed.enabled is True + observed = wat0hab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv2hab.pre)) + assert observed.enabled is True + + route = f"/watcher/{wat0hab.pre}/cut" + data = dict(cid=conHab.pre, oid=obv1hab.pre) + rpy = conHab.reply(route=route, data=data) + conHab.psr.parseOne(ims=bytes(rpy)) # make copy so we don't clobber it + wat0hab.psr.parseOne(ims=rpy) + + observed = conHab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv1hab.pre)) + assert observed.enabled is False + observed = wat0hab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv1hab.pre)) + assert observed.enabled is False + + route = f"/watcher/{wat1hab.pre}/add" + data = dict(cid=conHab.pre, oid=obv0hab.pre) + rpy = conHab.reply(route=route, data=data) + conHab.psr.parseOne(ims=bytes(rpy)) # make copy so we don't clobber it + wat1hab.psr.parseOne(ims=rpy) + + observed = conHab.db.obvs.get(keys=(conHab.pre, wat1hab.pre, obv0hab.pre)) + assert observed.enabled is True + observed = wat1hab.db.obvs.get(keys=(conHab.pre, wat1hab.pre, obv0hab.pre)) + assert observed.enabled is True + + route = f"/watcher/{wat1hab.pre}/add" + data = dict(cid=conHab.pre, oid=obv2hab.pre, oobi=f"http://example.com/oobi/{obv2hab.pre}") + rpy = conHab.reply(route=route, data=data) + conHab.psr.parseOne(ims=bytes(rpy)) # make copy so we don't clobber it + wat1hab.psr.parseOne(ims=rpy) + + observed = conHab.db.obvs.get(keys=(conHab.pre, wat1hab.pre, obv0hab.pre)) + assert observed.enabled is True + assert conHab.db.oobis.get(keys=(f"http://example.com/oobi/{obv2hab.pre}",)) is not None + observed = wat1hab.db.obvs.get(keys=(conHab.pre, wat1hab.pre, obv2hab.pre)) + assert observed.enabled is True + observed = wat1hab.db.obvs.get(keys=(conHab.pre, wat1hab.pre, obv1hab.pre)) + assert observed is None + + # Make sure nothing was changed for Wat0 + observed = conHab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv0hab.pre)) + assert observed.enabled is True + observed = conHab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv1hab.pre)) + assert observed.enabled is False + observed = conHab.db.obvs.get(keys=(conHab.pre, wat0hab.pre, obv2hab.pre)) + assert observed.enabled is True + + if __name__ == "__main__": pytest.main(['-vv', 'test_reply.py::test_reply'])