Training behaves abnormally after adding a scheduling module to iterate() #66

Open
Socrates2001 opened this issue Sep 30, 2024 · 3 comments

Socrates2001 commented Sep 30, 2024

In FL-GO, each global iteration consists of three steps: sampling, training, and aggregation. After inserting a scheduling module between them, the test accuracy gets stuck at 11.35 from the third round onward. The corresponding code changes are shown below (key hyperparameters: Mnist_IID, 'num_clients': 100, fedavg, option={'num_rounds':20, "gpu": 0, 'proportion': 0.2, 'num_steps': 5, 'responsiveness': 'UNI-5-1000'}):

def iterate(self):
    """
    The standard iteration of each federated communication round, containing the three
    necessary procedures in FL: client selection, communication, and model aggregation.

    Returns:
        False if the global model is not updated in this iteration
    """
    # sample clients: MD sampling as default
    self.selected_clients = self.sample()
    # training
    models = self.communicate(self.selected_clients)['model']
    # scheduling
    scheduled_clients, scheduled_models = self.client_scheduling(models)
    # aggregate: pk = 1/K as default where K=len(selected_clients)
    self.model = self.aggregate(scheduled_models)

    return len(scheduled_models) > 0
 
def client_scheduling(self, models):
    """
    Based on a configured scheduling rate, select a subset of the clients that
    finished training and use only that subset for aggregation.

    Args:
        models (list): models of the clients that completed local training

    Returns:
        scheduled_clients (list): the scheduled clients
        scheduled_models (list): the models of the scheduled clients
    """
    # scheduling rate: the fraction of trained clients used for aggregation
    print("selected_clients", self.selected_clients)
    scheduling_rate = self.option.get('client_scheduling_rate', 0.5)  # default 50%

    num_selected_clients = len(models)
    num_scheduled_clients = max(int(num_selected_clients * scheduling_rate), 1)  # at least 1 client

    # randomly pick the scheduled clients from those that finished training
    scheduled_indices = np.random.choice(range(num_selected_clients), num_scheduled_clients, replace=False)
    print("scheduled_indices", scheduled_indices)
    # gather the scheduled clients and their corresponding models
    scheduled_clients = [self.selected_clients[i] for i in scheduled_indices]
    print("scheduled_clients", scheduled_clients)
    scheduled_models = [models[i] for i in scheduled_indices]

    return scheduled_clients, scheduled_models

[screenshot of the training log omitted]
Everything works as expected when the scheduling rate is 1, but as soon as it drops below 1 the behavior becomes strange. What is the cause of this, and do I also need to change some of the underlying code?

WwZzz (Owner) commented Sep 30, 2024

The aggregate function uses self.selected_clients to compute the aggregation weights. After scheduling, the list of models is shorter than self.selected_clients, so the weights no longer sum to 1. You need to reset selected_clients to the scheduled clients.
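The weight mismatch can be seen with toy numbers. In the sketch below, the per-client data volumes are hypothetical and the weighting is simplified; it is not the library's actual aggregation code:

```python
# Toy illustration of the mismatch: weights are computed over ALL selected
# clients, but only the scheduled subset of models is actually averaged.
data_vols = {0: 100, 1: 300, 2: 100, 3: 500}  # hypothetical data volumes
selected_clients = [0, 1, 2, 3]               # clients that trained this round
scheduled = [0, 2]                            # subset kept after scheduling

total = sum(data_vols[c] for c in selected_clients)   # 1000
weights = [data_vols[c] / total for c in scheduled]   # [0.1, 0.1]

# The weights applied to the averaged models sum to 0.2 instead of 1.0,
# so every aggregated parameter is scaled down and the model collapses.
print(sum(weights))
```

With a scheduling rate of 1 the two lists coincide and the weights sum to 1, which matches the observation that training is only broken when the rate is below 1.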

Socrates2001 (Author) commented:

> The aggregate function uses self.selected_clients to compute the aggregation weights. After scheduling, the list of models is shorter than self.selected_clients, so the weights no longer sum to 1. You need to reset selected_clients to the scheduled clients.

I did consider that yesterday. aggregate() supports several aggregation modes and defaults to uniform, where the aggregation weights depend only on the number of received models, i.e. the number of scheduled clients (equivalently, scheduled models); it does not seem to use self.selected_clients at all:
...
elif self.aggregation_option == 'uniform':
    return fmodule._model_average(models)
...

def _model_average(ms = [], p = []):
    r"""
    Averaging a list of models to a new one

    Args:
        ms (list): a list of models (i.e. each model's class is FModule(...))
        p (list): a list of real numbers that are the averaging weights

    Returns:
        The new model that is the weighted averaging of models in ms
    """
    if len(ms)==0: return None
    if len(p)==0: p = [1.0 / len(ms) for _ in range(len(ms))]
    op_with_graph = sum([w.ingraph for w in ms]) > 0
    res = ms[0].__class__().to(ms[0].get_device())
    if op_with_graph:
        mlks = [get_module_from_model(mi) for mi in ms]
        mlr = get_module_from_model(res)
        for n in range(len(mlr)):
            mpks = [mlk[n]._parameters for mlk in mlks]
            rd = _modeldict_weighted_average(mpks, p)
            for l in mlr[n]._parameters.keys():
                if mlr[n]._parameters[l] is None: continue
                mlr[n]._parameters[l] = rd[l]
        res.op_with_graph()
    else:
        _modeldict_cp(res.state_dict(), _modeldict_weighted_average([mi.state_dict() for mi in ms], p))
    return res
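As a quick sanity check on that claim, here is a minimal stand-in for the uniform branch that applies the same default weights 1/len(ms) to plain NumPy arrays instead of FModule models (a sketch of the weighting only, not the library code):

```python
import numpy as np

def uniform_average(params_list):
    # With no weights supplied, every model gets weight 1/len(params_list),
    # i.e. the result is a plain mean of the parameters.
    p = [1.0 / len(params_list)] * len(params_list)
    return sum(w * x for w, x in zip(p, params_list))

params = [np.array([1.0, 2.0]), np.array([3.0, 4.0])]
avg = uniform_average(params)
print(avg)  # [2. 3.]
```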

WwZzz (Owner) commented Oct 1, 2024

> I did consider that yesterday. aggregate() supports several aggregation modes and defaults to uniform, where the aggregation weights depend only on the number of received models; it does not seem to use self.selected_clients at all. [...]

The default sampling and aggregation were later updated to follow the revised FedAvg paper: the default is now uniform sampling with weighted aggregation, to stay consistent with the latest version of the FedAvg paper. You can either aggregate the models directly with fmodule._model_average(models) or pass the option 'aggregate': 'uniform'. The Zhihu tutorial and the comments in the code apparently weren't updated to reflect this; I'll fix them.
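Putting the thread's conclusion together, here is a self-contained toy sketch of the fix under weighted aggregation. All class and attribute names below are simplified stand-ins rather than the real FLGo API; the point is that resetting selected_clients to the scheduled subset makes the weights sum to 1 again:

```python
import random

random.seed(0)  # reproducible subset choice for this toy run

class ToyServer:
    """Simplified stand-in for a server with weighted aggregation."""
    def __init__(self, data_vols):
        self.data_vols = data_vols              # hypothetical per-client volumes
        self.selected_clients = list(data_vols)

    def client_scheduling(self, models, rate=0.5):
        # pick a random subset of the trained clients, at least one
        k = max(int(len(models) * rate), 1)
        idx = random.sample(range(len(models)), k)
        clients = [self.selected_clients[i] for i in idx]
        return clients, [models[i] for i in idx]

    def aggregate(self, models):
        # weighted aggregation: weights derived from self.selected_clients
        total = sum(self.data_vols[c] for c in self.selected_clients)
        ws = [self.data_vols[c] / total for c in self.selected_clients]
        return sum(w * m for w, m in zip(ws, models))

server = ToyServer({0: 100, 1: 300, 2: 100, 3: 500})
models = [1.0, 1.0, 1.0, 1.0]        # identical toy "models" for clarity
clients, sched_models = server.client_scheduling(models)
server.selected_clients = clients    # the key fix: reset to scheduled clients
agg = server.aggregate(sched_models)
print(agg)  # ~1.0 regardless of which subset was scheduled
```

Without the reset line, the same run would scale the result down by the fraction of unscheduled weight, which is exactly the collapse reported in the issue.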
