diff --git a/src/keri/app/cli/commands/decrypt.py b/src/keri/app/cli/commands/decrypt.py index de460ded..063242c4 100644 --- a/src/keri/app/cli/commands/decrypt.py +++ b/src/keri/app/cli/commands/decrypt.py @@ -55,8 +55,8 @@ def decrypt(tymth, tock=0.0, **opts): else: data = data - m = coring.Matter(qb64=data) - d = coring.Matter(qb64=hab.decrypt(m.raw)) + m = coring.Matter(qb64=data) # should refactor this to use Cipher + d = coring.Matter(qb64=hab.decrypt(ser=m.raw)) print(d.raw) except kering.ConfigurationError: diff --git a/src/keri/app/cli/commands/witness/authenticate.py b/src/keri/app/cli/commands/witness/authenticate.py index 1758093c..f0a2b419 100644 --- a/src/keri/app/cli/commands/witness/authenticate.py +++ b/src/keri/app/cli/commands/witness/authenticate.py @@ -127,8 +127,8 @@ def authDo(self, tymth, tock=0.0): data = json.loads(rep.body) totp = data["totp"] - m = coring.Matter(qb64=totp) - d = coring.Matter(qb64=self.hab.decrypt(m.raw)) + m = coring.Matter(qb64=totp) # refactor this to use cipher + d = coring.Matter(qb64=self.hab.decrypt(ser=m.raw)) otpurl = f"otpauth://totp/KERIpy:{self.witness}?secret={d.raw.decode('utf-8')}&issuer=KERIpy" if not self.urlOnly: diff --git a/src/keri/app/habbing.py b/src/keri/app/habbing.py index 44516caa..d9431aa4 100644 --- a/src/keri/app/habbing.py +++ b/src/keri/app/habbing.py @@ -1320,14 +1320,16 @@ def sign(self, ser, verfers=None, indexed=True, indices=None, ondices=None, **kw indices=indices, ondices=ondices) + def decrypt(self, ser, verfers=None, **kwa): - """Sign given serialization ser using appropriate keys. + """Decrypt given serialization ser using appropriate keys. Use provided verfers or .kever.verfers to lookup keys to sign. Parameters: - ser (bytes): serialization to sign + ser (str | bytes | bytearray | memoryview): serialization to decrypt + verfers (list[Verfer] | None): Verfer instances to get pub verifier - keys to lookup private siging keys. + keys to lookup and convert to private decryption keys. verfers None means use .kever.verfers. Assumes that when group and verfers is not None then provided verfers must be .kever.verfers @@ -1335,9 +1337,10 @@ def decrypt(self, ser, verfers=None, **kwa): if verfers is None: verfers = self.kever.verfers # when group these provide group signing keys - return self.mgr.decrypt(ser=ser, - verfers=verfers, - ) + # should not use mgr.decrypt since it assumes qb64. Just lucky its not + # yet a problem + return self.mgr.decrypt(qb64=ser, verfers=verfers) + def query(self, pre, src, query=None, **kwa): """ Create, sign and return a `qry` message against the attester for the prefix diff --git a/src/keri/app/keeping.py b/src/keri/app/keeping.py index 3a626a62..57ac560b 100644 --- a/src/keri/app/keeping.py +++ b/src/keri/app/keeping.py @@ -804,8 +804,8 @@ def updateAeid(self, aeid, seed): # re-encrypt root salt secrets by prefix parameters .prms for keys, data in self.ks.prms.getItemIter(): # keys is tuple of pre qb64 if data.salt: - salter = self.decrypter.decrypt(ser=data.salt) - data.salt = (self.encrypter.encrypt(matter=salter).qb64 + salter = self.decrypter.decrypt(qb64=data.salt) + data.salt = (self.encrypter.encrypt(prim=salter).qb64 if self.encrypter else salter.qb64) self.ks.prms.pin(keys, val=data) @@ -889,7 +889,7 @@ def salt(self): """ salt = self.ks.gbls.get('salt') if self.decrypter: # given .decrypt secret salt must be encrypted in db - return self.decrypter.decrypt(ser=salt).qb64 + return self.decrypter.decrypt(qb64=salt).qb64 return salt @@ -902,7 +902,7 @@ def salt(self, salt): may be plain text or cipher text handled by updateAeid """ if self.encrypter: - salt = self.encrypter.encrypt(ser=salt).qb64 + salt = self.encrypter.encrypt(ser=salt, code=core.MtrDex.X25519_Cipher_Salt).qb64 self.ks.gbls.pin('salt', salt) @@ -1020,7 +1020,8 @@ def incept(self, icodes=None, icount=1, icode=coring.MtrDex.Ed25519_Seed, if creator.salt: pp.salt = (creator.salt if not self.encrypter - else self.encrypter.encrypt(ser=creator.salt).qb64) + else self.encrypter.encrypt(ser=creator.salt, + code=core.MtrDex.X25519_Cipher_Salt).qb64) dt = helping.nowIso8601() ps = PreSit( @@ -1184,7 +1185,7 @@ def rotate(self, pre, ncodes=None, ncount=1, if self.aeid: if not self.decrypter: raise kering.DecryptError("Unauthorized decryption. Aeid but no decrypter.") - salt = self.decrypter.decrypt(ser=salt).qb64 + salt = self.decrypter.decrypt(qb64=salt).qb64 else: salt = core.Salter(qb64=salt).qb64 # ensures salt was unencrypted @@ -1394,21 +1395,23 @@ def sign(self, ser, pubs=None, verfers=None, indexed=True, cigars.append(signer.sign(ser)) # assigns .verfer to cigar return cigars - def decrypt(self, ser, pubs=None, verfers=None): + + def decrypt(self, qb64, pubs=None, verfers=None): """ - Returns list of signatures of ser if indexed as Sigers else as Cigars with - .verfer assigned. + Returns decrypted plaintext of encrypted qb64 ciphertext serialization. Parameters: - ser (bytes): serialization to sign + qb64 (str | bytes | bytearray | memoryview): fully qualified base64 + ciphertext serialization to decrypt pubs (list[str] | None): of qb64 public keys to lookup private keys one of pubs or verfers is required. If both then verfers is ignored. verfers (list[Verfer] | None): Verfer instances of public keys one of pubs or verfers is required. If both then verfers is ignored. - If not pubs then gets public key from verfer.qb64 + If not pubs then gets public key from verfer.qb64 used to lookup + private keys Returns: - bytes: decrypted data + plain (bytes): decrypted plaintext """ signers = [] @@ -1433,18 +1436,22 @@ def decrypt(self, ser, pubs=None, verfers=None): raise ValueError("Missing prikey in db for pubkey={}".format(verfer.qb64)) signers.append(signer) - plain = ser + if hasattr(qb64, "encode"): + qb64 = qb64.encode() # convert str to bytes + qb64 = bytes(qb64) # convert bytearray or memoryview to bytes + for signer in signers: sigkey = signer.raw + signer.verfer.raw # sigkey is raw seed + raw verkey prikey = pysodium.crypto_sign_sk_to_box_sk(sigkey) # raw private encrypt key pubkey = pysodium.crypto_scalarmult_curve25519_base(prikey) - plain = pysodium.crypto_box_seal_open(plain, pubkey, prikey) # qb64b + plain = pysodium.crypto_box_seal_open(qb64, pubkey, prikey) # qb64b - if plain == ser: - raise ValueError("unable to decrypt data") + if plain == qb64: + raise ValueError(f"Unable to decrypt.") return plain + def ingest(self, secrecies, iridx=0, ncount=1, ncode=coring.MtrDex.Ed25519_Seed, dcode=coring.MtrDex.Blake3_256, algo=Algos.salty, salt=None, stem=None, tier=None, @@ -1542,7 +1549,8 @@ def ingest(self, secrecies, iridx=0, ncount=1, ncode=coring.MtrDex.Ed25519_Seed, pp = PrePrm(pidx=pidx, algo=algo, salt=(creator.salt if not self.encrypter - else self.encrypter.encrypt(ser=creator.salt).qb64), + else self.encrypter.encrypt(ser=creator.salt, + code=core.MtrDex.X25519_Cipher_Salt).qb64), stem=creator.stem, tier=creator.tier) pre = csigners[0].verfer.qb64b diff --git a/src/keri/core/__init__.py b/src/keri/core/__init__.py index ec83d8bb..b37b9516 100644 --- a/src/keri/core/__init__.py +++ b/src/keri/core/__init__.py @@ -16,3 +16,4 @@ from .indexing import Indexer, Siger, IdrDex, IdxSigDex from .signing import Signer, Salter, Cipher, CiXDex, Encrypter, Decrypter from .counting import Counter, Codens, CtrDex_2_0 +from .streaming import Streamer diff --git a/src/keri/core/coring.py b/src/keri/core/coring.py index 5896ffe0..6d933b81 100644 --- a/src/keri/core/coring.py +++ b/src/keri/core/coring.py @@ -350,12 +350,12 @@ class MatterCodex: Bytes_Big_L0: str = '7AAB' # Byte String big lead size 0 Bytes_Big_L1: str = '8AAB' # Byte String big lead size 1 Bytes_Big_L2: str = '9AAB' # Byte String big lead size 2 - X25519_Cipher_L0: str = '4C' # X25519 sealed box cipher bytes of sniffable plaintext lead size 0 - X25519_Cipher_L1: str = '5C' # X25519 sealed box cipher bytes of sniffable plaintext lead size 1 - X25519_Cipher_L2: str = '6C' # X25519 sealed box cipher bytes of sniffable plaintext lead size 2 - X25519_Cipher_Big_L0: str = '7AAC' # X25519 sealed box cipher bytes of sniffable plaintext big lead size 0 - X25519_Cipher_Big_L1: str = '8AAC' # X25519 sealed box cipher bytes of sniffable plaintext big lead size 1 - X25519_Cipher_Big_L2: str = '9AAC' # X25519 sealed box cipher bytes of sniffable plaintext big lead size 2 + X25519_Cipher_L0: str = '4C' # X25519 sealed box cipher bytes of sniffable stream plaintext lead size 0 + X25519_Cipher_L1: str = '5C' # X25519 sealed box cipher bytes of sniffable stream plaintext lead size 1 + X25519_Cipher_L2: str = '6C' # X25519 sealed box cipher bytes of sniffable stream plaintext lead size 2 + X25519_Cipher_Big_L0: str = '7AAC' # X25519 sealed box cipher bytes of sniffable stream plaintext big lead size 0 + X25519_Cipher_Big_L1: str = '8AAC' # X25519 sealed box cipher bytes of sniffable stream plaintext big lead size 1 + X25519_Cipher_Big_L2: str = '9AAC' # X25519 sealed box cipher bytes of sniffable stream plaintext big lead size 2 X25519_Cipher_QB64_L0: str = '4D' # X25519 sealed box cipher bytes of QB64 plaintext lead size 0 X25519_Cipher_QB64_L1: str = '5D' # X25519 sealed box cipher bytes of QB64 plaintext lead size 1 X25519_Cipher_QB64_L2: str = '6D' # X25519 sealed box cipher bytes of QB64 plaintext lead size 2 @@ -795,9 +795,13 @@ def __init__(self, raw=None, code=MtrDex.Ed25519N, soft='', rize=None, rize (int | None): raw size in bytes when variable sized material not including lead bytes if any Otherwise None - qb64b (bytes | None): fully qualified crypto material Base64 - qb64 (str | bytes | None): fully qualified crypto material Base64 - qb2 (bytes | None): fully qualified crypto material Base2 + qb64b (str | bytes | bytearray | memoryview | None): fully qualified + crypto material Base64. When str, encodes as utf-8. Strips when + bytearray and strip is True. + qb64 (str | bytes | bytearray | memoryview | None): fully qualified + crypto material Base64. When str, encodes as utf-8. Ignores strip + qb2 (bytes | bytearray | memoryview | None): fully qualified crypto + material Base2. Strips when bytearray and strip is True. strip (bool): True means strip (delete) matter from input stream bytearray after parsing qb64b or qb2. False means do not strip diff --git a/src/keri/core/signing.py b/src/keri/core/signing.py index dcfaec05..7a4faa63 100644 --- a/src/keri/core/signing.py +++ b/src/keri/core/signing.py @@ -12,13 +12,15 @@ from cryptography.hazmat.primitives.serialization import Encoding, PublicFormat from cryptography.hazmat.primitives.asymmetric import ec, utils -from ..kering import (EmptyMaterialError, InvalidCodeError, InvalidSizeError) +from ..kering import (EmptyMaterialError, InvalidCodeError, InvalidSizeError, + InvalidValueError) from ..help import helping from .coring import (Tiers, ) from .coring import (SmallVrzDex, LargeVrzDex, Matter, MtrDex, Verfer, Cigar) -from .indexing import IdrDex, Siger +from .indexing import IdrDex, Indexer, Siger +from .streaming import Streamer DSS_SIG_MODE = "fips-186-3" @@ -483,24 +485,24 @@ def signers(self, count=1, start=0, path="", **kwa): # Codes for for ciphers of variable sized sniffable QB2 or QB64 plain text @dataclass(frozen=True) -class CipherX25519VarSnifCodex: +class CipherX25519VarStrmCodex: """ CipherX25519VarCodex is codex all variable sized cipher bytes derivation codes - for sealed box encryped ciphertext. Plaintext is Sniffable QB2 or QB64. + for sealed box encryped ciphertext. Plaintext is Sniffable CESR Stream. Only provide defined codes. Undefined are left out so that inclusion(exclusion) via 'in' operator works. """ - X25519_Cipher_L0: str = '4C' # X25519 sealed box cipher bytes of sniffable plaintext lead size 0 - X25519_Cipher_L1: str = '5C' # X25519 sealed box cipher bytes of sniffable plaintext lead size 1 - X25519_Cipher_L2: str = '6C' # X25519 sealed box cipher bytes of sniffable plaintext lead size 2 - X25519_Cipher_Big_L0: str = '7AAC' # X25519 sealed box cipher bytes of sniffable plaintext big lead size 0 - X25519_Cipher_Big_L1: str = '8AAC' # X25519 sealed box cipher bytes of sniffable plaintext big lead size 1 - X25519_Cipher_Big_L2: str = '9AAC' # X25519 sealed box cipher bytes of sniffable plaintext big lead size 2 + X25519_Cipher_L0: str = '4C' # X25519 sealed box cipher bytes of sniffable stream plaintext lead size 0 + X25519_Cipher_L1: str = '5C' # X25519 sealed box cipher bytes of sniffable stream plaintext lead size 1 + X25519_Cipher_L2: str = '6C' # X25519 sealed box cipher bytes of sniffable stream plaintext lead size 2 + X25519_Cipher_Big_L0: str = '7AAC' # X25519 sealed box cipher bytes of sniffable stream plaintext big lead size 0 + X25519_Cipher_Big_L1: str = '8AAC' # X25519 sealed box cipher bytes of sniffable stream plaintext big lead size 1 + X25519_Cipher_Big_L2: str = '9AAC' # X25519 sealed box cipher bytes of sniffable stream plaintext big lead size 2 def __iter__(self): return iter(astuple(self)) -CiXVarSnifDex = CipherX25519VarSnifCodex() # Make instance +CiXVarStrmDex = CipherX25519VarStrmCodex() # Make instance # Codes for for ciphers of variable sized QB64 plain text @@ -592,16 +594,16 @@ def __iter__(self): class CipherX25519AllVarCodex: """ CipherX25519AllVarCodex is codex all variable size codes of cipher bytes - for sealed box encryped ciphertext. Plaintext maybe sniffable or qb64 or qb2. + for sealed box encryped ciphertext. Plaintext maybe sniffable CESR stream or qb64 or qb2. Only provide defined codes. Undefined are left out so that inclusion(exclusion) via 'in' operator works. """ - X25519_Cipher_L0: str = '4C' # X25519 sealed box cipher bytes of sniffable plaintext lead size 0 - X25519_Cipher_L1: str = '5C' # X25519 sealed box cipher bytes of sniffable plaintext lead size 1 - X25519_Cipher_L2: str = '6C' # X25519 sealed box cipher bytes of sniffable plaintext lead size 2 - X25519_Cipher_Big_L0: str = '7AAC' # X25519 sealed box cipher bytes of sniffable plaintext big lead size 0 - X25519_Cipher_Big_L1: str = '8AAC' # X25519 sealed box cipher bytes of sniffable plaintext big lead size 1 - X25519_Cipher_Big_L2: str = '9AAC' # X25519 sealed box cipher bytes of sniffable plaintext big lead size 2 + X25519_Cipher_L0: str = '4C' # X25519 sealed box cipher bytes of sniffable stream plaintext lead size 0 + X25519_Cipher_L1: str = '5C' # X25519 sealed box cipher bytes of sniffable stream plaintext lead size 1 + X25519_Cipher_L2: str = '6C' # X25519 sealed box cipher bytes of sniffable stream plaintext lead size 2 + X25519_Cipher_Big_L0: str = '7AAC' # X25519 sealed box cipher bytes of sniffable stream plaintext big lead size 0 + X25519_Cipher_Big_L1: str = '8AAC' # X25519 sealed box cipher bytes of sniffable stream plaintext big lead size 1 + X25519_Cipher_Big_L2: str = '9AAC' # X25519 sealed box cipher bytes of sniffable stream plaintext big lead size 2 X25519_Cipher_QB64_L0: str = '4D' # X25519 sealed box cipher bytes of QB64 plaintext lead size 0 X25519_Cipher_QB64_L1: str = '5D' # X25519 sealed box cipher bytes of QB64 plaintext lead size 1 X25519_Cipher_QB64_L2: str = '6D' # X25519 sealed box cipher bytes of QB64 plaintext lead size 2 @@ -630,12 +632,12 @@ class CipherX25519AllCodex: Only provide defined codes. Undefined are left out so that inclusion(exclusion) via 'in' operator works. """ - X25519_Cipher_L0: str = '4C' # X25519 sealed box cipher bytes of sniffable plaintext lead size 0 - X25519_Cipher_L1: str = '5C' # X25519 sealed box cipher bytes of sniffable plaintext lead size 1 - X25519_Cipher_L2: str = '6C' # X25519 sealed box cipher bytes of sniffable plaintext lead size 2 - X25519_Cipher_Big_L0: str = '7AAC' # X25519 sealed box cipher bytes of sniffable plaintext big lead size 0 - X25519_Cipher_Big_L1: str = '8AAC' # X25519 sealed box cipher bytes of sniffable plaintext big lead size 1 - X25519_Cipher_Big_L2: str = '9AAC' # X25519 sealed box cipher bytes of sniffable plaintext big lead size 2 + X25519_Cipher_L0: str = '4C' # X25519 sealed box cipher bytes of sniffable stream plaintext lead size 0 + X25519_Cipher_L1: str = '5C' # X25519 sealed box cipher bytes of sniffable stream plaintext lead size 1 + X25519_Cipher_L2: str = '6C' # X25519 sealed box cipher bytes of sniffable stream plaintext lead size 2 + X25519_Cipher_Big_L0: str = '7AAC' # X25519 sealed box cipher bytes of sniffable stream plaintext big lead size 0 + X25519_Cipher_Big_L1: str = '8AAC' # X25519 sealed box cipher bytes of sniffable stream plaintext big lead size 1 + X25519_Cipher_Big_L2: str = '9AAC' # X25519 sealed box cipher bytes of sniffable stream plaintext big lead size 2 X25519_Cipher_Seed: str = 'P' # X25519 sealed box 124 char qb64 Cipher of 44 char qb64 Seed X25519_Cipher_Salt: str = '1AAH' # X25519 sealed box 100 char qb64 Cipher of 24 char qb64 Salt X25519_Cipher_QB64_L0: str = '4D' # X25519 sealed box cipher bytes of QB64 plaintext lead size 0 @@ -681,6 +683,9 @@ class Cipher(Matter): def __init__(self, raw=None, code=None, **kwa): """ + Inherited Parameters: + (see Matter) + Parmeters: raw (bytes | str): cipher text (not plain text) code (str): cipher suite @@ -690,13 +695,13 @@ def __init__(self, raw=None, code=None, **kwa): # code given by raw size. Otherwise provided code fixed or variable size # is handled by Matter superclass. if raw is not None and code is None: - if len(raw) == Matter._rawSize(MtrDex.X25519_Cipher_Salt): - code = MtrDex.X25519_Cipher_Salt - elif len(raw) == Matter._rawSize(MtrDex.X25519_Cipher_Seed): - code = MtrDex.X25519_Cipher_Seed - else: - raise InvalidSizeError(f"Unsupported fixed raw size" - f" {len(raw)} for {code=}.") + if len(raw) == Matter._rawSize(MtrDex.X25519_Cipher_Salt): + code = MtrDex.X25519_Cipher_Salt + elif len(raw) == Matter._rawSize(MtrDex.X25519_Cipher_Seed): + code = MtrDex.X25519_Cipher_Seed + else: + raise InvalidSizeError(f"Unsupported fixed raw size" + f" {len(raw)} for {code=}.") if hasattr(raw, "encode"): raw = raw.encode("utf-8") # ensure bytes not str @@ -707,24 +712,48 @@ def __init__(self, raw=None, code=None, **kwa): raise InvalidCodeError(f"Unsupported cipher code = {self.code}.") - def decrypt(self, prikey=None, seed=None): + def decrypt(self, prikey=None, seed=None, klas=None, transferable=False, + bare=False, **kwa): """ - Returns plain text as Matter instance (Signer or Salter) of cryptographic - cipher text material given by .raw. Encrypted plain text is fully - qualified (qb64) so derivaton code of plain text preserved through + Returns plain text as klas instance (Matter, Indexer, Streamer). + When klas is None then klas default is based on .code. Maybe Salter, + Signer, or Streamer. Encrypted plain text is fully + qualified (qb64) via self so derivaton code of plain text preserved through encryption/decryption round trip. - Decrypter uses either decryption key given by prikey or derives prikey from - signing key derived from private seed. + The created Decrypter uses either decryption key given by prikey or + when prikey missing derives prikey from signing key derived from private + seed. + + Returns: + decrypted (Matter | Indexer | Streamer): instance of decrypted + cipher text of .raw which is encrypted qb64, qb2, or sniffable + stream depending on .code when bare is False. Otherwise returns + plaintext itself. + + Keyword Parameters: + (see Matter because created Decrypter is Matter subclass) Parameters: - prikey (Union[bytes, str]): qb64b or qb64 serialization of private - decryption key - seed (Union[bytes, str]): qb64b or qb64 serialization of private - signing key seed used to derive private decryption key + prikey (str | bytes): qb64 or qb64b serialization of private + decryption key. Must be fully qualified with code. + seed (str | bytes): qb64 or qb64b serialization of private + signing key seed used to derive private decryption key. Must be + fully qualified with code. + klas (Matter | Indexer | Streamer): Class used to create instance from + decrypted serialization. + transferable (bool): Modifier of klas instance creation. + When klas init (such as Signer) supports transferabe parm; + True means verfer of returned signer is transferable. + False means non-transferable + bare (bool): False (default) means returns instance holding plaintext + True means returns plaintext itself """ - decrypter = Decrypter(qb64b=prikey, seed=seed) - return decrypter.decrypt(ser=self.qb64b) + decrypter = Decrypter(qb64b=prikey, seed=seed, **kwa) + return decrypter.decrypt(cipher=self, + klas=klas, + transferable=transferable, + bare=bare) class Encrypter(Matter): @@ -766,8 +795,8 @@ def __init__(self, raw=None, code=MtrDex.X25519, verkey=None, **kwa): if not raw and verkey: verfer = Verfer(qb64b=verkey) if verfer.code not in (MtrDex.Ed25519N, MtrDex.Ed25519): - raise ValueError("Unsupported verkey derivation code = {}." - "".format(verfer.code)) + raise InvalidValueError(f"Unsupported verkey derivation code =" + f" {verfer.code}.") # convert signing public key to encryption public key raw = pysodium.crypto_sign_pk_to_box_pk(verfer.raw) @@ -776,7 +805,7 @@ def __init__(self, raw=None, code=MtrDex.X25519, verkey=None, **kwa): if self.code == MtrDex.X25519: self._encrypt = self._x25519 else: - raise ValueError("Unsupported encrypter code = {}.".format(self.code)) + raise InvalidValueError(f"Unsupported encrypter code = {self.code}.") def verifySeed(self, seed): """ @@ -794,33 +823,59 @@ def verifySeed(self, seed): pubkey = pysodium.crypto_sign_pk_to_box_pk(verkey) return (pubkey == self.raw) - def encrypt(self, ser=None, matter=None): + def encrypt(self, *, ser=None, prim=None, code=None): """ Returns: Cipher instance of cipher text encryption of plain text serialization - provided by either ser or Matter instance when provided. + provided by either ser or prim as CESR primitive instance. Parameters: - ser (Union[bytes,str]): qb64b or qb64 serialization of plain text - matter (Matter): plain text as Matter instance of seed or salt to - be encrypted + + ser (str | bytes | bytearray | memoryview): qb64b or qb64 or sniffable + stream serialization of plain text + prim (Matter | Indexer | Streamer): CESR primitive instance whose + serialization is qb64 or qb2 or sniffable stream and is to be + encrypted based on code + code (str): code of plain text type for resultant encrypted cipher """ - if not (ser or matter): - raise EmptyMaterialError("Neither ser or plain are provided.") + if not ser: - if ser: - matter = Matter(qb64b=ser) + if not prim: + raise EmptyMaterialError(f"Neither bar serialization or primitive " + f"are provided.") - if matter.code == MtrDex.Salt_128: # future other salt codes - code = MtrDex.X25519_Cipher_Salt - elif matter.code == MtrDex.Ed25519_Seed: # future other seed codes - code = MtrDex.X25519_Cipher_Seed - else: - raise ValueError("Unsupported plain text code = {}.".format(matter.code)) + if not code: + if prim.code == MtrDex.Salt_128: # future other salt codes + code = MtrDex.X25519_Cipher_Salt + elif prim.code == MtrDex.Ed25519_Seed: # future other seed codes + code = MtrDex.X25519_Cipher_Seed + else: + raise InvalidValueError(f"Unsupported primitive with code =" + f" {prim.code} when cipher code is " + f"missing.") + + if code in CiXAllQB64Dex: + ser = prim.qb64b + elif code in CiXVarQB2Dex: + ser = prim.qb2 + elif code in CiXVarStrmDex: + ser = prim.stream + else: + raise InvalidCodeError(f"Invalid primitive cipher {code=} not " + f"qb64 or qb2.") - # encrypting fully qualified qb64 version of plain text ensures its - # derivation code round trips through eventual decryption - return (self._encrypt(ser=matter.qb64b, pubkey=self.raw, code=code)) + if not code: # assumes default is sniffable stream + code = CiXDex.X25519_Cipher_L0 + + if hasattr(ser, "encode"): + ser = ser.encode() # convert str to bytes + if not isinstance(ser, bytes): + ser = bytes(ser) # convert bytearray and memoryview to bytes + + # encrypting cesr primitive qb64 or qb2 or cesr stream as plain + # text with proper cipher code ensures primitive round trip through eventual + # decryption. + return (self._encrypt(ser=ser, pubkey=self.raw, code=code)) @staticmethod def _x25519(ser, pubkey, code): @@ -830,7 +885,7 @@ def _x25519(ser, pubkey, code): ser (Union[bytes, str]): qb64b or qb64 serialization of seed or salt to be encrypted. pubkey (bytes): raw binary serialization of encryption public key - code (str): derivation code of serialized plain text seed or salt + code (str): cipher derivation code """ raw = pysodium.crypto_box_seal(ser, pubkey) return Cipher(raw=raw, code=code) @@ -870,13 +925,14 @@ def __init__(self, code=MtrDex.X25519_Private, seed=None, **kwa): """ Assign decrypting cipher suite function to ._decrypt - Parameters: See Matter for inheirted parameters - raw (bytes): private decryption key derived from seed (private signing key) - qb64b (bytes): fully qualified private decryption key - qb64 (str): fully qualified private decryption key + Inherited Parameters: + (see Matter) + + Parameters: See Matter for inherited parameters code (str): derivation code for private decryption key - seed (Union[bytes, str]): qb64b or qb64 of signing key seed used to - derive raw which is private decryption key + seed (str | bytes | bytearray | memoryview | None): qb64b or qb64 + of signing key seed used to derive raw which is private + decryption key """ try: super(Decrypter, self).__init__(code=code, **kwa) @@ -898,33 +954,62 @@ def __init__(self, code=MtrDex.X25519_Private, seed=None, **kwa): else: raise ValueError("Unsupported decrypter code = {}.".format(self.code)) - def decrypt(self, ser=None, cipher=None, transferable=False): - """ + + def decrypt(self, *, cipher=None, qb64=None, qb2=None, klas=None, + transferable=False, bare=False, **kwa): + """Returns plain text as klas instance (Matter, Indexer, Streamer). + When klas is None then klas default is based on cipher.code or inferred + from qb64 or qb2 code. Default maybe Salter, Signer, or Streamer. + Cipher's encrypted plain text is fully qualified (qb64) + so derivaton code of plain text preserved through encryption/decryption + round trip. + + Returns: - Salter or Signer instance derived from plain text decrypted from - encrypted cipher text material given by ser or cipher. Plain text - that is orignally encrypt should always be fully qualified (qb64b) - so that derivaton code of plain text is preserved through - encryption/decryption round trip. + decrypted (Matter | Indexer | Streamer | bytes): When bare is False + returns instance of decrypted cipher text of .raw which is + encrypted qb64, qb2, or sniffable stream depending on .code + hhen Bare is True. Otherwise returns decrypted serialization + plaintext whatever that may be. + + Keyword Parameters: + (see Matter because created Decrypter is Matter subclass) Parameters: - ser (Union[bytes,str]): qb64b or qb64 serialization of cipher text - cipher (Cipher): optional Cipher instance when ser is None - transferable (bool): True means associated verfer of returned - signer is transferable. False means non-transferable + cipher (Cipher): instance. One of cipher, qb64, or qb2 required. + qb64 (str | bytes | bytearray | memoryview | None ): serialization + of cipher text as fully qualified base64. When str, encodes as + utf-8. When bytearray and strip in kwa is True then strips. + qb2 (bytes | bytearray | memoryview | None ): serialization + of cipher text as fully qualified base2. Strips when bytearray + and strip in kwa is True. + klas (Matter | Indexer | Streamer): Class used to create instance from + decrypted serialization. + transferable (bool): Modifier of klas instance creation. + When klas init (such as Signer) supports transferabe parm; + True means verfer of returned signer is transferable. + False means non-transferable + bare (bool): False (default) means returns instance holding plaintext + True means returns plaintext itself """ - if not (ser or cipher): - raise EmptyMaterialError("Neither ser or cipher are provided.") + if not cipher: + if qb64: # create cipher from qb64 + cipher = Cipher(qb64b=qb64, **kwa) - if ser: # create cipher to ensure valid derivation code of material in ser - cipher = Cipher(qb64b=ser) + elif qb2: + cipher = Cipher(qb2=qb2, **kwa) + + else: + raise EmptyMaterialError(f"Need one of cipher, qb64, or qb2.") return (self._decrypt(cipher=cipher, prikey=self.raw, - transferable=transferable)) + klas=klas, + transferable=transferable, + bare=bare)) @staticmethod - def _x25519(cipher, prikey, transferable=False): + def _x25519(cipher, prikey, klas=None, transferable=False, bare=False): """ Returns plain text as Salter or Signer instance depending on the cipher code and the embedded encrypted plain text derivation code. @@ -933,15 +1018,40 @@ def _x25519(cipher, prikey, transferable=False): cipher (Cipher): instance of encrypted seed or salt prikey (bytes): raw binary decryption private key derived from signing seed or sigkey - transferable (bool): True means associated verfer of returned - signer is transferable. False means non-transferable + klas (Matter, Indexer, Streamer | None): Class used to create instance from + decrypted serialization. Default depends on cipher.code. + transferable (bool): Modifier of Klas instance creation. + When klas init (such as Signer) supports transferabe parm; + True means verfer of returned signer is transferable. + False means non-transferable + bare (bool): False (default) means CESR instance holding plaintext + True means plaintext """ + # assumes raw plain text is qb64b or qb64 or sniffable stream + # so it's round trippable pubkey = pysodium.crypto_scalarmult_curve25519_base(prikey) plain = pysodium.crypto_box_seal_open(cipher.raw, pubkey, prikey) # qb64b - # ensure raw plain text is qb64b or qb64 so its derivation code is round tripped - if cipher.code == MtrDex.X25519_Cipher_Salt: - return Salter(qb64b=plain) - elif cipher.code == MtrDex.X25519_Cipher_Seed: - return Signer(qb64b=plain, transferable=transferable) + + if bare: + return plain + else: - raise ValueError("Unsupported cipher text code = {}.".format(cipher.code)) + if not klas: + if cipher.code == CiXFixQB64Dex.X25519_Cipher_Salt: + klas = Salter + elif cipher.code == CiXFixQB64Dex.X25519_Cipher_Seed: + klas = Signer + elif cipher.code in CiXVarStrmDex: + klas = Streamer + else: + raise InvalidCodeError(f"Unsupported cipher code = {cipher.code}" + f" when klas missing.") + + if cipher.code in CiXAllQB64Dex: + return klas(qb64b=plain, transferable=transferable) + elif cipher.code in CiXVarQB2Dex: + return klas(qb2=plain) + elif cipher.code in CiXVarStrmDex: + return klas(stream=plain) + else: + raise InvalidCodeError(f"Unsupported cipher code = {cipher.code}.") diff --git a/src/keri/core/streaming.py b/src/keri/core/streaming.py index 617a74d2..f00e570c 100644 --- a/src/keri/core/streaming.py +++ b/src/keri/core/streaming.py @@ -48,7 +48,7 @@ def annot(ims): if not isinstance(ims, bytearray): # going to strip - ims = bytearray(ims) # so make bytearray copy + ims = bytearray(ims) # so make bytearray copy, converts str to bytearray while ims: # right now just for KERI event messages cold = sniff(ims) # check for spurious counters at front of stream @@ -340,16 +340,36 @@ class Streamer: """ - def __init__(self, stream): + def __init__(self, stream, verify=False): """Initialize instance + Holds sniffable CESR stream as byte like string + either (bytes, bytearray, or memoryview) Parameters: - stream (bytes | bytearray): sniffable CESR stream + stream (str | bytes | bytearray | memoryview): sniffable CESR stream + verify (bool): When True raise error if .stream is not sniffable. """ - self._stream = bytes(stream) + if hasattr(stream, "encode"): + stream = bytearray(stream.encode()) # convert str to bytearray + if not isinstance(stream, (bytes, bytearray, memoryview)): + raise kering.InvalidTypeError(f"Invalid stream type, not byteable.") + + self._stream = stream + + @property + def _verify(self): + """Returns True if sniffable stream, False otherwise + Returns: + sniffable (bool): True when .stream is sniffable. + False otherwise. + Only works for ver 2 CESR because need for all count codes to be + pipelineable in order to simply parse stream + + """ + return False @property @@ -358,33 +378,64 @@ def stream(self): """ return self._stream + @property def text(self): - """expanded stream as qb64 text + """expanded stream where all primitives and groups in stream are + individually expanded to qb64. + Requires parsing full depth to ensure expanded consistently. Returns: stream (bytes): expanded text qb64 version of stream + Only works for ver 2 CESR because need for all count codes to be + pipelineable in order to simply parse and expand stream + """ return self._stream @property def binary(self): - """compacted stream as qb2 binary + """compacted stream where all primitives and groups in stream are + individually compacted to qb2. + Requires parsing full depth to ensure compacted consistently Returns: stream (bytes): compacted binary qb2 version of stream + Only works for ver 2 CESR because need for all count codes to be + pipelineable in order to simply parse and compact stream + """ return self._stream @property def texter(self): - """expanded stream as Texter instance + """stream as Texter instance. + Texter(text=self.stream) Returns: texter (Texter): Texter primitive of stream suitable wrapping """ return self._stream + @property + def bexter(self): + """stream as Bexter instance. + Bexter of expanded text version of stream. + First expand to text which requires parsing then create bexter + Bexter(bext=self.text) + Because sniffable stream MUST NOT start with 'A' then there is no + length ambiguity. The only tritet collison of 'A' is with '-' but the + remaining 5 bits are guaranteed to always be different. So bexter must + check not just the starting tritet but the full starting byte to ensure + not 'A' as first byte. + + Requires parsing to ensure qb64 + Returns: + bexter (Bexter): Bexter primitive of stream suitable wrapping + + """ + return self._stream + diff --git a/src/keri/db/subing.py b/src/keri/db/subing.py index f007a28a..ec4ae778 100644 --- a/src/keri/db/subing.py +++ b/src/keri/db/subing.py @@ -949,7 +949,7 @@ def put(self, keys: Union[str, Iterable], val: coring.Matter, already in database. """ if encrypter: - val = encrypter.encrypt(matter=val) # returns Cipher instance + val = encrypter.encrypt(prim=val) # returns Cipher instance return (self.db.putVal(db=self.sdb, key=self._tokey(keys), val=val.qb64b)) @@ -970,7 +970,7 @@ def pin(self, keys: Union[str, Iterable], val: coring.Matter, result (bool): True If successful. False otherwise. """ if encrypter: - val = encrypter.encrypt(matter=val) # returns Cipher instance + val = encrypter.encrypt(prim=val) # returns Cipher instance return (self.db.setVal(db=self.sdb, key=self._tokey(keys), val=val.qb64b)) @@ -1008,7 +1008,7 @@ def get(self, keys: Union[str, Iterable], decrypter: core.Decrypter = None): keys = self._tokeys(key) # verkey is last split if any verfer = coring.Verfer(qb64b=keys[-1]) # last split if decrypter: - return (decrypter.decrypt(ser=bytes(val), + return (decrypter.decrypt(qb64=bytes(val), transferable=verfer.transferable)) return (self.klas(qb64b=bytes(val), transferable=verfer.transferable)) @@ -1037,7 +1037,7 @@ def getItemIter(self, keys: Union[str, Iterable]=b"", ikeys = self._tokeys(key) # verkey is last split if any verfer = coring.Verfer(qb64b=ikeys[-1]) # last split if decrypter: - yield (ikeys, decrypter.decrypt(ser=bytes(val), + yield (ikeys, decrypter.decrypt(qb64=bytes(val), transferable=verfer.transferable)) else: yield (ikeys, self.klas(qb64b=bytes(val), diff --git a/tests/app/test_keeping.py b/tests/app/test_keeping.py index 0d638aef..87474c49 100644 --- a/tests/app/test_keeping.py +++ b/tests/app/test_keeping.py @@ -1591,7 +1591,7 @@ def test_manager_with_aeid(): pp = manager.ks.prms.get(spre) assert pp.pidx == 0 assert pp.algo == keeping.Algos.salty - assert manager.decrypter.decrypt(ser=pp.salt).qb64 == salt + assert manager.decrypter.decrypt(qb64=pp.salt).qb64 == salt assert pp.stem == '' assert pp.tier == core.Tiers.low @@ -1674,7 +1674,7 @@ def test_manager_with_aeid(): pp = manager.ks.prms.get(spre) assert pp.pidx == 0 assert pp.algo == keeping.Algos.salty - assert manager.decrypter.decrypt(ser=pp.salt).qb64 == salt + assert manager.decrypter.decrypt(qb64=pp.salt).qb64 == salt assert pp.stem == '' assert pp.tier == core.Tiers.low diff --git a/tests/core/test_signing.py b/tests/core/test_signing.py index 9b8ec4d1..d71c93b1 100644 --- a/tests/core/test_signing.py +++ b/tests/core/test_signing.py @@ -529,9 +529,11 @@ def test_cipher(): # test .decrypt method needs qb64 prikeyqb64 = Matter(raw=prikey, code=MtrDex.X25519_Private).qb64b assert cipher.decrypt(prikey=prikeyqb64).qb64b == seedqb64b + assert cipher.decrypt(prikey=prikeyqb64, bare=True) == seedqb64b cryptseedqb64 = Matter(raw=cryptseed, code=MtrDex.Ed25519_Seed).qb64b assert cipher.decrypt(seed=cryptseedqb64).qb64b == seedqb64b + assert cipher.decrypt(seed=cryptseedqb64, bare=True) == seedqb64b # wrong but shorter code so instance creation succeeds cipher = Cipher(raw=raw, code=CiXDex.X25519_Cipher_Salt) @@ -557,9 +559,11 @@ def test_cipher(): # test .decrypt method needs qb64 prikeyqb64 = Matter(raw=prikey, code=MtrDex.X25519_Private).qb64b assert cipher.decrypt(prikey=prikeyqb64).qb64b == saltqb64b + assert cipher.decrypt(prikey=prikeyqb64, bare=True) == saltqb64b cryptseedqb64 = Matter(raw=cryptseed, code=MtrDex.Ed25519_Seed).qb64b assert cipher.decrypt(seed=cryptseedqb64).qb64b == saltqb64b + assert cipher.decrypt(seed=cryptseedqb64, bare=True) == saltqb64b with pytest.raises(kering.InvalidCodeError): # bad code cipher = Cipher(raw=raw, code=MtrDex.Ed25519N) @@ -642,18 +646,18 @@ def test_cipher(): texter = core.Texter(text=plain) assert texter.text == plain counter = core.Counter(core.Codens.GenericGroup, count=texter.size) - peb = counter.qb64b + texter.qb64b - raw = pysodium.crypto_box_seal(peb, pubkey) # uses nonce so different everytime + strm = counter.qb64b + texter.qb64b + raw = pysodium.crypto_box_seal(strm, pubkey) # uses nonce so different everytime assert len(raw) == 108 assert (3 - (len(raw) % 3)) % 3 == 0 cipher = Cipher(raw=raw, code=CiXDex.X25519_Cipher_L0) assert cipher.code == CiXDex.X25519_Cipher_L0 uncb = pysodium.crypto_box_seal_open(cipher.raw, pubkey, prikey) - assert uncb == peb - peb = bytearray(peb) - counter = core.Counter(qb64b=peb, strip=True) + assert uncb == strm + strm = bytearray(strm) + counter = core.Counter(qb64b=strm, strip=True) assert counter.code == core.CtrDex_2_0.GenericGroup - texter = core.Texter(qb64b=peb, strip=True) + texter = core.Texter(qb64b=strm, strip=True) assert texter.text == plain # sniffable qb64 lead 1 @@ -661,18 +665,18 @@ def test_cipher(): texter = core.Texter(text=plain) assert texter.text == plain counter = core.Counter(core.Codens.GenericGroup, count=texter.size) - peb = counter.qb64b + texter.qb64b - raw = pysodium.crypto_box_seal(peb, pubkey) # uses nonce so different everytime + strm = counter.qb64b + texter.qb64b + raw = pysodium.crypto_box_seal(strm, pubkey) # uses nonce so different everytime assert len(raw) == 116 assert (3 - (len(raw) % 3)) % 3 == 1 cipher = Cipher(raw=raw, code=CiXDex.X25519_Cipher_L0) assert cipher.code == CiXDex.X25519_Cipher_L1 uncb = pysodium.crypto_box_seal_open(cipher.raw, pubkey, prikey) - assert uncb == peb - peb = bytearray(peb) - counter = core.Counter(qb64b=peb, strip=True) + assert uncb == strm + strm = bytearray(strm) + counter = core.Counter(qb64b=strm, strip=True) assert counter.code == core.CtrDex_2_0.GenericGroup - texter = core.Texter(qb64b=peb, strip=True) + texter = core.Texter(qb64b=strm, strip=True) assert texter.text == plain # sniffable qb64 lead 2 @@ -680,18 +684,18 @@ def test_cipher(): texter = core.Texter(text=plain) assert texter.text == plain counter = core.Counter(core.Codens.GenericGroup, count=texter.size) - peb = counter.qb64b + texter.qb64b - raw = pysodium.crypto_box_seal(peb, pubkey) # uses nonce so different everytime + strm = counter.qb64b + texter.qb64b + raw = pysodium.crypto_box_seal(strm, pubkey) # uses nonce so different everytime assert len(raw) == 112 assert (3 - (len(raw) % 3)) % 3 == 2 cipher = Cipher(raw=raw, code=CiXDex.X25519_Cipher_L0) assert cipher.code == CiXDex.X25519_Cipher_L2 uncb = pysodium.crypto_box_seal_open(cipher.raw, pubkey, prikey) - assert uncb == peb - peb = bytearray(peb) - counter = core.Counter(qb64b=peb, strip=True) + assert uncb == strm + strm = bytearray(strm) + counter = core.Counter(qb64b=strm, strip=True) assert counter.code == core.CtrDex_2_0.GenericGroup - texter = core.Texter(qb64b=peb, strip=True) + texter = core.Texter(qb64b=strm, strip=True) assert texter.text == plain # sniffable qb2 lead 0 @@ -700,19 +704,19 @@ def test_cipher(): assert texter.text == plain counter = core.Counter(core.Codens.GenericGroup, count=texter.size) assert counter.code == core.CtrDex_2_0.GenericGroup - peb = counter.qb64b + texter.qb64b - pebqb2 = decodeB64(peb) - raw = pysodium.crypto_box_seal(pebqb2, pubkey) # uses nonce so different everytime + strm = counter.qb64b + texter.qb64b + strmb2 = decodeB64(strm) + raw = pysodium.crypto_box_seal(strmb2, pubkey) # uses nonce so different everytime assert len(raw) == 93 assert (3 - (len(raw) % 3)) % 3 == 0 cipher = Cipher(raw=raw, code=CiXDex.X25519_Cipher_L0) assert cipher.code == CiXDex.X25519_Cipher_L0 uncb = pysodium.crypto_box_seal_open(cipher.raw, pubkey, prikey) - assert uncb == pebqb2 - pebqb2 = bytearray(pebqb2) - counter = core.Counter(qb2=pebqb2, strip=True) + assert uncb == strmb2 + strmb2 = bytearray(strmb2) + counter = core.Counter(qb2=strmb2, strip=True) assert counter.code == core.CtrDex_2_0.GenericGroup - texter = core.Texter(qb2=pebqb2, strip=True) + texter = core.Texter(qb2=strmb2, strip=True) assert texter.text == plain @@ -723,20 +727,282 @@ def test_cipher(): assert texter.text == plain counter = core.Counter(core.Codens.GenericGroup, count=texter.size) assert counter.code == core.CtrDex_2_0.BigGenericGroup - peb = counter.qb64b + texter.qb64b - pebqb2 = decodeB64(peb) - raw = pysodium.crypto_box_seal(pebqb2, pubkey) # uses nonce so different everytime + strm = counter.qb64b + texter.qb64b + strmb2 = decodeB64(strm) + raw = pysodium.crypto_box_seal(strmb2, pubkey) # uses nonce so different everytime assert len(raw) == 12696 assert (3 - (len(raw) % 3)) % 3 == 0 assert (len(raw) // 3 ) > (64 ** 2 - 1) # triplets cipher = Cipher(raw=raw, code=CiXDex.X25519_Cipher_L0) assert cipher.code == CiXDex.X25519_Cipher_Big_L0 uncb = pysodium.crypto_box_seal_open(cipher.raw, pubkey, prikey) - assert uncb == pebqb2 - pebqb2 = bytearray(pebqb2) - counter = core.Counter(qb2=pebqb2, strip=True) + assert uncb == strmb2 + strmb2 = bytearray(strmb2) + counter = core.Counter(qb2=strmb2, strip=True) assert counter.code == core.CtrDex_2_0.BigGenericGroup - texter = core.Texter(qb2=pebqb2, strip=True) + texter = core.Texter(qb2=strmb2, strip=True) + assert texter.text == plain + + # test .decrypt method with variable sized qb64 coded ciphers + # qb64 lead 0 + plain = "The quick brown fox jumps over the lazy " + texter = core.Texter(text=plain) + assert texter.text == plain + raw = pysodium.crypto_box_seal(texter.qb64b, pubkey) # uses nonce so different everytime + assert len(raw) == 108 + assert (3 - (len(raw) % 3)) % 3 == 0 + cipher = Cipher(raw=raw, code=CiXDex.X25519_Cipher_QB64_L0) + assert cipher.code == CiXDex.X25519_Cipher_QB64_L0 + # test using prikey + with pytest.raises(kering.InvalidCodeError): # needs klas when var len qb64 + result = cipher.decrypt(prikey=prikeyqb64) + texter = cipher.decrypt(prikey=prikeyqb64, klas=core.Texter) + assert texter.text == plain + # test with bare + assert cipher.decrypt(prikey=prikeyqb64, bare=True) == texter.qb64b + # test using seed + with pytest.raises(kering.InvalidCodeError): # needs klas when var len qb64 + result = cipher.decrypt(seed=cryptseedqb64) + texter = cipher.decrypt(seed=cryptseedqb64, klas=core.Texter) + assert texter.text == plain + # test with bare + texter = cipher.decrypt(seed=cryptseedqb64, bare=True) == texter.qb64b + + + # qb64 lead 1 + plain = "The quick brown fox jumps over the lazy dogcats" + texter = core.Texter(text=plain) + assert texter.text == plain + raw = pysodium.crypto_box_seal(texter.qb64b, pubkey) # uses nonce so different everytime + assert len(raw) == 116 + assert (3 - (len(raw) % 3)) % 3 == 1 + cipher = Cipher(raw=raw, code=CiXDex.X25519_Cipher_QB64_L0) + assert cipher.code == CiXDex.X25519_Cipher_QB64_L1 + # test using prikey + with pytest.raises(kering.InvalidCodeError): # needs klas when var len qb64 + result = cipher.decrypt(prikey=prikeyqb64) + texter = cipher.decrypt(prikey=prikeyqb64, klas=core.Texter) + assert texter.text == plain + # test with bare + assert cipher.decrypt(prikey=prikeyqb64, bare=True) == texter.qb64b + # test using seed + with pytest.raises(kering.InvalidCodeError): # needs klas when var len qb64 + result = cipher.decrypt(seed=cryptseedqb64) + texter = cipher.decrypt(seed=cryptseedqb64, klas=core.Texter) + assert texter.text == plain + # test with bare + texter = cipher.decrypt(seed=cryptseedqb64, bare=True) == texter.qb64b + + + # qb64 lead 2 + plain = "The quick brown fox jumps over the lazy dog" + texter = core.Texter(text=plain) + assert texter.text == plain + raw = pysodium.crypto_box_seal(texter.qb64b, pubkey) # uses nonce so different everytime + assert len(raw) == 112 + assert (3 - (len(raw) % 3)) % 3 == 2 + cipher = Cipher(raw=raw, code=CiXDex.X25519_Cipher_QB64_L0) + assert cipher.code == CiXDex.X25519_Cipher_QB64_L2 + # test using prikey + with pytest.raises(kering.InvalidCodeError): # needs klas when var len qb64 + result = cipher.decrypt(prikey=prikeyqb64) + texter = cipher.decrypt(prikey=prikeyqb64, klas=core.Texter) + assert texter.text == plain + # test with bare + assert cipher.decrypt(prikey=prikeyqb64, bare=True) == texter.qb64b + # test using seed + with pytest.raises(kering.InvalidCodeError): # needs klas when var len qb64 + result = cipher.decrypt(seed=cryptseedqb64) + texter = cipher.decrypt(seed=cryptseedqb64, klas=core.Texter) + assert texter.text == plain + # test with bare + texter = cipher.decrypt(seed=cryptseedqb64, bare=True) == texter.qb64b + + # test .decrypt method with variable sized qb2 coded ciphers + # qb2 lead 0 (always lead 0 when qb2 from texter) + plain = "The quick brown fox jumps over the lazy dog" + texter = core.Texter(text=plain) + assert texter.text == plain + raw = pysodium.crypto_box_seal(texter.qb2, pubkey) # uses nonce so different everytime + assert len(raw) == 96 + assert (3 - (len(raw) % 3)) % 3 == 0 + cipher = Cipher(raw=raw, code=CiXDex.X25519_Cipher_QB2_L0) + assert cipher.code == CiXDex.X25519_Cipher_QB2_L0 + # test using prikey + with pytest.raises(kering.InvalidCodeError): # needs klas when var len qb64 + result = cipher.decrypt(prikey=prikeyqb64) + texter = cipher.decrypt(prikey=prikeyqb64, klas=core.Texter) + assert texter.text == plain + # test with bare + assert cipher.decrypt(prikey=prikeyqb64, bare=True) == texter.qb2 + # test using seed + with pytest.raises(kering.InvalidCodeError): # needs klas when var len qb64 + result = cipher.decrypt(seed=cryptseedqb64) + texter = cipher.decrypt(seed=cryptseedqb64, klas=core.Texter) + assert texter.text == plain + # test with bare + texter = cipher.decrypt(seed=cryptseedqb64, bare=True) == texter.qb2 + + # sniffable qb64 lead 0 + plain = "The quick brown fox jumps over the lazy" + texter = core.Texter(text=plain) + assert texter.text == plain + counter = core.Counter(core.Codens.GenericGroup, count=texter.size) + strm = counter.qb64b + texter.qb64b # sniffable stream + raw = pysodium.crypto_box_seal(strm, pubkey) # uses nonce so different everytime + assert len(raw) == 108 + assert (3 - (len(raw) % 3)) % 3 == 0 + cipher = Cipher(raw=raw, code=CiXDex.X25519_Cipher_L0) + assert cipher.code == CiXDex.X25519_Cipher_L0 + # test using prikey + streamer = cipher.decrypt(prikey=prikeyqb64) # default klas is streamer + assert streamer.stream == strm + streamer = cipher.decrypt(prikey=prikeyqb64, klas=core.Streamer) + assert streamer.stream == strm + # test with bare + assert cipher.decrypt(prikey=prikeyqb64, bare=True) == streamer.stream + # test using seed + streamer = cipher.decrypt(seed=cryptseedqb64) + assert streamer.stream == strm + streamer = cipher.decrypt(seed=cryptseedqb64, klas=core.Streamer) + assert streamer.stream == strm + # test with bare + assert cipher.decrypt(seed=cryptseedqb64, bare=True) == streamer.stream + strm = bytearray(strm) + counter = core.Counter(qb64b=strm, strip=True) + assert counter.code == core.CtrDex_2_0.GenericGroup + texter = core.Texter(qb64b=strm, strip=True) + assert texter.text == plain + + + # sniffable qb64 lead 1 + plain = "The quick brown fox jumps over the lazy dog" + texter = core.Texter(text=plain) + assert texter.text == plain + counter = core.Counter(core.Codens.GenericGroup, count=texter.size) + strm = counter.qb64b + texter.qb64b + raw = pysodium.crypto_box_seal(strm, pubkey) # uses nonce so different everytime + assert len(raw) == 116 + assert (3 - (len(raw) % 3)) % 3 == 1 + cipher = Cipher(raw=raw, code=CiXDex.X25519_Cipher_L0) + assert cipher.code == CiXDex.X25519_Cipher_L1 + # test using prikey + streamer = cipher.decrypt(prikey=prikeyqb64) # default klas is streamer + assert streamer.stream == strm + streamer = cipher.decrypt(prikey=prikeyqb64, klas=core.Streamer) + assert streamer.stream == strm + # test with bare + assert cipher.decrypt(prikey=prikeyqb64, bare=True) == streamer.stream + # test using seed + streamer = cipher.decrypt(seed=cryptseedqb64) + assert streamer.stream == strm + streamer = cipher.decrypt(seed=cryptseedqb64, klas=core.Streamer) + assert streamer.stream == strm + # test with bare + assert cipher.decrypt(seed=cryptseedqb64, bare=True) == streamer.stream + strm = bytearray(strm) + counter = core.Counter(qb64b=strm, strip=True) + assert counter.code == core.CtrDex_2_0.GenericGroup + texter = core.Texter(qb64b=strm, strip=True) + assert texter.text == plain + + # sniffable qb64 lead 2 + plain = "The quick brown fox jumps over the lazy " + texter = core.Texter(text=plain) + assert texter.text == plain + counter = core.Counter(core.Codens.GenericGroup, count=texter.size) + strm = counter.qb64b + texter.qb64b + raw = pysodium.crypto_box_seal(strm, pubkey) # uses nonce so different everytime + assert len(raw) == 112 + assert (3 - (len(raw) % 3)) % 3 == 2 + cipher = Cipher(raw=raw, code=CiXDex.X25519_Cipher_L0) + assert cipher.code == CiXDex.X25519_Cipher_L2 + # test using prikey + streamer = cipher.decrypt(prikey=prikeyqb64) # default klas is streamer + assert streamer.stream == strm + streamer = cipher.decrypt(prikey=prikeyqb64, klas=core.Streamer) + assert streamer.stream == strm + # test with bare + assert cipher.decrypt(prikey=prikeyqb64, bare=True) == streamer.stream + # test using seed + streamer = cipher.decrypt(seed=cryptseedqb64) + assert streamer.stream == strm + streamer = cipher.decrypt(seed=cryptseedqb64, klas=core.Streamer) + assert streamer.stream == strm + # test with bare + assert cipher.decrypt(seed=cryptseedqb64, bare=True) == streamer.stream + strm = bytearray(strm) + counter = core.Counter(qb64b=strm, strip=True) + assert counter.code == core.CtrDex_2_0.GenericGroup + texter = core.Texter(qb64b=strm, strip=True) + assert texter.text == plain + + # sniffable qb2 lead 0 + plain = "The quick brown fox jumps over the lazy" + texter = core.Texter(text=plain) + assert texter.text == plain + counter = core.Counter(core.Codens.GenericGroup, count=texter.size) + assert counter.code == core.CtrDex_2_0.GenericGroup + strm = counter.qb64b + texter.qb64b + strmb2 = decodeB64(strm) + raw = pysodium.crypto_box_seal(strmb2, pubkey) # uses nonce so different everytime + assert len(raw) == 93 + assert (3 - (len(raw) % 3)) % 3 == 0 + cipher = Cipher(raw=raw, code=CiXDex.X25519_Cipher_L0) + assert cipher.code == CiXDex.X25519_Cipher_L0 + # test using prikey + streamer = cipher.decrypt(prikey=prikeyqb64) # default klas is streamer + assert streamer.stream == strmb2 + streamer = cipher.decrypt(prikey=prikeyqb64, klas=core.Streamer) + assert streamer.stream == strmb2 + # test with bare + assert cipher.decrypt(prikey=prikeyqb64, bare=True) == streamer.stream + # test using seed + streamer = cipher.decrypt(seed=cryptseedqb64) + assert streamer.stream == strmb2 + streamer = cipher.decrypt(seed=cryptseedqb64, klas=core.Streamer) + assert streamer.stream == strmb2 + # test with bare + assert cipher.decrypt(seed=cryptseedqb64, bare=True) == streamer.stream + strmb2 = bytearray(strmb2) + counter = core.Counter(qb2=strmb2, strip=True) + assert counter.code == core.CtrDex_2_0.GenericGroup + texter = core.Texter(qb2=strmb2, strip=True) + assert texter.text == plain + + # sniffable qb2 lead 0 Big + + plain = "The quick brown fox jumps over the lazy" * 324 + texter = core.Texter(text=plain) + assert texter.text == plain + counter = core.Counter(core.Codens.GenericGroup, count=texter.size) + assert counter.code == core.CtrDex_2_0.BigGenericGroup + strm = counter.qb64b + texter.qb64b + strmb2 = decodeB64(strm) + raw = pysodium.crypto_box_seal(strmb2, pubkey) # uses nonce so different everytime + assert len(raw) == 12696 + assert (3 - (len(raw) % 3)) % 3 == 0 + assert (len(raw) // 3 ) > (64 ** 2 - 1) # triplets + cipher = Cipher(raw=raw, code=CiXDex.X25519_Cipher_L0) + assert cipher.code == CiXDex.X25519_Cipher_Big_L0 + # test using prikey + streamer = cipher.decrypt(prikey=prikeyqb64) # default klas is streamer + assert streamer.stream == strmb2 + streamer = cipher.decrypt(prikey=prikeyqb64, klas=core.Streamer) + assert streamer.stream == strmb2 + # test with bare + assert cipher.decrypt(prikey=prikeyqb64, bare=True) == streamer.stream + # test using seed + streamer = cipher.decrypt(seed=cryptseedqb64) + assert streamer.stream == strmb2 + streamer = cipher.decrypt(seed=cryptseedqb64, klas=core.Streamer) + assert streamer.stream == strmb2 + # test with bare + assert cipher.decrypt(seed=cryptseedqb64, bare=True) == streamer.stream + strmb2 = bytearray(strmb2) + counter = core.Counter(qb2=strmb2, strip=True) + assert counter.code == core.CtrDex_2_0.BigGenericGroup + texter = core.Texter(qb2=strmb2, strip=True) assert texter.text == plain """ Done Test """ @@ -779,16 +1045,6 @@ def test_encrypter(): assert encrypter.raw == pubkey assert encrypter.verifySeed(seed=cryptsigner.qb64) - cipher = encrypter.encrypt(ser=seedqb64b) - assert cipher.code == MtrDex.X25519_Cipher_Seed - uncb = pysodium.crypto_box_seal_open(cipher.raw, encrypter.raw, prikey) - assert uncb == seedqb64b - - cipher = encrypter.encrypt(ser=saltqb64b) - assert cipher.code == MtrDex.X25519_Cipher_Salt - uncb = pysodium.crypto_box_seal_open(cipher.raw, encrypter.raw, prikey) - assert uncb == saltqb64b - verfer = Verfer(raw=verkey, code=MtrDex.Ed25519) encrypter = Encrypter(verkey=verfer.qb64) @@ -807,6 +1063,27 @@ def test_encrypter(): assert encrypter.code == MtrDex.X25519 assert encrypter.qb64 == 'CAF7Wr3XNq5hArcOuBJzaY6Nd23jgtUVI6KDfb3VngkR' assert encrypter.raw == pubkey + + # Test encrypt method + encrypter = Encrypter(raw=pubkey) + assert encrypter.code == MtrDex.X25519 + assert encrypter.qb64 == 'CAF7Wr3XNq5hArcOuBJzaY6Nd23jgtUVI6KDfb3VngkR' + assert encrypter.raw == pubkey + assert encrypter.verifySeed(seed=cryptsigner.qb64) + + cipher = encrypter.encrypt(ser=seedqb64b, code=MtrDex.X25519_Cipher_Seed) + assert cipher.code == MtrDex.X25519_Cipher_Seed + uncb = pysodium.crypto_box_seal_open(cipher.raw, encrypter.raw, prikey) + assert uncb == seedqb64b + + cipher = encrypter.encrypt(ser=saltqb64b, code=MtrDex.X25519_Cipher_Salt) + assert cipher.code == MtrDex.X25519_Cipher_Salt + uncb = pysodium.crypto_box_seal_open(cipher.raw, encrypter.raw, prikey) + assert uncb == saltqb64b + + # needs tests of encrypter with prim param instead of ser (see roundtrip) + + """ Done Test """ @@ -857,7 +1134,7 @@ def test_decrypter(): assert encrypter.raw == pubkey # create cipher of seed - seedcipher = encrypter.encrypt(ser=seedqb64b) + seedcipher = encrypter.encrypt(ser=seedqb64b, code=MtrDex.X25519_Cipher_Seed) assert seedcipher.code == MtrDex.X25519_Cipher_Seed # each encryption uses a nonce so not a stable representation for testing @@ -868,12 +1145,18 @@ def test_decrypter(): assert decrypter.raw == prikey # decrypt seed cipher using ser - designer = decrypter.decrypt(ser=seedcipher.qb64b, transferable=signer.verfer.transferable) + designer = decrypter.decrypt(qb64=seedcipher.qb64b, transferable=signer.verfer.transferable) assert designer.qb64b == seedqb64b assert designer.code == MtrDex.Ed25519_Seed assert designer.verfer.code == MtrDex.Ed25519 assert signer.verfer.transferable + # test bare decryption returns plain not instance + plain = decrypter.decrypt(qb64=seedcipher.qb64b, + transferable=signer.verfer.transferable, + bare=True) + assert plain == seedqb64b + # decrypt seed cipher using cipher designer = decrypter.decrypt(cipher=seedcipher, transferable=signer.verfer.transferable) assert designer.qb64b == seedqb64b @@ -882,15 +1165,19 @@ def test_decrypter(): assert signer.verfer.transferable # create cipher of salt - saltcipher = encrypter.encrypt(ser=saltqb64b) + saltcipher = encrypter.encrypt(ser=saltqb64b, code=MtrDex.X25519_Cipher_Salt) assert saltcipher.code == MtrDex.X25519_Cipher_Salt # each encryption uses a nonce so not a stable representation for testing # decrypt salt cipher using ser - desalter = decrypter.decrypt(ser=saltcipher.qb64b) + desalter = decrypter.decrypt(qb64=saltcipher.qb64b) assert desalter.qb64b == saltqb64b assert desalter.code == MtrDex.Salt_128 + # test bare decryption returns plain not instance + plain = decrypter.decrypt(qb64=saltcipher.qb64b, bare=True) + assert plain == saltqb64b + # decrypt salt cipher using cipher desalter = decrypter.decrypt(cipher=saltcipher) assert desalter.qb64b == saltqb64b @@ -900,7 +1187,7 @@ def test_decrypter(): # get from seedcipher above cipherseed = ('PM9jOGWNYfjM_oLXJNaQ8UlFSAV5ACjsUY7J16xfzrlpc9Ve3A5WYrZ4o_' 'NHtP5lhp78Usspl9fyFdnCdItNd5JyqZ6dt8SXOt6TOqOCs-gy0obrwFkPPqBvVkEw') - designer = decrypter.decrypt(ser=cipherseed, transferable=signer.verfer.transferable) + designer = decrypter.decrypt(qb64=cipherseed, transferable=signer.verfer.transferable) assert designer.qb64b == seedqb64b assert designer.code == MtrDex.Ed25519_Seed assert designer.verfer.code == MtrDex.Ed25519 @@ -909,7 +1196,7 @@ def test_decrypter(): # get from saltcipher above ciphersalt = ('1AAHjlR2QR9J5Et67Wy-ZaVdTryN6T6ohg44r73GLRPnHw-5S3ABFkhWy' 'IwLOI6TXUB_5CT13S8JvknxLxBaF8ANPK9FSOPD8tYu') - desalter = decrypter.decrypt(ser=ciphersalt) + desalter = decrypter.decrypt(qb64=ciphersalt) assert desalter.qb64b == saltqb64b assert desalter.code == MtrDex.Salt_128 @@ -920,12 +1207,124 @@ def test_decrypter(): assert decrypter.raw == prikey # decrypt ciphersalt - desalter = decrypter.decrypt(ser=saltcipher.qb64b) + desalter = decrypter.decrypt(qb64=saltcipher.qb64b) assert desalter.qb64b == saltqb64b assert desalter.code == MtrDex.Salt_128 + + """ Done Test """ +def test_roundtrip(): + """Test round trip encrypt decrypt with variable sized ciphers""" + + # cryptseed = pysodium.randombytes(pysodium.crypto_box_SEEDBYTES) + cryptseed = b'h,#|\x8ap"\x12\xc43t2\xa6\xe1\x18\x19\xf0f2,y\xc4\xc21@\xf5@\x15.\xa2\x1a\xcf' + verkey, sigkey = pysodium.crypto_sign_seed_keypair(cryptseed) # raw + pubkey = pysodium.crypto_sign_pk_to_box_pk(verkey) + prikey = pysodium.crypto_sign_sk_to_box_sk(sigkey) + + # create decrypter from prikey + decrypter = Decrypter(raw=prikey) + assert decrypter.code == MtrDex.X25519_Private + assert decrypter.qb64 == 'OLCFxqMz1z1UUS0TEJnvZP_zXHcuYdQsSGBWdOZeY5VQ' + assert decrypter.raw == prikey + + # create encrypter from pubkey + encrypter = Encrypter(raw=pubkey) + assert encrypter.code == MtrDex.X25519 + assert encrypter.qb64 == 'CAF7Wr3XNq5hArcOuBJzaY6Nd23jgtUVI6KDfb3VngkR' + assert encrypter.raw == pubkey + + + # Test cipher qb2 (always L0 when qb2) + + plain = "The quick brown fox jumps over the lazy dog" + tin = core.Texter(text=plain) # texter in + # create cipher using Encrypter + cipher = encrypter.encrypt(prim=tin, code=CiXDex.X25519_Cipher_QB2_L0) + assert cipher.code == CiXDex.X25519_Cipher_QB2_L0 + # decrypt cipher using Decrypter + tout = decrypter.decrypt(cipher=cipher, klas=core.Texter) # texter out + assert tout.text == tin.text + + + # sniffable qb64 lead 0 + plain = "The quick brown fox jumps over the lazy" + texter = core.Texter(text=plain) + counter = core.Counter(core.Codens.GenericGroup, count=texter.size) + # sniffable streamer in + sin = core.Streamer(stream=counter.qb64b + texter.qb64b) + # create cipher using Encrypter + cipher = encrypter.encrypt(prim=sin, code=CiXDex.X25519_Cipher_L0) + assert cipher.code == CiXDex.X25519_Cipher_L0 + # decrypt cipher using Decrypter + # sniffable stream out + sout = decrypter.decrypt(cipher=cipher, klas=core.Streamer) + assert sin.stream == sout.stream + + + # sniffable qb64 lead 1 + plain = "The quick brown fox jumps over the lazy dog" + texter = core.Texter(text=plain) + counter = core.Counter(core.Codens.GenericGroup, count=texter.size) + # sniffable streamer in + sin = core.Streamer(stream=counter.qb64b + texter.qb64b) + # create cipher using Encrypter + cipher = encrypter.encrypt(prim=sin, code=CiXDex.X25519_Cipher_L0) + assert cipher.code == CiXDex.X25519_Cipher_L1 + # decrypt cipher using Decrypter + # sniffable stream out + sout = decrypter.decrypt(cipher=cipher, klas=core.Streamer) + assert sin.stream == sout.stream + + + # sniffable qb64 lead 2 + plain = "The quick brown fox jumps over the lazy " + texter = core.Texter(text=plain) + counter = core.Counter(core.Codens.GenericGroup, count=texter.size) + # sniffable streamer in + sin = core.Streamer(stream=counter.qb64b + texter.qb64b) + # create cipher using Encrypter + cipher = encrypter.encrypt(prim=sin, code=CiXDex.X25519_Cipher_L0) + assert cipher.code == CiXDex.X25519_Cipher_L2 + # decrypt cipher using Decrypter + # sniffable stream out + sout = decrypter.decrypt(cipher=cipher, klas=core.Streamer) + assert sin.stream == sout.stream + + + # sniffable qb2 lead 0 + plain = "The quick brown fox jumps over the lazy" + texter = core.Texter(text=plain) + counter = core.Counter(core.Codens.GenericGroup, count=texter.size) + # sniffable streamer in + sin = core.Streamer(stream=counter.qb2 + texter.qb2) + # create cipher using Encrypter + cipher = encrypter.encrypt(prim=sin, code=CiXDex.X25519_Cipher_L0) + assert cipher.code == CiXDex.X25519_Cipher_L0 + # decrypt cipher using Decrypter + # sniffable stream out + sout = decrypter.decrypt(cipher=cipher, klas=core.Streamer) + assert sin.stream == sout.stream + + # sniffable qb2 lead 0 Big + plain = "The quick brown fox jumps over the lazy" * 324 + texter = core.Texter(text=plain) + counter = core.Counter(core.Codens.GenericGroup, count=texter.size) + assert counter.code == core.CtrDex_2_0.BigGenericGroup + # sniffable streamer in + sin = core.Streamer(stream=counter.qb2 + texter.qb2) + # create cipher using Encrypter + cipher = encrypter.encrypt(prim=sin, code=CiXDex.X25519_Cipher_L0) + assert cipher.code == CiXDex.X25519_Cipher_Big_L0 + # decrypt cipher using Decrypter + # sniffable stream out + sout = decrypter.decrypt(cipher=cipher, klas=core.Streamer) + assert sin.stream == sout.stream + + """End Test""" + if __name__ == "__main__": @@ -936,4 +1335,5 @@ def test_decrypter(): test_cipher() test_encrypter() test_decrypter() + test_roundtrip() diff --git a/tests/peer/test_exchanging.py b/tests/peer/test_exchanging.py index 54115ae6..4a0a0a00 100644 --- a/tests/peer/test_exchanging.py +++ b/tests/peer/test_exchanging.py @@ -84,7 +84,7 @@ def test_essrs(): # Pull the logged ESSR attachment and verify it is the one attached texter = recHby.db.essrs.get(keys=(serder.said,)) - raw = recHab.decrypt(texter[0].raw) + raw = recHab.decrypt(ser=texter[0].raw) assert raw.decode("utf-8") == msg # Test with invalid diger