-
Notifications
You must be signed in to change notification settings - Fork 0
/
HFL.py
524 lines (459 loc) · 21.4 KB
/
HFL.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
import argparse
import os
import math
import numpy as np
import random
import time
import tomli
from datetime import datetime
import tensorflow as tf
from tensorflow.keras import layers, models, losses
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import SGD
from tensorflow.keras import backend as K
from Training_time_attacks import attack_labelflipping, attack_backdoor
from Inference_time_attacks import attacks
def parse_arguments():
    """
    Parse the command-line arguments of the experiment runner.

    Returns:
        argparse.Namespace: Namespace whose ``configurations`` attribute holds
        the path of the TOML configuration file.

    Raises:
        SystemExit: If ``-config`` / ``--configurations`` is missing (argparse
            prints a usage message before exiting).
    """
    parser = argparse.ArgumentParser()
    # Required: the configuration path is opened immediately by the caller, and
    # a missing value would otherwise surface later as a TypeError from open(None).
    parser.add_argument('-config', '--configurations', required=True, help='experiment configurations')
    return parser.parse_args()
def load_configurations(config_path):
    """
    Read and parse a TOML experiment configuration file.

    Args:
        config_path (str): Location of the TOML file on disk.

    Returns:
        dict: The parsed configuration mapping.
    """
    # tomli requires a binary file handle.
    with open(config_path, mode="rb") as config_file:
        parsed_config = tomli.load(config_file)
    return parsed_config
def create_experiment_folder(exp_id, exp_description, work_space):
    """
    Make a new, timestamped directory for one experiment's outputs.

    The directory is created directly inside ``work_space`` and is named
    ``Exp_<exp_id>_<dd-mm-yy_hh-MM-SS-AM/PM>_<exp_description>``.

    Args:
        exp_id (str): Experiment ID.
        exp_description (str): Short description appended to the folder name.
        work_space (str): Existing parent directory.

    Returns:
        str: Path of the newly created folder.

    Raises:
        OSError: Propagated from ``os.mkdir`` (e.g. the folder already exists
            or ``work_space`` does not exist).
    """
    stamp = datetime.now().strftime("%d-%m-%y_%I-%M-%S-%p")
    target_dir = os.path.join(work_space, f'Exp_{exp_id}_{stamp}_{exp_description}')
    os.mkdir(target_dir)
    return target_dir
def create_model(dataset_name, image_shape, n_classes=10):
    """
    Build and compile the CNN used for a given dataset family.

    'mnist' / 'fashion-mnist' get two conv+pool blocks; 'cifar10' / 'stl10'
    get two double-conv blocks with dropout. Both then use a 512-unit dense
    layer with dropout and a softmax over ``n_classes`` outputs.

    Args:
        dataset_name (str): One of 'mnist', 'fashion-mnist', 'cifar10', 'stl10'.
        image_shape (tuple): Shape of a single input image, e.g. (28, 28, 1).
        n_classes (int): Number of output classes (defaults to the previously
            hard-coded 10).

    Returns:
        tf.keras.models.Sequential: Model compiled with the Adam optimizer,
        categorical cross-entropy loss and accuracy/precision/recall metrics.

    Raises:
        ValueError: If ``dataset_name`` is not one of the supported names.
    """
    if dataset_name in ('mnist', 'fashion-mnist'):
        feature_layers = [
            layers.Conv2D(32, (3, 3), padding="same", input_shape=image_shape),
            layers.Activation("relu"),
            layers.MaxPooling2D(pool_size=(2, 2)),
            layers.Conv2D(64, (3, 3), padding='same'),
            layers.Activation("relu"),
            layers.MaxPooling2D(pool_size=(2, 2)),
        ]
    elif dataset_name in ('cifar10', 'stl10'):
        feature_layers = [
            layers.Conv2D(32, (3, 3), padding="same", input_shape=image_shape),
            layers.Activation("relu"),
            layers.Conv2D(32, (3, 3)),
            layers.Activation("relu"),
            layers.MaxPooling2D(pool_size=(2, 2)),
            layers.Dropout(0.25),
            layers.Conv2D(64, (3, 3), padding="same"),
            layers.Activation("relu"),
            layers.Conv2D(64, (3, 3)),
            layers.Activation("relu"),
            layers.MaxPooling2D(pool_size=(2, 2)),
            layers.Dropout(0.25),
        ]
    else:
        # Previously an unknown name fell through and produced a compiled
        # model with no layers, which only failed later inside fit().
        raise ValueError(
            f"Unsupported dataset_name {dataset_name!r}; expected one of "
            "'mnist', 'fashion-mnist', 'cifar10', 'stl10'"
        )

    # Classifier head shared by both architectures.
    classifier_layers = [
        layers.Flatten(),
        layers.Dense(512),
        layers.Activation("relu"),
        layers.Dropout(0.5),
        layers.Dense(n_classes),
        layers.Activation("softmax"),
    ]
    model = Sequential(feature_layers + classifier_layers)
    model.compile(
        optimizer='adam',
        loss="categorical_crossentropy",
        metrics=['accuracy', tf.keras.metrics.Precision(), tf.keras.metrics.Recall()]
    )
    return model
