Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: tel state noticer to query for tel updates #868

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions src/keri/app/querying.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

from hio.base import doing
from keri.app import agenting
from keri.vdr import viring


class QueryDoer(doing.DoDoer):
Expand Down Expand Up @@ -146,3 +147,120 @@ def recur(self, tyme, deeds=None):
return True

return super(AnchorQuerier, self).recur(tyme, deeds)


class TelStateNoticer(doing.DoDoer):
def __init__(self, hby, tvy, hab, pre, ri, i=None, **opts):
self.hby = hby
self.tvy = tvy
self.hab = hab
self.pre = pre
self.ri = ri
self.i = i
self.cues = tvy.cues
self.witq = agenting.WitnessInquisitor(hby=self.hby)
self.witq.telquery(hab=self.hab, pre=self.pre, r="tsn", ri=self.ri, i=self.i)

super(TelStateNoticer, self).__init__(doers=[self.witq], **opts)

def recur(self, tyme, deeds=None):
if self.cues:
cue = self.cues.pull()
match cue['kin']:
case "txnStateSaved":
record = cue['record']
behind = False

if isinstance(record, viring.RegStateRecord):
if record.i != self.ri:
self.cues.append(cue) # from a diff TelStateNoticer
return super(TelStateNoticer, self).recur(tyme, deeds)

if record.i in self.tvy.tevers:
if self.tvy.tevers[self.ri].sn < int(record.s, 16):
behind = True
else:
behind = True

if behind:
self.extend([RegistryLogQuerier(hby=self.hby, tvy=self.tvy, hab=self.hab, pre=self.pre, record=record)])
elif isinstance(record, viring.VcStateRecord):
if record.ri != self.ri or not self.i or record.i != self.i:
self.cues.append(cue) # from a diff TelStateNoticer
return super(TelStateNoticer, self).recur(tyme, deeds)

regsn = int(record.ra["s"], 16) if "s" in record.ra else 0
if record.ri in self.tvy.tevers and regsn <= self.tvy.tevers[record.ri].sn:
tever = self.tvy.tevers[record.ri]
vcSn = tever.vcSn(record.i)
if vcSn is None or vcSn < int(record.s, 16):
behind = True
else:
behind = True

if behind:
self.extend([VcLogQuerier(hby=self.hby, tvy=self.tvy, hab=self.hab, pre=self.pre, record=record)])

self.remove([self.witq])
return True
case _:
self.cues.append(cue)

return super(TelStateNoticer, self).recur(tyme, deeds)


class RegistryLogQuerier(doing.DoDoer):

def __init__(self, hby, tvy, hab, pre, record, **opts):
self.hby = hby
self.tvy = tvy
self.hab = hab
self.pre = pre
self.record = record
self.witq = agenting.WitnessInquisitor(hby=self.hby)
self.witq.telquery(hab=self.hab, pre=self.pre, ri=record.i)

super(RegistryLogQuerier, self).__init__(doers=[self.witq], **opts)

def recur(self, tyme, deeds=None):
"""
Returns: doifiable Doist compatible generator method
Usage:
add result of doify on this method to doers list
"""
if self.record.i in self.tvy.tevers:
tever = self.tvy.tevers[self.record.i]
if int(tever.state().s, 16) >= int(self.record.s, 16):
self.remove([self.witq])
return True

return super(RegistryLogQuerier, self).recur(tyme, deeds)


class VcLogQuerier(doing.DoDoer):

def __init__(self, hby, tvy, hab, pre, record, **opts):
self.hby = hby
self.tvy = tvy
self.hab = hab
self.pre = pre
self.record = record
self.witq = agenting.WitnessInquisitor(hby=self.hby)
self.witq.telquery(hab=self.hab, pre=self.pre, ri=record.ri, i=record.i)

super(VcLogQuerier, self).__init__(doers=[self.witq], **opts)

def recur(self, tyme, deeds=None):
"""
Returns: doifiable Doist compatible generator method
Usage:
add result of doify on this method to doers list
"""
if self.record.ri in self.tvy.tevers:
tever = self.tvy.tevers[self.record.ri]
vcSn = tever.vcSn(self.record.i)
if vcSn is not None and vcSn >= int(self.record.s, 16):
self.remove([self.witq])
return True

return super(VcLogQuerier, self).recur(tyme, deeds)
15 changes: 0 additions & 15 deletions src/keri/kering.py
Original file line number Diff line number Diff line change
Expand Up @@ -742,21 +742,6 @@ class UnverifiedProofError(ValidationError):
"""


class OutOfOrderKeyStateError(ValidationError):
"""
Error referenced event missing from log so can't verify this key state event
Usage:
raise OutOfOrderKeyStateError("error message")
"""


class OutOfOrderTxnStateError(ValidationError):
"""
Error referenced event missing from log so can't verify this txn state event
Usage:
raise OutOfOrderTxnStateError("error message")
"""

class MisfitEventSourceError(ValidationError):
"""
Error referenced event missing from log so can't verify this txn state event
Expand Down
67 changes: 25 additions & 42 deletions src/keri/vdr/eventing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1644,14 +1644,15 @@ def processQuery(self, serder, source=None, sigers=None, cigars=None):
self.cues.append(dict(kin="replay", src=src, dest=source.qb64, msgs=msgs))
elif route == "tsn":
ri = qry["ri"]
src = qry["src"]
if ri in self.tevers:
tever = self.tevers[ri]
tsn = tever.state()
self.cues.push(dict(kin="reply", route="/tsn/registry", data=asdict(tsn), dest=source))
self.cues.push(dict(kin="reply", route=f"/tsn/registry/{src}", data=asdict(tsn), dest=source.qb64))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curious if we should just skip this if vci is set below, depends on if we need to escrow or not


