Skip to content

Commit

Permalink
Merging with develop
Browse files Browse the repository at this point in the history
  • Loading branch information
juaninf committed Feb 19, 2024
2 parents 7d94357 + 98b39c7 commit e27ca2b
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 14 deletions.
3 changes: 2 additions & 1 deletion claasp/cipher.py
Original file line number Diff line number Diff line change
Expand Up @@ -1347,7 +1347,7 @@ def data_generator(nr, samples):
testing_samples, num_epochs=number_of_epochs)

def run_autond_pipeline(self, difference_positions=None, optimizer_samples=10 ** 4, optimizer_generations=50,
training_samples=10 ** 7, testing_samples=10 ** 6, number_of_epochs=40, verbose=False):
training_samples=10 ** 7, testing_samples=10 ** 6, number_of_epochs=40, verbose=False, neural_net = 'dbitnet', save_prefix=None):
"""
Runs the AutoND pipeline ([BGHR2023]):
- Find an input difference for the inputs set to True in difference_positions using an optimizer
Expand All @@ -1368,6 +1368,7 @@ def run_autond_pipeline(self, difference_positions=None, optimizer_samples=10 **
- ``testing_samples`` -- **integer**; (default: `10**6`) number samples used for testing
- ``number_of_epochs`` -- **integer**; (default: `40`) number of training epochs
- ``verbose`` -- **boolean**; (default: `False`) verbosity of the optimizer
- ``neural_net`` -- **string**; (default: `dbitnet`) the neural network architecture to use; supports 'dbitnet' and 'gohr_resnet'
EXAMPLES::
Expand Down
124 changes: 120 additions & 4 deletions claasp/cipher_modules/component_analysis_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,7 @@ def branch_number(component, type, format):
elif component.type == "mix_column":
return min(calculate_weights_for_mix_column(component, format, type))


def instantiate_matrix_over_correct_field(matrix, polynomial_as_int, word_size, input_bit_size, output_bit_size):
"""
sage: from claasp.ciphers.block_ciphers.midori_block_cipher import MidoriBlockCipher
Expand Down Expand Up @@ -319,6 +320,7 @@ def instantiate_matrix_over_correct_field(matrix, polynomial_as_int, word_size,

return final_mtr, F


def is_mds(component):
"""
A matrix is MDS if and only if all the minors (determinants of square submatrices) are non-zero
Expand Down Expand Up @@ -364,6 +366,7 @@ def is_mds(component):
return False
return True


def field_element_matrix_to_integer_matrix(matrix):
"""
Converts a matrix of field elements to the corresponding integer matrix representation
Expand Down Expand Up @@ -400,6 +403,7 @@ def field_element_matrix_to_integer_matrix(matrix):

return Matrix(matrix.nrows(), matrix.ncols(), int_matrix)


def get_inverse_matrix_in_integer_representation(component):
"""
Returns the inverse matrix in its integer representation
Expand Down Expand Up @@ -436,6 +440,7 @@ def get_inverse_matrix_in_integer_representation(component):
component.input_bit_size, component.output_bit_size)
return field_element_matrix_to_integer_matrix(matrix.inverse())


def has_maximal_branch_number(component):
"""
INPUT:
Expand Down Expand Up @@ -479,6 +484,7 @@ def has_maximal_branch_number(component):
if component.type == MIX_COLUMN:
return branch_number(component, 'linear', 'word') == (output_word_size + 1)


def calculate_weights_for_mix_column(component, format, type):
if format == 'word':
description = component.description
Expand Down Expand Up @@ -569,6 +575,8 @@ def select_properties_function(boolean_polynomial_ring, operation):
return word_operation_properties(operation, boolean_polynomial_ring)
if (component.type == WORD_OPERATION) and (component.description[0] == "MODADD"):
return word_operation_properties(operation, boolean_polynomial_ring)
if component.type == 'fsr':
return fsr_properties(operation)

if component.type == WORD_OPERATION:
print(f"TODO : {component.description[0]}")
Expand Down Expand Up @@ -632,17 +640,21 @@ def get_all_operations(cipher):
collect_component_operations(component, tmp_cipher_operations)

for operation in list(tmp_cipher_operations.keys()):
if operation not in [LINEAR_LAYER, MIX_COLUMN]:
if operation not in [LINEAR_LAYER, MIX_COLUMN, 'fsr']:
tmp_cipher_operations[operation]["distinguisher"] = \
list(set(tmp_cipher_operations[operation]["distinguisher"]))
if operation == 'fsr':
tmp_list = []
for item in tmp_cipher_operations[operation]["distinguisher"]:
if item not in tmp_list:
tmp_list.append(item)
tmp_cipher_operations[operation]["distinguisher"] = tmp_list
tmp_cipher_operations[operation]["types"] = \
[[] for _ in range(len(tmp_cipher_operations[operation]["distinguisher"]))]
collect_components_with_the_same_operation(operation, tmp_cipher_operations)

cipher_operations = {}
for operation in list(tmp_cipher_operations.keys()):
add_attributes_to_operation(cipher_operations, operation, tmp_cipher_operations)

return cipher_operations


Expand Down Expand Up @@ -771,6 +783,110 @@ def order_of_linear_component(component):
return 0


def fsr_properties(operation):
"""
Return a dictionary containing some properties of fsr component.
INPUT:
- ``operation`` -- **list**; a list containing:
* a component with the operation under study
* number of occurrences of the operation
* list of ids of all the components with the same underlying operation
EXAMPLES::
sage: from claasp.cipher_modules.component_analysis_tests import fsr_properties
sage: from claasp.components.fsr_component import FSR
sage: fsr_component = FSR(0,0, ["input"],[[0,1,2,3]],4,[[[4, [[1,[0]],[3,[1]],[2,[2]]]]],4])
sage: operation= [fsr_component, 1, ['fsr_0_0']]
sage: dictionary = fsr_properties(operation)
sage: dictionary['fsr_word_size'] == 4
True
sage: dictionary['lfsr_connection_polynomials'] == ['x^4 + (z4 + 1)*x^3 + z4*x^2 + 1']
True
sage: from claasp.ciphers.stream_ciphers.bluetooth_stream_cipher_e0 import BluetoothStreamCipherE0
sage: from claasp.cipher_modules.component_analysis_tests import component_analysis_tests
sage: e0 = BluetoothStreamCipherE0(keystream_bit_len=2)
sage: dictionary = e0.component_analysis_tests()
sage: assert dictionary[8]["number_of_registers"] == 4
sage: dictionary[8]["lfsr_connection_polynomials"][0] == 'x^25 + x^20 + x^12 + x^8 + 1' # first lfsr
True
sage: dictionary[8]['lfsr_polynomials_are_primitive'] == [True, True, True, True]
True
sage: from claasp.ciphers.stream_ciphers.trivium_stream_cipher import TriviumStreamCipher
sage: triv = TriviumStreamCipher(keystream_bit_len=1)
sage: dictionary = triv.component_analysis_tests()
sage: dictionary[0]["type_of_registers"] == ['non-linear', 'non-linear', 'non-linear']
True
"""
component = operation[0]
fsr_word_size = component.description[1]
component_dict = {
"type": component.type,
"input_bit_size": component.input_bit_size,
"output_bit_size": component.output_bit_size,
"fsr_word_size": fsr_word_size,
"description": component.description,
"number_of_occurrences": operation[1],
"component_id_list": operation[2]
}

desc = component.description
registers_len = []
registers_type = []
registers_feedback_relation_deg = []
lfsr_connection_polynomials = []
lin_flag = False

for r in desc[0]:
registers_len.append(r[0])
d = max(len(term) if fsr_word_size == 1 else len(term[1]) for term in r[1])
registers_feedback_relation_deg.append(d)
reg_type = 'non-linear' if d > 1 else 'linear'
registers_type.append(reg_type)
lin_flag = lin_flag or (reg_type == 'linear')

component_dict.update({
'number_of_registers': len(registers_len),
'length_of_registers': registers_len,
'type_of_registers': registers_type,
'degree_of_feedback_relation_of_registers': registers_feedback_relation_deg
})

if lin_flag:
lfsrs_primitive = []
exp = 0
R = GF(2)['x'] if fsr_word_size == 1 else GF(2 ** fsr_word_size)['x']
x = R.gens()
a = R.construction()[1].gen()

for index, r in enumerate(desc[0]):
exp = exp + registers_len[index]
if registers_type[index] == 'linear':
p = R(1)
for term in r[1]:
if fsr_word_size == 1:
p = p + x[0] ** (exp - term[0])
else: # case: word based LFSR
m = 0
cf = "{0:b}".format(term[0])
for i in range(len(cf)):
if cf[i] == '1': m = m + pow(a, len(cf) - 1 - i)
m = m * x[0] ** (exp - term[1][0])
p += m
lfsr_connection_polynomials.append(str(p))
lfsrs_primitive.append(p.is_primitive())
component_dict.update({
"lfsr_connection_polynomials": lfsr_connection_polynomials,
"lfsr_polynomials_are_primitive": lfsrs_primitive
})
return component_dict


def sbox_properties(operation):
"""
Return a dictionary containing some properties of Sbox component.
Expand Down Expand Up @@ -1050,4 +1166,4 @@ def remove_components_with_strings_as_values(results_without_xor):
str_in_list.append(isinstance(results_without_xor[i]["properties"][result_property]["value"], str))
if True not in str_in_list:
results.append(results_without_xor[i])
return results
return results
23 changes: 15 additions & 8 deletions claasp/cipher_modules/neural_network_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -280,12 +280,12 @@ def get_differential_dataset(cipher, input_differences, number_of_rounds, sample
inputs_1.append(inputs_0[-1] ^ integer_to_np(input_differences[i], cipher.inputs_bit_size[i]))
inputs_1[-1][:, y == 0] ^= np.frombuffer(urandom(num_rand_samples * cipher.inputs_bit_size[i] // 8),
dtype=np.uint8).reshape(-1, num_rand_samples)

C0 = np.unpackbits(
cipher.evaluate_vectorized(inputs_0, intermediate_outputs=True)['round_output'][number_of_rounds - 1], axis=1)
C1 = np.unpackbits(
cipher.evaluate_vectorized(inputs_1, intermediate_outputs=True)['round_output'][number_of_rounds - 1], axis=1)
x = np.hstack([C0, C1])
print("Generating differential dataset for input difference ", input_differences)
return x, y


Expand All @@ -294,7 +294,7 @@ def get_neural_network(network_name, input_size, word_size = None, depth = 1):
if network_name == 'gohr_resnet':
if word_size is None or word_size == 0:
print("Word size not specified for ", network_name, ", defaulting to ciphertext size...")
word_size = cipher.output_bit_size
word_size = input_size//2
neural_network = make_resnet(word_size = word_size, input_size = input_size, depth = depth)
elif network_name == 'dbitnet':
neural_network = make_dbitnet(input_size = input_size)
Expand All @@ -306,29 +306,36 @@ def make_checkpoint(datei):
res = ModelCheckpoint(datei, monitor='val_loss', save_best_only=True)
return res


def train_neural_distinguisher(cipher, data_generator, starting_round, neural_network, training_samples=10 ** 7,
testing_samples=10 ** 6, num_epochs=1):
testing_samples=10 ** 6, num_epochs=1, batch_size = 5000, save_prefix=None):
acc = 1
bs = 5000
x, y = data_generator(samples=training_samples, nr=starting_round)
x_eval, y_eval = data_generator(samples=testing_samples, nr=starting_round)
h = neural_network.fit(x, y, epochs=int(num_epochs), batch_size=bs, validation_data=(x_eval, y_eval))
print("In train, save prefix is ", save_prefix, flush=True)
if save_prefix is None:
h = neural_network.fit(x, y, epochs=int(num_epochs), batch_size=batch_size, validation_data=(x_eval, y_eval))
else:
filename = save_prefix + "_" + str(starting_round)
print("In train, filename is ", filename, flush=True)
check = make_checkpoint(filename+'.h5');
print("In train, check is ", check, flush=True)
h = neural_network.fit(x, y, epochs=int(num_epochs), batch_size=batch_size, validation_data=(x_eval, y_eval), callbacks = [check])
np.save(filename + '.npy', h.history['val_acc'])
acc = np.max(h.history["val_acc"])
print(f'Validation accuracy at {starting_round} rounds :{acc}')
return acc


def neural_staged_training(cipher, data_generator, starting_round, neural_network=None, training_samples=10 ** 7,
testing_samples=10 ** 6, num_epochs=1):
testing_samples=10 ** 6, num_epochs=1, save_prefix = None):
acc = 1
nr = starting_round
# threshold at 10 sigma
threshold = 0.5 + 10 * sqrt(testing_samples // 4) / testing_samples
accuracies = {}
while acc >= threshold and nr < cipher.number_of_rounds:
acc = train_neural_distinguisher(cipher, data_generator, nr, neural_network, training_samples, testing_samples,
num_epochs)
num_epochs, save_prefix = save_prefix)
accuracies[nr] = acc
nr += 1
return accuracies
Expand Down
27 changes: 26 additions & 1 deletion tests/unit/cipher_modules/component_analysis_tests_test.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,32 @@
from claasp.ciphers.block_ciphers.fancy_block_cipher import FancyBlockCipher
from claasp.cipher_modules.component_analysis_tests import generate_boolean_polynomial_ring_from_cipher
from claasp.components.fsr_component import FSR
from claasp.cipher_modules.component_analysis_tests import fsr_properties
from claasp.ciphers.stream_ciphers.bluetooth_stream_cipher_e0 import BluetoothStreamCipherE0
from claasp.ciphers.stream_ciphers.trivium_stream_cipher import TriviumStreamCipher


def test_generate_boolean_polynomial_ring_from_cipher():
fancy = FancyBlockCipher(number_of_rounds=3)
generate_boolean_polynomial_ring_from_cipher(fancy)
generate_boolean_polynomial_ring_from_cipher(fancy)


def test_fsr_properties():
fsr_component = FSR(0, 0, ["input"], [[0, 1, 2, 3]], 4, [[[4, [[1, [0]], [3, [1]], [2, [2]]]]], 4])
operation = [fsr_component, 1, ['fsr_0_0']]
dictionary = fsr_properties(operation)
assert dictionary['fsr_word_size'] == 4
assert dictionary['lfsr_connection_polynomials'] == ['x^4 + (z4 + 1)*x^3 + z4*x^2 + 1']

e0 = BluetoothStreamCipherE0(keystream_bit_len=2)
dictionary = e0.component_analysis_tests()
assert dictionary[8]["number_of_registers"] == 4
assert dictionary[8]["lfsr_connection_polynomials"][0] == 'x^25 + x^20 + x^12 + x^8 + 1'
assert dictionary[8]["lfsr_connection_polynomials"][1] == 'x^31 + x^24 + x^16 + x^12 + 1'
assert dictionary[8]["lfsr_connection_polynomials"][2] == 'x^33 + x^28 + x^24 + x^4 + 1'
assert dictionary[8]["lfsr_connection_polynomials"][3] == 'x^39 + x^36 + x^28 + x^4 + 1'
assert dictionary[8]['lfsr_polynomials_are_primitive'] == [True, True, True, True]

triv = TriviumStreamCipher(keystream_bit_len=1)
dictionary = triv.component_analysis_tests()
assert dictionary[0]["type_of_registers"] == ['non-linear', 'non-linear', 'non-linear']

0 comments on commit e27ca2b

Please sign in to comment.