class Node:
    """
    Base class for a node in the federated learning topology.

    A node has a working directory (``memory_path``, relative to the current
    working directory) where it saves the models it produces, and reads its
    clean training data from ``<saved_data_path>/<name>/clean``.
    """

    def __init__(self, id, name, node_type, node_level, saved_data_path, image_shape, n_classes):
        """
        Args:
            id (int): Numeric identifier of the node.
            name (str): Node name; also used as a directory name.
            node_type (str): Role label of the node (e.g. 'server' or 'client').
            node_level (int): Depth of the node in the topology.
            saved_data_path (str): Root directory holding per-node datasets.
            image_shape (tuple): Shape of a single input image.
            n_classes (int): Number of output classes.
        """
        self.id = id
        self.name = name
        self.node_type = node_type
        self.node_level = node_level
        self.memory_path = f'level{self.node_level}/{self.name}/memory'
        self.data_path = f'{saved_data_path}/{self.name}/clean'
        self.global_model = ''  # path of the last downloaded global model
        self.local_model = ''   # path of the last locally produced model
        self.local_model_num = 0
        self.training_epoch = 10
        self.training_batch_size = 32
        self.image_shape = image_shape
        self.n_classes = n_classes

    def set_training_parameters(self, training_epoch, training_batch_size):
        """
        Set the training parameters for the node.

        Args:
            training_epoch (int): Number of training epochs.
            training_batch_size (int): Size of the training batch.
        """
        self.training_epoch = training_epoch
        self.training_batch_size = training_batch_size

    def print_info(self):
        """
        Print the identity, paths and training settings of the node.
        """
        print(f'Node ID: {self.id}, Name: {self.name}, Type: {self.node_type}, Level: {self.node_level}')
        print(f'Memory Path: {self.memory_path}, Global Model: {self.global_model}, Local Model: {self.local_model}')
        print(f'Training Epoch: {self.training_epoch}, Training Batch Size: {self.training_batch_size}')

    def get_clean_data(self):
        """
        Load the clean training data from ``x_train.npy`` / ``y_train.npy``.

        Returns:
            tuple: Training images, labels, and the number of samples.
        """
        train_images = np.load(os.path.join(self.data_path, 'x_train.npy'))
        train_labels = np.load(os.path.join(self.data_path, 'y_train.npy'))
        return train_images, train_labels, len(train_images)

    def upload_model(self):
        """
        Return the path of the node's local model.

        Returns:
            str: Path to the local model ('' if none has been produced).
        """
        return self.local_model

    def download_model(self, model):
        """
        Record the path of the global model to train from.

        Args:
            model (str): Path to the global model.
        """
        self.global_model = model

    def _architecture_name(self):
        """
        Return the dataset name whose CNN architecture fits this node's input.

        ``create_model`` selects its architecture from a dataset name, which a
        node does not store: three-channel inputs use the 'cifar10' network and
        everything else the 'mnist' one.
        """
        return 'cifar10' if self.image_shape[-1] == 3 else 'mnist'

    def local_training(self, initial_training=True):
        """
        Train a model on the node's clean data and save it.

        Args:
            initial_training (bool): Build a fresh model when True; otherwise
                continue from the downloaded global model.

        Returns:
            tuple: Path to the trained local model and the number of samples.
        """
        if initial_training:
            # Previously called as create_model(image_shape, n_classes), which
            # matched no dataset name and produced an empty model.
            model = create_model(self._architecture_name(), self.image_shape)
        else:
            model = tf.keras.models.load_model(self.global_model)
        train_images, train_labels, num_samples = self.get_clean_data()
        self.print_info()
        model.fit(x=train_images, y=train_labels, epochs=self.training_epoch, batch_size=self.training_batch_size)
        model_path = os.path.join(self.memory_path, 'model', 'local_model.h5')
        # Make sure the target directory exists regardless of Keras version.
        os.makedirs(os.path.dirname(model_path), exist_ok=True)
        model.save(model_path)
        self.local_model = model_path
        return self.local_model, num_samples
