Skip to content

Commit

Permalink
add new tutorials
Browse files Browse the repository at this point in the history
  • Loading branch information
WwZzz committed Mar 28, 2023
1 parent 68bf5c7 commit 5feb2ca
Show file tree
Hide file tree
Showing 3 changed files with 301 additions and 1 deletion.
2 changes: 1 addition & 1 deletion flgo/utils/fflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -616,7 +616,7 @@ def multi_init_and_run(runner_args:list, devices = [], scheduler=None):
if aid==1:
algorithm = a[aid]
algorithm_name = algorithm.__name__ if (not hasattr(algorithm, '__module__') and hasattr(algorithm, '__name__')) else algorithm
default_args[aid] = a[aid]
default_args[aid] = algorithm_name
elif aid==2:
option = a[aid]
default_option = read_option_from_command()
Expand Down
130 changes: 130 additions & 0 deletions tutorial/1.5_Configuration - Logger.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
# 如何用Logger做实验?

这一节主要介绍FLGo中的日志记录模块。

## 1 使用Logger进行实验记录

FLGo中的日志记录器Logger用于观察训练过程中的实时结果,并记录到字典(logger.output)中,存在相应任务目录的record文件夹中。 Logger主要通过提供以下3个接口函数,来帮助用户达到实验目的。

* `initialize`:预留的初始化方法;

* `log_once`:每隔一定通信轮次,被调用一次,来记录\输出实时结果到变量`self.output`(类型为`collections.defaultdict(list)`)中;

* `organize_output`:在将`self.output`保存成.json文件之前,对特定内容进行组织,使得相应内容可以被直接用于绘制等目的;


由于这3个接口函数中,大多数时候都只用得到`log_once`函数,这里也仅对该函数的用法进行适当介绍。FLGo中所有的`Logger`都需要继承自`flgo.experiment.logger.BasicLogger`。当不指定`Logger`时,默认使用的`Logger``flgo.experiment.logger.SimpleLogger`,其代码实现如下:


```python
from flgo.experiment.logger import BasicLogger
import numpy as np
import flgo.system_simulator.base as ss

class SimpleLogger(BasicLogger):
def initialize(self):
"""在输出output中记录各用户的本地数据量,用户使用self.participants属性访问,服务器使用self.coordinator属性访问。self.output的默认键值为空列表"""
for c in self.participants:
self.output['client_datavol'].append(len(c.train_data))

def log_once(self, *args, **kwargs):
# 服务器(coordinator)使用test方法测试全局模型的测试集性能,并记录至output中
test_metric = self.coordinator.test()
for met_name, met_val in test_metric.items():
self.output['test_' + met_name].append(met_val)
# 服务器(coordinator)使用global_test方法测试全局模型的用户本地验证集性能分布,并记录至output中
valid_metrics = self.coordinator.global_test('valid')
local_data_vols = [c.datavol for c in self.participants]
total_data_vol = sum(local_data_vols)
for met_name, met_val in valid_metrics.items():
self.output['valid_'+met_name+'_dist'].append(met_val)
self.output['valid_' + met_name].append(1.0 * sum([client_vol * client_met for client_vol, client_met in zip(local_data_vols, met_val)]) / total_data_vol)
self.output['mean_valid_' + met_name].append(np.mean(met_val))
self.output['std_valid_' + met_name].append(np.std(met_val))
# 将当前output中信息输出至控制台
self.show_current_output()
```

默认的`SimpleLogger`每一轮测试模型的测试集性能和验证集性能,并记录。最后`output`字典在`runner`训练结束之后会被存成.json文件。

### 定制自己的Logger

这里通过下面的例子演示如何定制自己的`Logger`。这个`Logger`每一轮仅测试模型的测试集性能,并记录测试集指标最佳的全局模型为最优全局模型。最后用最优全局模型来测试本地用户数据的验证集性能分布,记录验证集性能分布中最佳30%用户和最差30%用户的平均性能。为了实现这些功能,实现`Logger`如下:


```python
from flgo.experiment.logger import BasicLogger
import collections
import numpy as np
import copy

class MyLogger(BasicLogger):
def initialize(self, *args, **kwargs):
self.optimal_model = copy.deepcopy(self.coordinator.model)
self.optimal_test_loss = 9999

def log_once(self):
# 测模型测试集指标
test_metric = self.coordinator.test()
for met_name, met_val in test_metric.items():
self.output['test_' + met_name].append(met_val)
# 检测当前模型是否为最优模型
if test_metric['loss']<self.optimal_test_loss:
self.optimal_test_loss = test_metric['loss']
self.optimal_model.load_state_dict(self.coordinator.model.state_dict())
self.show_current_output()

def organize_output(self):
super().organize_output()
# 测所有用户验证集指标
all_metrics = collections.defaultdict(list)
for c in self.participants:
client_metrics = c.test(self.optimal_model, 'valid')
for met_name, met_val in client_metrics.items():
all_metrics[met_name].append(met_val)
for met_name, metval in all_metrics.items():
self.output[met_name] = metval
# 计算最佳\最差30%用户验证集指标
met_name = 'loss'
all_valid_losses = sorted(all_metrics[met_name])
k1 = int(0.3*len(self.participants))
k2 = int(0.7*len(self.participants))
self.output['worst_30_valid_loss'] = 1.0*sum(all_valid_losses[k2:])/k1
self.output['best_30_valid_loss'] = 1.0*sum(all_valid_losses[:k1])/k1
```

下面验证所实现的`MyLogger`的效果:


```python
import flgo
import flgo.algorithm.fedavg as fedavg
import flgo.algorithm.qfedavg as qfedavg
import os

task = './test_synthetic'
config = {'benchmark':{'name':'flgo.benchmark.synthetic_regression', 'para':{'alpha':0.5, 'beta':0.5, 'num_clients':30}}}
if not os.path.exists(task): flgo.gen_task(config, task_path = task)

op = {'num_rounds':30, 'num_epochs':1, 'batch_size':8, 'learning_rate':0.1, 'proportion':1.0 ,'gpu':0, 'algo_para':0.1}
fedavg_runner = flgo.init(task, fedavg, option = op, Logger=MyLogger)
qffl_runner = flgo.init(task, qfedavg, option=op, Logger=MyLogger)
fedavg_runner.run()
qffl_runner.run()
```


```python
import flgo.experiment.analyzer as al
records = al.Selector({'task':task, 'header':['fedavg', 'qfedavg_q0.1',], 'filter':{'R':30, 'E':1, 'B':8, 'LR':0.1,'P':1.0}}).records[task]
for rec in records:
wl = rec.data['worst_30_valid_loss']
bl = rec.data['best_30_valid_loss']
print('{}:(Worst is {}, Best is {})'.format(rec.data['option']['algorithm'], wl, bl))
```

fedavg:(Worst is 1.539149112171597, Best is 0.1532415940115849)
qfedavg:(Worst is 1.5319806469811335, Best is 0.4070415910747316)


可以看到记录中相应关键字的内容已被保存,且可以被读取。其中qfedavg的最差30%用户性能略好于fedavg,但最佳30%用户性能若弱于fedavg。
170 changes: 170 additions & 0 deletions tutorial/1.6 Configuration - Runner-level Parallel & Tuning.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
# 如何使用FLGo并行运行联邦算法?

在前几节中,不同的运行器(i.e. runner)都是串行运行的,即只能一个runner结束之后,才能开启下一个的训练。且如果并行运行不同runner的话,不止要面临同时写好几个文件的问题,还需要手动观察GPU的状况,防止显存溢出导致程序终止,造成时间精力的浪费。

为了克服这些做实验的问题,这里把并行运行不同runner的功能封装到了`flgo.multi_init_and_run`函数中,由该函数可以同时运行多个runner,且自动管理GPU来保证成功运行所有的runner。该函数定义如下:


```python
def multi_init_and_run(runner_args:list, devices = [], scheduler=None):
r"""
Create multiple runners and run in parallel
param:
runner_args (list): each element in runner_args should be either a dict or a tuple or parameters
devices (list): a list of gpu id
scheduler (class flgo.experiment.device_scheduler.BasicScheduler): GPU scheduler
return:
a list of output results of runners
Example:
>>> from flgo.algorithm import fedavg, fedprox, scaffold
>>> # create task 'mnist_iid' by flgo.gen_task if there exists no such task
>>> task='./mnist_iid'
>>> if os.path.exists(task): flgo.gen_task({'benchmark':{'name':'flgo.benchmark.mnist_classification'}, 'partitioner':{'name':'IIDPartitioner','para':{'num_clients':100}}}, task)
>>> algos = [fedavg, fedprox, scaffold]
>>> flgo.multi_init_and_run([{'task':task, 'algorithm':algo} for algo in algos], devices=[0])
"""
...
```

`flgo.multi_init_and_run`函数返回的结果为:所有传入的runner运行结束之后,由各自持有的logger所记录的输出字典。