if vcpre := qry["i"]:
tsn = tever.vcState(vcpre=vcpre)
self.cues.push(dict(kin="reply", route="/tsn/credential", data=asdict(tsn), dest=source))
tsn = tever.vcState(vci=vcpre)
self.cues.push(dict(kin="reply", route=f"/tsn/credential/{src}", data=asdict(tsn), dest=source.qb64))

else:
raise ValidationError("invalid query message {} for evt = {}".format(ilk, ked))
Expand Down Expand Up @@ -1784,24 +1785,17 @@ def processReplyRegistryTxnState(self, *, serder, saider, route, cigars=None, ts

ldig = self.reger.getTel(key=snKey(pre=regk, sn=sn)) # retrieve dig of last event at sn.

# Only accept key state if for last seen version of event at sn
if ldig is None: # escrow because event does not yet exist in database
if self.reger.txnsb.escrowStateNotice(typ="registry-ooo", pre=regk, aid=aid, serder=serder, saider=saider,
dater=dater, cigars=cigars, tsgs=tsgs):
self.cues.append(dict(kin="telquery", q=dict(ri=regk)))

raise kering.OutOfOrderTxnStateError("Out of order txn state={}.".format(rsr))

tsaider = coring.Saider(qb64=rsr.d)
ldig = bytes(ldig)
# retrieve last event itself of signer given sdig
sraw = self.reger.getTvt(key=dgKey(pre=regk, dig=ldig))
# assumes db ensures that sraw must not be none because sdig was in KE
sserder = serdering.SerderKERI(raw=bytes(sraw))
if ldig is not None: # escrow because event does not yet exist in database
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

aligning with ksn

ldig = bytes(ldig)
# retrieve last event itself of signer given sdig
sraw = self.reger.getTvt(key=dgKey(pre=regk, dig=ldig))
# assumes db ensures that sraw must not be none because sdig was in KE
sserder = serdering.SerderKERI(raw=bytes(sraw))

if sserder.said != tsaider.qb64: # mismatch events problem with replay
raise ValidationError("Mismatch keystate at sn = {} with db."
"".format(rsr.s))
if sserder.said != tsaider.qb64: # mismatch events problem with replay
raise ValidationError("Mismatch keystate at sn = {} with db."
"".format(rsr.s))

self.reger.txnsb.updateReply(aid=aid, serder=serder, saider=tsaider, dater=dater)
self.cues.append(dict(kin="txnStateSaved", record=rsr))
Expand Down Expand Up @@ -1924,28 +1918,21 @@ def processReplyCredentialTxnState(self, *, serder, saider, route, cigars=None,

ldig = self.reger.getTel(key=snKey(pre=vci, sn=sn)) # retrieve dig of last event at sn.

# Only accept key state if for last seen version of event at sn
if ldig is None: # escrow because event does not yet exist in database
if self.reger.txnsb.escrowStateNotice(typ="credential-ooo", pre=vci, aid=aid, serder=serder,
saider=saider, dater=dater, cigars=cigars, tsgs=tsgs):
self.cues.append(dict(kin="telquery", q=dict(ri=regk, i=vci)))

raise kering.OutOfOrderTxnStateError("Out of order txn state={}.".format(vsr))

tsaider = coring.Saider(qb64=vsr.d)
ldig = bytes(ldig)
# retrieve last event itself of signer given sdig
sraw = self.reger.getTvt(key=dgKey(pre=vci, dig=ldig))
# assumes db ensures that sraw must not be none because sdig was in KE
sserder = serdering.SerderKERI(raw=bytes(sraw))
if ldig is not None:
ldig = bytes(ldig)
# retrieve last event itself of signer given sdig
sraw = self.reger.getTvt(key=dgKey(pre=vci, dig=ldig))
# assumes db ensures that sraw must not be none because sdig was in KE
sserder = serdering.SerderKERI(raw=bytes(sraw))

if sn < sserder.sn:
raise ValidationError("Stale txn state at sn = {} with db."
"".format(vsr.s))
if sn < sserder.sn:
raise ValidationError("Stale txn state at sn = {} with db."
"".format(vsr.s))

if sserder.said != tsaider.qb64: # mismatch events problem with replay
raise ValidationError("Mismatch txn state at sn = {} with db."
"".format(vsr.s))
if sserder.said != tsaider.qb64: # mismatch events problem with replay
raise ValidationError("Mismatch txn state at sn = {} with db."
"".format(vsr.s))

self.reger.txnsb.updateReply(aid=aid, serder=serder, saider=tsaider, dater=dater)
self.cues.append(dict(kin="txnStateSaved", record=vsr))
Expand Down Expand Up @@ -2007,12 +1994,8 @@ def processEscrows(self):
extype=kering.MissingRegistryError)
self.reger.txnsb.processEscrowState(typ="credential-mae", processReply=self.processReplyCredentialTxnState,
extype=kering.MissingAnchorError)
self.reger.txnsb.processEscrowState(typ="credential-ooo", processReply=self.processReplyCredentialTxnState,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this change is right, maybe we should also remove the credential-mre escrow, not sure.

extype=kering.OutOfOrderTxnStateError)
self.reger.txnsb.processEscrowState(typ="registry-mae", processReply=self.processReplyRegistryTxnState,
extype=kering.MissingAnchorError)
self.reger.txnsb.processEscrowState(typ="registry-ooo", processReply=self.processReplyRegistryTxnState,
extype=kering.OutOfOrderTxnStateError)

except Exception as ex: # log diagnostics errors etc
if logger.isEnabledFor(logging.DEBUG):
Expand Down
Loading
Loading