class Client(Node):
    """
    Client (leaf) node in the federated learning topology.

    A client trains on ``train_images`` / ``train_labels`` (assigned
    externally, possibly poisoned) and may act as a data- or model-poisoning
    attacker.
    """

    def __init__(self, id, name, node_type, node_level, saved_data_path, image_shape, n_classes):
        super().__init__(id, name, node_type, node_level, saved_data_path, image_shape, n_classes)
        self.DPA_enable = False  # data poisoning attack enabled on this client
        self.MPA_enable = False  # model poisoning attack enabled on this client
        self.train_images = []
        self.train_labels = []
        self.num_samples = 0

    def set_poisoning_parameters(self, DPA_enable, MPA_enable):
        """
        Set the poisoning parameters for the client.

        Args:
            DPA_enable (bool): Flag to enable/disable data poisoning attacks.
            MPA_enable (bool): Flag to enable/disable model poisoning attacks.
        """
        self.DPA_enable = DPA_enable
        self.MPA_enable = MPA_enable

    def get_poisoned_data(self, train_images, train_labels, attack_name, percent_poison, target_labels, source_labels, continual_training_path):
        """
        Generate the training set of a malicious client.

        'backdoor' and 'label_flipping' delegate to the training-time attack
        modules. Any other attack name is passed to
        ``attacks.generate_poisoned_data`` together with the model loaded from
        ``continual_training_path``. The generated samples are returned
        followed by the clean ones.

        Args:
            train_images (np.ndarray): Clean training images.
            train_labels (np.ndarray): Clean training labels.
            attack_name (str): Name of the attack.
            percent_poison (float): Percentage of data to poison (backdoor only).
            target_labels (np.ndarray): Target labels for the backdoor attack.
            source_labels (np.ndarray): Source labels for the backdoor attack.
            continual_training_path (str | None): Path to the model used to
                craft adversarial samples for non-training-time attacks.

        Returns:
            tuple: Poisoned training images, labels, and the number of samples.

        Raises:
            ValueError: If a model-based attack is requested without a model path.
        """
        if attack_name == 'backdoor':
            _, poison_images, poison_labels = attack_backdoor.generate(
                train_images, train_labels, target_labels, source_labels, self.n_classes, percent_poison)
        elif attack_name == 'label_flipping':
            poison_images, poison_labels = attack_labelflipping.generate(train_images, train_labels)
        else:
            # Previously gated on an undefined global (adversarial_training_enable),
            # which raised NameError for every model-based attack.
            if not continual_training_path:
                raise ValueError(
                    f"attack {attack_name!r} needs a model path (continual_training_path) to craft samples")
            model = tf.keras.models.load_model(continual_training_path)
            poison_images, poison_labels = attacks.generate_poisoned_data(train_images, train_labels, model, attack_name)
            # Keep the clean samples alongside the generated ones.
            poison_images = np.concatenate((poison_images, train_images), axis=0)
            poison_labels = np.concatenate((poison_labels, train_labels), axis=0)
        return poison_images, poison_labels, len(poison_images)

    def local_training(self, initial_training=True, distillation_enable=False, global_round=1, teacher_model_path=None):
        """
        Train on the client's data, optionally with teacher-model distillation.

        Args:
            initial_training (bool): Build a fresh model when True; otherwise
                continue from the downloaded global model.
            distillation_enable (bool): Train on the teacher model's predictions
                instead of ``train_labels``.
            global_round (int): The current global round number (used in the
                saved model's file name).
            teacher_model_path (str | None): Teacher model; required when
                ``distillation_enable`` is True.

        Returns:
            tuple: Path to the trained local model and the number of samples.

        Raises:
            ValueError: If distillation is enabled without a teacher model path.
        """
        if initial_training:
            # create_model selects its architecture from a dataset name, which a
            # node does not store: 3-channel inputs use the cifar10 network and
            # everything else the mnist one. (It was previously called as
            # create_model(image_shape, n_classes), which matched nothing.)
            architecture = 'cifar10' if self.image_shape[-1] == 3 else 'mnist'
            model = create_model(architecture, self.image_shape)
        else:
            model = tf.keras.models.load_model(self.global_model)
        self.print_info()

        labels = self.train_labels
        if distillation_enable:
            if not teacher_model_path:
                raise ValueError('distillation_enable=True requires teacher_model_path')
            teacher_model = tf.keras.models.load_model(teacher_model_path)
            # Soft labels are kept local so the client's own labels are not overwritten.
            labels = teacher_model.predict(x=self.train_images, batch_size=self.training_batch_size)

        model.fit(x=self.train_images, y=labels, epochs=self.training_epoch, batch_size=self.training_batch_size)
        if self.MPA_enable:
            # NOTE(review): get_poisoned_model is not defined in this file -- confirm it is provided elsewhere.
            model = self.get_poisoned_model(model)
        model_path = os.path.join(self.memory_path, 'model', f'local_model_{global_round}.h5')
        os.makedirs(os.path.dirname(model_path), exist_ok=True)
        model.save(model_path)
        self.local_model = model_path
        return self.local_model, self.num_samples