`flgo.multi_init_and_run`
* **runner_args**:列表类型,运行器配置(字典)的列表
* **devices(可选)**:列表类型,用于调度的gpu编号列表,例如\[0,1\]将指定0号卡和1号卡
* **scheduler(可选)**:类型为flgo.experiment.device_scheduler.BasicScheduler。用于指定GPU的调度器来自动调度任务。

`devices`当且仅当`scheduler`未指定时生效,此时`scheduler`将被自动指定为`flgo.experiment.device_scheduler.BasicScheduler`,不具备GPU自动管理的能力,所有runner会被一股脑放到各个gpu上,容易造成显存溢出。

为了让不同的runner可以安全地运行到结束,这里把`scheduler`留做一个可以自定义的接口,让用户可以自行定义如何调度GPU,并且提供了一个默认的gpu自动调度器的实现:`flgo.experiment.device_scheduler.AutoScheduler``AutoScheduler`调度器每隔一定周期检查gpu是否可用,若可用的话则把runner放到卡上去跑。其中这里判定gpu可用的详细定义为:若某张gpu的空闲显存大于所有已运行的runner的平均(或最大)的显存占用超过一定时间长度,则该gpu可用。

下面演示如何使用`flgo.multi_init_and_run`并行运行runner

## Example 1. 并行运行不同参数


```python
import flgo
import flgo.algorithm.fedavg as fedavg
import flgo.experiment.device_scheduler as ds
```


```python
# 创建联邦任务
task = './test_cifar10'
# 为每个runner参数指定:任务、算法、参数,该字典的内容应与flgo.init的参数一致,未填写的项将使用默认值。
runner_dict = [{'task':task, 'algorithm':fedavg, 'option':{'lr':0.01*i, 'no_log_console':True, 'num_rounds':5}} for i in range(1,6)]
# 指定调度器为flgo.experiment.device_scheduler.AutoScheduler,并指定可用的卡为0号卡
asc = ds.AutoScheduler([0])
# 并行运行命令,可以看到子进程报错退出后,并不会导致主进程也退出,相应的runner将会在合适的时刻被再次运行,直至所有runner成功运行结束。
flgo.multi_init_and_run(runner_dict, scheduler=asc)
```

## Example 2. 并行运行不同算法


```python
# 创建联邦任务
from flgo.algorithm import fedavg, fedprox
task = './test_cifar10'
algos = [fedavg, fedprox]
runner_dict = [{'task':task, 'algorithm':a, 'option':{'lr':0.01, 'no_log_console':True, 'num_steps':1,'num_rounds':2}} for a in algos]
# 指定调度器为flgo.experiment.device_scheduler.AutoScheduler,并指定可用的卡为0号卡
asc = ds.AutoScheduler([0])
res = flgo.multi_init_and_run(runner_dict, scheduler=asc)
```


```python
import matplotlib.pyplot as plt
for r in res:
plt.plot(list(range(len(r['test_loss']))), r['test_loss'], label = r['option']['algorithm'])
plt.legend()
plt.show()
```

# 如何使用FLGo自动网格搜索调参?

网格搜索通过`flgo.tune`来实现,它跟上述`flgo.multi_init_and_run`的并行机制是一致的,通过指定`scheduler`来自动并行运行命令。两者的区别在于传入的参数不同,其中`flgo.tune`的输入`flgo.init`具有相同的形式如下,和`flgo.init`的参数区别在于`option`


```python
# option定义示例如下,其中每个关键字被指定为列表,而不是数值(单个数值将被视为长度为1的列表)。调参的范围为所有列表的值排列组合构成的网格
option = {
'learning_rate':[0.1, 0.01, 0.05],
'batch_size':[16,32],
'num_epochs':[1,5],
}
# tune函数声明:“
def tune(task: str, algorithm, option: dict = {}, model=None, Logger: flgo.experiment.logger.BasicLogger = flgo.experiment.logger.tune_logger.TuneLogger, Simulator: BasicSimulator=flgo.system_simulator.DefaultSimulator, scene='horizontal', scheduler=None):
"""
Tune hyper-parameters for one task and one algorithm in parallel.
:param
task (str): the dictionary of the federated task
algorithm (module || class): the algorithm will be used to optimize the model in federated manner, which must contain pre-defined attributions (e.g. algorithm.Server and algorithm.Client for horizontal federated learning)
option (dict): the dict whose values should be of type list to construct the combinations
model (module || class): the model module that contains two methods: model.init_local_module(object) and model.init_global_module(object)
Logger (class): the class of the logger inherited from flgo.experiment.logger.BasicLogger
Simulator (class): the class of the simulator inherited from flgo.system_simulator.BasicSimulator
scene (str): 'horizontal' or 'vertical' in current version of FLGo
scheduler (instance of flgo.experiment.device_scheduler.BasicScheduler): GPU scheduler that schedules GPU by checking their availability
"""
...
```

