diff --git a/.gitmodules b/.gitmodules deleted file mode 100644 index 5d26f5a0..00000000 --- a/.gitmodules +++ /dev/null @@ -1,3 +0,0 @@ -[submodule "poc/draft-irtf-cfrg-kangarootwelve"] - path = poc/draft-irtf-cfrg-kangarootwelve - url = https://github.com/cfrg/draft-irtf-cfrg-kangarootwelve diff --git a/poc/Makefile b/poc/Makefile index 3b53df47..c5cec7df 100644 --- a/poc/Makefile +++ b/poc/Makefile @@ -2,7 +2,6 @@ test: sage -python common.py sage -python field.py sage -python xof.py - sage -python turboshake.py sage -python flp.py sage -python flp_generic.py sage -python idpf.py diff --git a/poc/README.md b/poc/README.md index 1765d10a..c4bf02e0 100644 --- a/poc/README.md +++ b/poc/README.md @@ -15,6 +15,8 @@ In order to run the code you will need to install sage --pip install pycryptodomex ``` +Version 3.20.0 or later is required. + ## Generating test vectors To generate test vectors, set the value of `TEST_VECTOR` in `common.py` to diff --git a/poc/draft-irtf-cfrg-kangarootwelve b/poc/draft-irtf-cfrg-kangarootwelve deleted file mode 160000 index 11e7bc60..00000000 --- a/poc/draft-irtf-cfrg-kangarootwelve +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 11e7bc6052d1ccdc87994cd9905b27535401098b diff --git a/poc/turboshake.py b/poc/turboshake.py deleted file mode 100644 index 26cafd6b..00000000 --- a/poc/turboshake.py +++ /dev/null @@ -1,189 +0,0 @@ -# A stateful implementation of TurboSHAKE adapted from the reference implementation -# -# We use TurboSHAKE in two steps: -# -# 1. Message fragments are absorbed into the hash state -# 2. Output fragments are squeezed out of the hash state -# -# The reference implementation of TurboSHAKE only provides a "one-shot" API, -# where the message and the length of the output are determined in advance. -# -# The stateful API is not needed if you know the desired output length in -# advance. Even if you don't know the desired output length, you can always do -# something like this: -# -# 1. Concatenate the message fragments into message `M` -# 2. Keep track of the output length `totalOutputBytesLen` squeezed so far and -# output `TurboSHAKE(c, M, D, totalOutputBytesLen+nextOutputBytesLen)`. -# -# However if the output length is large, then this is prohibitively slow, even -# for reference code. In particular, this makes the unit tests for Prio3 and -# Poplar1 take well over 30 seconds to run. Thus the purpose of implementing a -# stateful API is to make our unit tests run in a reasonable amount of time. - -import os -import sys - -kangarootwelve_path = \ - "%s/draft-irtf-cfrg-kangarootwelve/py" % os.path.dirname(__file__) # nopep8 -assert os.path.isdir(kangarootwelve_path) # nopep8 -sys.path.append(kangarootwelve_path) # nopep8 - -from TurboSHAKE import KeccakP1600, TurboSHAKE128 - - -class TurboSHAKEAbosrb: - '''TurboSHAKE in the absorb state.''' - - def __init__(self, c, D): - ''' - Initialize the absorb state with capacity `c` (number of bits) and - domain separation byte `D`. - ''' - self.D = D - self.rate_in_bytes = (1600-c)//8 - self.state = bytearray([0 for i in range(200)]) - self.state_offset = 0 - - def update(self, M): - ''' - Update the absorb state with message fragment `M`. - ''' - input_offset = 0 - while input_offset < len(M): - length = len(M)-input_offset - block_size = min(length, self.rate_in_bytes-self.state_offset) - for i in range(block_size): - self.state[i+self.state_offset] ^= M[i+input_offset] - input_offset += block_size - self.state_offset += block_size - if self.state_offset == self.rate_in_bytes: - self.state = KeccakP1600(self.state, 12) - self.state_offset = 0 - - def squeeze(self): - ''' - Consume the absorb state and return the TurboSHAKE squeeze state. - ''' - state = self.state[:] # deep copy - state[self.state_offset] ^= self.D - if (((self.D & 0x80) != 0) and - (self.state_offset == (self.rate_in_bytes-1))): - state = KeccakP1600(state, 12) - state[self.rate_in_bytes-1] = state[self.rate_in_bytes-1] ^ 0x80 - state = KeccakP1600(state, 12) - - squeeze = TurboSHAKESqueeze() - squeeze.rate_in_bytes = self.rate_in_bytes - squeeze.state = state - squeeze.state_offset = 0 - return squeeze - - -class TurboSHAKESqueeze: - '''TurboSHAKE in the squeeze state.''' - - def next(self, length): - ''' - Return the next `length` bytes of output and update the squeeze state. - ''' - output = bytearray() - while length > 0: - block_size = min(length, self.rate_in_bytes-self.state_offset) - length -= block_size - output += \ - self.state[self.state_offset:self.state_offset+block_size] - self.state_offset += block_size - if self.state_offset == self.rate_in_bytes: - self.state = KeccakP1600(self.state, 12) - self.state_offset = 0 - return output - - -def NewTurboSHAKE128(D): - ''' - Return the absorb state for TurboSHAKE128 with domain separation byte `D`. - ''' - return TurboSHAKEAbosrb(256, D) - - -def testAPI(stateful, oneshot): - '''Test that the outputs of the stateful and oneshot APIs match.''' - - test_cases = [ - { - 'fragments': [], - 'lengths': [], - }, - { - 'fragments': [], - 'lengths': [ - 1000, - ], - }, - { - 'fragments': [ - b'\xff' * 500, - ], - 'lengths': [ - 12, - ], - }, - { - 'fragments': [ - b'hello', - b', ', - b'', - b'world', - ], - 'lengths': [ - 1, - 17, - 256, - 128, - 0, - 7, - 14, - ], - }, - { - 'fragments': [ - b'\xff' * 1024, - b'\x17' * 23, - b'', - b'\xf1' * 512, - ], - 'lengths': [ - 1000, - 0, - 0, - 14, - ], - - } - ] - - D = 99 - for (i, test_case) in enumerate(test_cases): - absorb = stateful(D) - message = bytearray() - for fragment in test_case['fragments']: - absorb.update(fragment) - message += fragment - squeeze = absorb.squeeze() - output = b'' - output_len = 0 - for length in test_case['lengths']: - output += squeeze.next(length) - output_len += length - expected_output = oneshot(message, D, output_len) - if output != expected_output: - raise Exception('test case {} failed: got {}; want {}'.format( - i, - output.hex(), - expected_output.hex(), - )) - - -if __name__ == '__main__': - testAPI(NewTurboSHAKE128, TurboSHAKE128) diff --git a/poc/xof.py b/poc/xof.py index c5159090..b5168f9c 100644 --- a/poc/xof.py +++ b/poc/xof.py @@ -2,12 +2,12 @@ from __future__ import annotations +from Crypto.Hash import TurboSHAKE128 from Cryptodome.Cipher import AES from common import (TEST_VECTOR, VERSION, Bytes, Unsigned, concat, format_dst, from_le_bytes, gen_rand, next_power_of_2, print_wrapped_line, to_le_bytes, xor) -from turboshake import NewTurboSHAKE128, TurboSHAKE128 class Xof: @@ -76,12 +76,11 @@ def __init__(self, seed, dst, binder): self.m = to_le_bytes(len(dst), 1) + dst + seed + binder ''' self.length_consumed = 0 - state = NewTurboSHAKE128(1) - state.update(to_le_bytes(len(dst), 1)) - state.update(dst) - state.update(seed) - state.update(binder) - self.state = state.squeeze() + self.h = TurboSHAKE128.new(domain=1) + self.h.update(to_le_bytes(len(dst), 1)) + self.h.update(dst) + self.h.update(seed) + self.h.update(binder) def next(self, length): ''' @@ -97,7 +96,7 @@ def next(self, length): stream = TurboSHAKE128(self.m, 1, self.l) return stream[-length:] ''' - return self.state.next(length) + return self.h.read(length) class XofFixedKeyAes128(Xof): @@ -122,8 +121,11 @@ def __init__(self, seed, dst, binder): # # Implementation note: This step can be cached across XOF # evaluations with many different seeds. - fixed_key = TurboSHAKE128( - to_le_bytes(len(dst), 1) + dst + binder, 2, 16) + h = TurboSHAKE128.new(domain=2) + h.update(to_le_bytes(len(dst), 1)) + h.update(dst) + h.update(binder) + fixed_key = h.read(16) self.cipher = AES.new(fixed_key, AES.MODE_ECB) # Save seed to be used in `next`. self.seed = seed