class Server(Node):
    """
    Server (aggregating) node in the federated learning topology.

    In each round a server samples its child nodes (clients or lower-level
    servers), lets each train from the current global model, and replaces the
    global model with the average of their weights, weighted by each child's
    reported sample count.
    """

    def __init__(self, id, name, node_type, node_level, saved_data_path, image_shape, n_classes):
        super().__init__(id, name, node_type, node_level, saved_data_path, image_shape, n_classes)
        self.local_model_num = 1
        self.child_nodes = []
        # Last round number (inclusive) run by start_aggregation.
        self.aggregation_round = 1
        self.child_nodes_per_round = len(self.child_nodes)
        self.global_model_num = 1
        self.MPA_enable = False
        # Model to resume from when start_aggregation(continual_training=True)
        # is called without an explicit path.
        self.continual_training_path = None

    def set_child_nodes(self, child_nodes):
        """
        Set the child nodes for the server (all of them take part in each round).

        Args:
            child_nodes (list): List of child nodes.
        """
        self.child_nodes = child_nodes
        self.child_nodes_per_round = len(child_nodes)

    def aggregation(self, participants, global_round):
        """
        Run one aggregation round over ``participants``.

        Each participant downloads the current global model and trains from it:
        clients through ``local_training``, child servers through their own
        ``start_aggregation`` starting at round 1, so every child server runs its
        full ``aggregation_round`` schedule each time it is aggregated.

        Args:
            participant (list): Participating child nodes.
            global_round (int): The current round number of this server.

        Returns:
            tuple: Path to the aggregated global model and the total number of samples.

        Raises:
            ValueError: If ``participants`` is empty or they report zero samples.
        """
        if not participants:
            raise ValueError(f'{self.name}: aggregation needs at least one participant')

        weighted_sums = None  # per-layer running sum of num_samples * weights
        weight_dtypes = None
        sum_num_samples = 0
        model_path = self.global_model
        for p in participants:
            p.download_model(self.global_model)
            if p.node_type == 'client':
                model_path, num_samples = p.local_training(initial_training=False, global_round=global_round)
            else:
                # Passing the parent's round number made the child's while-loop
                # run fewer and fewer rounds (eventually none, leaving its
                # num_samples unbound); each child now runs its own schedule.
                model_path, num_samples = p.start_aggregation(
                    initial_training=False, continual_training=False, global_round=1)
            params = tf.keras.models.load_model(model_path).get_weights()
            if weighted_sums is None:
                # Layers have different shapes, so keep one accumulator per layer
                # (np.array over the list would create a ragged array).
                weight_dtypes = [np.asarray(w).dtype for w in params]
                weighted_sums = [np.zeros(np.shape(w), dtype=np.float64) for w in params]
            for acc, w in zip(weighted_sums, params):
                acc += num_samples * np.asarray(w, dtype=np.float64)
            sum_num_samples += num_samples
            # Reset Keras global state between participants.
            K.clear_session()

        if sum_num_samples == 0:
            raise ValueError(f'{self.name}: participants reported zero training samples')
        averaged = [(acc / sum_num_samples).astype(dt) for acc, dt in zip(weighted_sums, weight_dtypes)]

        # The last participant's model supplies the architecture for the new weights.
        model_update = tf.keras.models.load_model(model_path)
        model_update.set_weights(averaged)
        if self.MPA_enable:
            # NOTE(review): get_poisoned_model is not defined in this file -- confirm it is provided elsewhere.
            model_update = self.get_poisoned_model(model_update)
        updated_model_path = os.path.join(self.memory_path, 'model', f'global_model_round_{global_round}.h5')
        os.makedirs(os.path.dirname(updated_model_path), exist_ok=True)
        model_update.save(updated_model_path)
        self.global_model = updated_model_path
        return self.global_model, sum_num_samples

    def start_aggregation(self, initial_training=True, continual_training=False, global_round=1,
                          continual_training_path=None):
        """
        Run rounds ``global_round`` .. ``self.aggregation_round`` and save the result.

        In every round ``child_nodes_per_round`` children are sampled at random
        and aggregated.

        Args:
            initial_training (bool): Train a fresh model on this server's own data
                first and use it as the starting global model.
            continual_training (bool): Start from a previously saved model instead.
            global_round (int): First round number to run.
            continual_training_path (str | None): Model to resume from; falls back
                to ``self.continual_training_path``.

        Returns:
            tuple: Path to the saved final model and the number of samples of the
            last aggregation round (0 if no round ran).

        Raises:
            ValueError: If continual training is requested but no model path is known.
        """
        if initial_training:
            self.global_model, _ = self.local_training(initial_training)
        if continual_training:
            resume_path = continual_training_path or self.continual_training_path
            if not resume_path:
                raise ValueError(f'{self.name}: continual training requested but no model path was given')
            self.global_model = resume_path

        candidates = self.child_nodes.copy()
        num_samples = 0
        while global_round <= self.aggregation_round:
            participants = random.sample(candidates, self.child_nodes_per_round)
            _, num_samples = self.aggregation(participants, global_round)
            global_round += 1

        # Save the final global model as this server's local model; a parent
        # server loads the returned path when it aggregates.
        model = tf.keras.models.load_model(self.global_model)
        model_path = os.path.join(self.memory_path, 'model', f'local_model_round_{self.local_model_num}.h5')
        os.makedirs(os.path.dirname(model_path), exist_ok=True)
        model.save(model_path)
        self.local_model = model_path
        return self.local_model, num_samples