此外,传入`flgo.tune`函数的logger必须在`logger.log_once`方法中,记录关键字`valid_loss``logger.output`中,调参在验证集上进行。因此默认调用的logger为`flgo.experiment.logger.tune_logger.TuneLogger`。下面演示如何使用`flgo.tune`来自动调参。


## Example 3. 自动调参


```python
option = {
'learning_rate':[0.1, 0.01, 0.05],
'batch_size':[16],
'num_steps':[1,5],
'num_rounds': 5,
'no_log_console': True
}
flgo.tune(task, fedavg, option, scheduler = asc)
# 在该简单网格上,最后调参输出结果如下
```

Process 14314 was created for args ('./test_cifar10', 'flgo.algorithm.fedavg', {'learning_rate': 0.1, 'batch_size': 16, 'num_steps': 1, 'num_rounds': 5, 'no_log_console': True, 'log_file': True, 'gpu': 0}, None, <class 'flgo.experiment.logger.tune_logger.TuneLogger'>, <class 'flgo.system_simulator.default_simulator.Simulator'>, 'horizontal')
Process 14523 was created for args ('./test_cifar10', 'flgo.algorithm.fedavg', {'learning_rate': 0.1, 'batch_size': 16, 'num_steps': 5, 'num_rounds': 5, 'no_log_console': True, 'log_file': True, 'gpu': 0}, None, <class 'flgo.experiment.logger.tune_logger.TuneLogger'>, <class 'flgo.system_simulator.default_simulator.Simulator'>, 'horizontal')
Process 14621 was created for args ('./test_cifar10', 'flgo.algorithm.fedavg', {'learning_rate': 0.01, 'batch_size': 16, 'num_steps': 1, 'num_rounds': 5, 'no_log_console': True, 'log_file': True, 'gpu': 0}, None, <class 'flgo.experiment.logger.tune_logger.TuneLogger'>, <class 'flgo.system_simulator.default_simulator.Simulator'>, 'horizontal')
Process 14746 was created for args ('./test_cifar10', 'flgo.algorithm.fedavg', {'learning_rate': 0.01, 'batch_size': 16, 'num_steps': 5, 'num_rounds': 5, 'no_log_console': True, 'log_file': True, 'gpu': 0}, None, <class 'flgo.experiment.logger.tune_logger.TuneLogger'>, <class 'flgo.system_simulator.default_simulator.Simulator'>, 'horizontal')
Process 14826 was created for args ('./test_cifar10', 'flgo.algorithm.fedavg', {'learning_rate': 0.05, 'batch_size': 16, 'num_steps': 1, 'num_rounds': 5, 'no_log_console': True, 'log_file': True, 'gpu': 0}, None, <class 'flgo.experiment.logger.tune_logger.TuneLogger'>, <class 'flgo.system_simulator.default_simulator.Simulator'>, 'horizontal')
Process 14938 was created for args ('./test_cifar10', 'flgo.algorithm.fedavg', {'learning_rate': 0.05, 'batch_size': 16, 'num_steps': 5, 'num_rounds': 5, 'no_log_console': True, 'log_file': True, 'gpu': 0}, None, <class 'flgo.experiment.logger.tune_logger.TuneLogger'>, <class 'flgo.system_simulator.default_simulator.Simulator'>, 'horizontal')
The optimal combination of hyper-parameters is:
-----------------------------------------------
learning_rate |0.05
batch_size |16
num_steps |5
num_rounds |5
no_log_console |True
log_file |True
-----------------------------------------------
The minimal validation loss occurs at the round 5


可以看到最优参数为步长为0.05,批大小为16,本地更新步数为5,在第5个通信轮次达到最低验证集损失。

## 如何使用早停机制?

相较于传统机器学习中,判定超过多少个epoch验证集损失未改善便停止训练,在联邦中这里使用的是超过多少个通信轮次验证集损失未改善便停止训练。为达到此目的,仅需要在option中额外指定早停的连续未改进轮数如下:


```python
option = {
'learning_rate':[0.1, 0.01],
'batch_size':[16],
'num_steps':[1],
'num_rounds': 1000,
'no_log_console': True
'early_stop': 10
}
flgo.tune(task, fedavg, option, scheduler = asc)
```

在实践中,联邦由于noniid等因素的影响,训练波动很大,连续数十个通信轮数验证集损失没改进也不代表过拟合了。因此early_stop可以设的很大很大,例如500。

0 comments on commit 5feb2ca

Please sign in to comment.