def main():
    """
    Set up and run a hierarchical federated learning experiment.

    Reads the TOML configuration named on the command line, creates a
    timestamped experiment folder and makes it the working directory, builds
    the server/client topology, applies the configured attack and defence
    settings to the nodes, and starts training from the root server.
    """
    prompt_args = parse_arguments()
    args = load_configurations(prompt_args.configurations)

    # The original configuration key is misspelled; keep accepting it and also
    # accept the corrected spelling.
    exp_id = args["experimnet_id"] if "experimnet_id" in args else args["experiment_id"]
    # Made absolute: the working directory changes to the experiment folder
    # before client data is loaded, which would break a relative path.
    saved_data_path = os.path.abspath(args["data_path"])
    work_space = args["store_folder"]
    dataset_name = args["dataset_name"]
    # NOTE(review): only 'cifar10' gets a 3-channel input; every other name
    # (including 'stl10') uses 28x28x1 -- confirm this matches the saved data.
    n_classes, image_shape = (10, (32, 32, 3)) if dataset_name == 'cifar10' else (10, (28, 28, 1))

    topology_description = args["topology_description"]
    num_clients = topology_description["number_of_clients"]
    topology_levels = topology_description["topology_levels"]
    topology = args["topology"]
    topology_name = topology_description["topology_name"]
    epochs = args["local_training"]["epochs"]
    batch_size = args["local_training"]["batch_size"]

    federated = args["federated_training"]
    aggregation_round = federated["aggregation_round"]  # indexed by server level
    # NOTE(review): read but not used by this function.
    percent_of_participats = federated["percent_of_participats"]
    start_training = federated["start_training"]

    # Defined up front: it is passed to get_poisoned_data() below even when
    # neither resuming nor adversarial training is configured.
    continual_training_path = None
    if start_training == 'start':
        initial_training = True
        continual_training = False
        start_from = 1
    else:
        initial_training = False
        continual_training = True
        continual_training_path = federated["resume_from_model"]
        start_from = federated["resume_from_round_number"]

    defence_name = 'none'
    distillation_enable = args["distillation_training"]["distillation_enable"]
    if distillation_enable:
        # NOTE(review): the teacher path is read but not handed to any node here.
        teacher_model_path = args["distillation_training"]["teacher_model"]
        defence_name = 'distillation'
    adversarial_training_enable = args["adversarial_training"]["adversarial_training_enable"]
    if adversarial_training_enable:
        continual_training_path = federated["resume_from_model"]
        defence_name = 'adversarial_training'

    # Defaults so every name below is bound whatever the attack configuration.
    DPA_enable = False
    MPA_enable = False
    percent_poison = None
    source_labels = None
    target_labels = None
    attack_enable = args["attack"]["attack_enable"]
    if attack_enable or adversarial_training_enable:
        DPA_list = ['backdoor', 'label_flipping']
        MPA_list = ['signflip']
        AT_list = ['fgm', 'Spatial_Transformation']
        attack_name = args["attack"]["attack_name"]
        adv_nodes = args["attack"]["malicious_nodes"]
        num_malicious_nodes = len(adv_nodes)
        if attack_name in DPA_list or attack_name in AT_list:
            DPA_enable = True
            percent_poison = args["attack"]["percent_of_poison_data"]
            source_labels = np.arange(n_classes)
            # Each class is mapped to the next one, wrapping around.
            target_labels = (np.arange(n_classes) + 1) % n_classes
        elif attack_name in MPA_list:
            MPA_enable = True
        else:
            print(f'Warning: unknown attack {attack_name!r}; malicious nodes will train normally.')
    else:
        attack_name = 'none'
        num_malicious_nodes = 0
        adv_nodes = []

    exp_description = f'{topology_name}_{topology_levels}L_{dataset_name}_{num_clients}_attack_{attack_name}{num_malicious_nodes}_defence_{defence_name}'
    exp_folder = create_experiment_folder(exp_id, exp_description, work_space)
    # Node memory paths are relative, so models are written inside exp_folder.
    os.chdir(exp_folder)
    print(f'Experiment Description: {exp_description}')
    print(f'Experiment Folder: {exp_folder}')

    nodes = [[] for _ in range(topology_levels)]

    def create_nodes(t, level, next_id):
        """
        Create nodes (servers and clients) for the federated learning topology.

        Every key of ``t`` becomes a Server at ``level``. A dict value is a
        sub-topology, handled recursively; a list value names Clients at
        ``level + 1``.

        Args:
            t (dict): Topology dictionary.
            level (int): Current level in the topology.
            next_id (int): First unused node ID.

        Returns:
            int: The next unused node ID (returned so IDs stay unique across
            sibling subtrees).
        """
        for parent, childs in t.items():
            nodes[level].append(Server(id=next_id, name=parent, node_type='server', node_level=level, saved_data_path=saved_data_path, image_shape=image_shape, n_classes=n_classes))
            next_id += 1
            if isinstance(childs, dict):
                next_id = create_nodes(childs, level + 1, next_id)
            else:
                for child in childs:
                    nodes[level + 1].append(Client(id=next_id, name=child, node_type='client', node_level=level + 1, saved_data_path=saved_data_path, image_shape=image_shape, n_classes=n_classes))
                    next_id += 1
        return next_id

    create_nodes(topology, 0, 0)

    leaf_level = topology_levels - 1
    for level in range(topology_levels):
        for node in nodes[level]:
            node.set_training_parameters(epochs, batch_size)
            if level != leaf_level:
                node.aggregation_round = aggregation_round[level]
            is_adversarial = node.name in adv_nodes
            if level == topology_levels - 2 and is_adversarial and MPA_enable:
                node.MPA_enable = MPA_enable
            if level == leaf_level:
                train_images, train_labels, num_samples = node.get_clean_data()
                if is_adversarial:
                    if DPA_enable:
                        node.DPA_enable = DPA_enable
                        train_images, train_labels, num_samples = node.get_poisoned_data(train_images, train_labels, attack_name, percent_poison, target_labels, source_labels, continual_training_path)
                    elif MPA_enable:
                        node.MPA_enable = MPA_enable
                node.train_images = train_images
                node.train_labels = train_labels
                node.num_samples = num_samples

    def set_child_nodes(t, level):
        """
        Attach each server in the topology to all of its children.

        Previously every child replaced the list, so each server kept only its
        last child; the full list is now assigned once per parent.

        Args:
            t (dict): Topology dictionary.
            level (int): Current level in the topology.
        """
        parents_by_name = {n.name: n for n in nodes[level]}
        children_by_name = {n.name: n for n in nodes[level + 1]}
        for parent, childs in t.items():
            parents_by_name[parent].set_child_nodes([children_by_name[child] for child in childs])
            if isinstance(childs, dict):
                set_child_nodes(childs, level + 1)

    set_child_nodes(topology, 0)

    root = nodes[0][0]
    # Expose the resume checkpoint on the root server for resumed runs.
    root.continual_training_path = continual_training_path
    start_time = time.time()
    # start_from is 1 for a fresh run, or the configured round when resuming.
    root.start_aggregation(initial_training=initial_training, continual_training=continual_training, global_round=start_from)
    end_time = time.time()
    print(f"Elapsed time: {end_time - start_time}")


if __name__ == "__main__":
    main()