原文:https://pytorch.org/tutorials/intermediate/rpc_tutorial.html
作者:Shen Li
先决条件:
本教程使用两个简单的示例来演示如何使用torch.distributed.rpc
包构建分布式训练,该包首先在 PyTorch v1.4 中作为原型功能引入。 这两个示例的源代码可以在 PyTorch 示例中找到。
先前的教程分布式数据并行入门和使用 PyTorch 编写分布式应用,描述了DistributedDataParallel
,该模型支持特定的训练范例,该模型可在多个进程之间复制,每个进程都处理输入数据的拆分。 有时,您可能会遇到需要不同训练范例的场景。 例如:
- 在强化学习中,从环境中获取训练数据可能相对昂贵,而模型本身可能很小。 在这种情况下,产生多个并行运行的观察者并共享一个智能体可能会很有用。 在这种情况下,智能体将在本地负责训练,但是应用仍将需要库在观察者和训练者之间发送和接收数据。
- 您的模型可能太大,无法容纳在一台计算机上的 GPU 中,因此需要一个库来帮助将模型拆分到多台计算机上。 或者,您可能正在实现参数服务器训练框架,其中模型参数和训练器位于不同的机器上。
torch.distributed.rpc
包可以帮助解决上述情况。 在情况 1 中, RPC 和 RRef 允许将数据从一个工作程序发送到另一个工作程序,同时轻松引用远程数据对象。 在情况 2 中,分布式 Autograd 和分布式优化器使执行反向传递和优化器步骤就像本地训练一样。 在接下来的两节中,我们将使用强化学习示例和语言模型示例来演示torch.distributed.rpc
的 API。 请注意,本教程并非旨在构建最准确或最有效的模型来解决给定的问题,相反,此处的主要目标是演示如何使用torch.distributed.rpc
包来构建分布式训练应用。
本节介绍了使用 RPC 建立玩具分布式强化学习模型以解决 OpenAI Gym 中的 CartPole-v1 的步骤。 策略代码主要是从现有的单线程示例中借用的,如下所示。 我们将跳过Policy
设计的详细信息,并将重点介绍 RPC 的用法。
import torch.nn as nn
import torch.nn.functional as F
class Policy(nn.Module):
def __init__(self):
super(Policy, self).__init__()
self.affine1 = nn.Linear(4, 128)
self.dropout = nn.Dropout(p=0.6)
self.affine2 = nn.Linear(128, 2)
self.saved_log_probs = []
self.rewards = []
def forward(self, x):
x = self.affine1(x)
x = self.dropout(x)
x = F.relu(x)
action_scores = self.affine2(x)
return F.softmax(action_scores, dim=1)
首先,让我们准备一个助手,以在RRef
的所有者工作程序上远程运行函数。 您将在本教程的示例中的多个地方发现该函数。 理想情况下,torch.distributed.rpc
包应立即提供这些助手函数。 例如,如果应用可以直接调用RRef.some_func(*arg)
,然后将其转换为RRef
所有者的 RPC,将会更容易。 在pytorch/pytorch#31743
中跟踪了此 API 的进度。
from torch.distributed.rpc import rpc_sync
def _call_method(method, rref, *args, **kwargs):
return method(rref.local_value(), *args, **kwargs)
def _remote_method(method, rref, *args, **kwargs):
args = [method, rref] + list(args)
return rpc_sync(rref.owner(), _call_method, args=args, kwargs=kwargs)
# to call a function on an rref, we could do the following
# _remote_method(some_func, rref, *args)
我们准备介绍观察员。 在此示例中,每个观察者创建自己的环境,并等待智能体的命令来运行剧集。 在每个剧集中,一个观察者最多循环n_steps
个迭代,并且在每个迭代中,它使用 RPC 将其环境状态传递给智能体并取回操作。 然后,它将该操作应用于其环境,并从环境中获取奖励和下一个状态。 之后,观察者使用另一个 RPC 向智能体报告奖励。 同样,请注意,这显然不是最有效的观察者实现。 例如,一个简单的优化可能是将当前状态和最后的报酬打包到一个 RPC 中,以减少通信开销。 但是,目标是演示 RPC API,而不是为 CartPole 构建最佳的求解器。 因此,在此示例中,让逻辑保持简单,并明确两个步骤。
import argparse
import gym
import torch.distributed.rpc as rpc
parser = argparse.ArgumentParser(
description="RPC Reinforcement Learning Example",
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument('--world_size', default=2, help='Number of workers')
parser.add_argument('--log_interval', default=1, help='Log every log_interval episodes')
parser.add_argument('--gamma', default=0.1, help='how much to value future rewards')
parser.add_argument('--seed', default=1, help='random seed for reproducibility')
args = parser.parse_args()
class Observer:
def __init__(self):
self.id = rpc.get_worker_info().id
self.env = gym.make('CartPole-v1')
self.env.seed(args.seed)
def run_episode(self, agent_rref, n_steps):
state, ep_reward = self.env.reset(), 0
for step in range(n_steps):
# send the state to the agent to get an action
action = _remote_method(Agent.select_action, agent_rref, self.id, state)
# apply the action to the environment, and get the reward
state, reward, done, _ = self.env.step(action)
# report the reward to the agent for training purpose
_remote_method(Agent.report_reward, agent_rref, self.id, reward)
if done:
break
agent 的代码稍微复杂一点,我们将其分成多个部分。 在此示例中,智能体既充当训练者又充当主角色,以便它向多个分布式观察者发送命令以运行剧集,并且还记录本地的所有动作和奖励,这些动作和奖赏将在每个剧集之后的训练阶段使用。 下面的代码显示了Agent
构造器,其中大多数行都在初始化各种组件。 最后的循环在其他工作器上远程初始化观察者,并在本地将RRefs
保留给这些观察者。 智能体稍后将使用那些观察者RRefs
发送命令。 应用无需担心RRefs
的寿命。 每个RRef
的所有者维护一个引用计数图以跟踪其生命周期,并保证只要该RRef
的任何活动用户都不会删除远程数据对象。 有关详细信息,请参考RRef
设计文档。
import gym
import numpy as np
import torch
import torch.distributed.rpc as rpc
import torch.optim as optim
from torch.distributed.rpc import RRef, rpc_async, remote
from torch.distributions import Categorical
class Agent:
def __init__(self, world_size):
self.ob_rrefs = []
self.agent_rref = RRef(self)
self.rewards = {}
self.saved_log_probs = {}
self.policy = Policy()
self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
self.eps = np.finfo(np.float32).eps.item()
self.running_reward = 0
self.reward_threshold = gym.make('CartPole-v1').spec.reward_threshold
for ob_rank in range(1, world_size):
ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
self.ob_rrefs.append(remote(ob_info, Observer))
self.rewards[ob_info.id] = []
self.saved_log_probs[ob_info.id] = []
接下来,智能体向观察者公开两个 API,以供他们选择动作和报告奖励。 这些函数仅在智能体上本地运行,但是将由观察者通过 RPC 触发。
class Agent:
...
def select_action(self, ob_id, state):
state = torch.from_numpy(state).float().unsqueeze(0)
probs = self.policy(state)
m = Categorical(probs)
action = m.sample()
self.saved_log_probs[ob_id].append(m.log_prob(action))
return action.item()
def report_reward(self, ob_id, reward):
self.rewards[ob_id].append(reward)
让我们在智能体上添加run_episode
函数,该函数告诉所有观察者执行片段。 在此函数中,它首先创建一个列表,以从异步 RPC 收集期货,然后在所有观察者RRefs
上循环以生成异步 RPC。 在这些 RPC 中,智能体还将自身的RRef
传递给观察者,以便观察者也可以在智能体上调用函数。 如上所示,每个观察者都将 RPC 返回给智能体,它们是嵌套的 RPC。 在每个剧集之后,saved_log_probs
和rewards
将包含记录的动作概率和奖励。
class Agent:
...
def run_episode(self, n_steps=0):
futs = []
for ob_rref in self.ob_rrefs:
# make async RPC to kick off an episode on all observers
futs.append(
rpc_async(
ob_rref.owner(),
_call_method,
args=(Observer.run_episode, ob_rref, self.agent_rref, n_steps)
)
)
# wait until all obervers have finished this episode
for fut in futs:
fut.wait()
最后,在一集之后,智能体需要训练模型,该模型在下面的finish_episode
函数中实现。 此函数中没有 RPC,并且大多数是从单线程示例中借用的。 因此,我们跳过描述其内容。
class Agent:
...
def finish_episode(self):
# joins probs and rewards from different observers into lists
R, probs, rewards = 0, [], []
for ob_id in self.rewards:
probs.extend(self.saved_log_probs[ob_id])
rewards.extend(self.rewards[ob_id])
# use the minimum observer reward to calculate the running reward
min_reward = min([sum(self.rewards[ob_id]) for ob_id in self.rewards])
self.running_reward = 0.05 * min_reward + (1 - 0.05) * self.running_reward
# clear saved probs and rewards
for ob_id in self.rewards:
self.rewards[ob_id] = []
self.saved_log_probs[ob_id] = []
policy_loss, returns = [], []
for r in rewards[::-1]:
R = r + args.gamma * R
returns.insert(0, R)
returns = torch.tensor(returns)
returns = (returns - returns.mean()) / (returns.std() + self.eps)
for log_prob, R in zip(probs, returns):
policy_loss.append(-log_prob * R)
self.optimizer.zero_grad()
policy_loss = torch.cat(policy_loss).sum()
policy_loss.backward()
self.optimizer.step()
return min_reward
使用Policy
,Observer
和Agent
类,我们准备启动多个过程来执行分布式训练。 在此示例中,所有进程都运行相同的run_worker
函数,并且它们使用等级来区分其角色。 等级 0 始终是智能体,其他所有等级都是观察者。 智能体通过重复调用run_episode
和finish_episode
作为主设备,直到运行的奖励超过环境指定的奖励阈值为止。 所有观察者都被动地等待来自智能体的命令。 该代码由rpc.init_rpc
和rpc.shutdown
包装,它们分别初始化和终止 RPC 实例。 API 页面中提供了更多详细信息。
import os
from itertools import count
import torch.multiprocessing as mp
AGENT_NAME = "agent"
OBSERVER_NAME="obs"
TOTAL_EPISODE_STEP = 100
def run_worker(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
if rank == 0:
# rank0 is the agent
rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)
agent = Agent(world_size)
for i_episode in count(1):
n_steps = int(TOTAL_EPISODE_STEP / (args.world_size - 1))
agent.run_episode(n_steps=n_steps)
last_reward = agent.finish_episode()
if i_episode % args.log_interval == 0:
print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
i_episode, last_reward, agent.running_reward))
if agent.running_reward > agent.reward_threshold:
print("Solved! Running reward is now {}!".format(agent.running_reward))
break
else:
# other ranks are the observer
rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
# observers passively waiting for instructions from the agent
# block until all rpcs finish, and shutdown the RPC instance
rpc.shutdown()
mp.spawn(
run_worker,
args=(args.world_size, ),
nprocs=args.world_size,
join=True
)
以下是使用world_size = 2
进行训练时的一些示例输出。
Episode 10 Last reward: 26.00 Average reward: 10.01
Episode 20 Last reward: 16.00 Average reward: 11.27
Episode 30 Last reward: 49.00 Average reward: 18.62
Episode 40 Last reward: 45.00 Average reward: 26.09
Episode 50 Last reward: 44.00 Average reward: 30.03
Episode 60 Last reward: 111.00 Average reward: 42.23
Episode 70 Last reward: 131.00 Average reward: 70.11
Episode 80 Last reward: 87.00 Average reward: 76.51
Episode 90 Last reward: 86.00 Average reward: 95.93
Episode 100 Last reward: 13.00 Average reward: 123.93
Episode 110 Last reward: 33.00 Average reward: 91.39
Episode 120 Last reward: 73.00 Average reward: 76.38
Episode 130 Last reward: 137.00 Average reward: 88.08
Episode 140 Last reward: 89.00 Average reward: 104.96
Episode 150 Last reward: 97.00 Average reward: 98.74
Episode 160 Last reward: 150.00 Average reward: 100.87
Episode 170 Last reward: 126.00 Average reward: 104.38
Episode 180 Last reward: 500.00 Average reward: 213.74
Episode 190 Last reward: 322.00 Average reward: 300.22
Episode 200 Last reward: 165.00 Average reward: 272.71
Episode 210 Last reward: 168.00 Average reward: 233.11
Episode 220 Last reward: 184.00 Average reward: 195.02
Episode 230 Last reward: 284.00 Average reward: 208.32
Episode 240 Last reward: 395.00 Average reward: 247.37
Episode 250 Last reward: 500.00 Average reward: 335.42
Episode 260 Last reward: 500.00 Average reward: 386.30
Episode 270 Last reward: 500.00 Average reward: 405.29
Episode 280 Last reward: 500.00 Average reward: 443.29
Episode 290 Last reward: 500.00 Average reward: 464.65
Solved! Running reward is now 475.3163778435275!
在此示例中,我们展示了如何使用 RPC 作为通信工具来跨工作器传递数据,以及如何使用 RRef 引用远程对象。 的确,您可以直接在ProcessGroup
send
和recv
API 之上构建整个结构,也可以使用其他通信/ RPC 库。 但是,通过使用torch.distributed.rpc
,您可以在后台获得本机支持并不断优化性能。
接下来,我们将展示如何将 RPC 和 RRef 与分布式 Autograd 和分布式优化器结合起来执行分布式模型并行训练。
在本节中,我们将使用 RNN 模型来展示如何使用 RPC API 构建分布式模型并行训练。 示例 RNN 模型非常小,可以轻松地放入单个 GPU 中,但是我们仍将其层划分为两个不同的工作器来演示这一想法。 开发人员可以应用类似的技术在多个设备和机器上分布更大的模型。
RNN 模型设计是从 PyTorch 示例存储库中的词语言模型中借用的,该存储库包含三个主要组件,一个嵌入表,一个LSTM
层和一个解码器。 下面的代码将嵌入表和解码器包装到子模块中,以便它们的构造器可以传递给 RPC API。 在EmbeddingTable
子模块中,我们有意将Embedding
层放在 GPU 上以涵盖用例。 在 v1.4 中,RPC 始终在目标工作线程上创建 CPU 张量参数或返回值。 如果函数使用 GPU 张量,则需要将其显式移动到适当的设备。
class EmbeddingTable(nn.Module):
r"""
Encoding layers of the RNNModel
"""
def __init__(self, ntoken, ninp, dropout):
super(EmbeddingTable, self).__init__()
self.drop = nn.Dropout(dropout)
self.encoder = nn.Embedding(ntoken, ninp).cuda()
self.encoder.weight.data.uniform_(-0.1, 0.1)
def forward(self, input):
return self.drop(self.encoder(input.cuda()).cpu()
class Decoder(nn.Module):
def __init__(self, ntoken, nhid, dropout):
super(Decoder, self).__init__()
self.drop = nn.Dropout(dropout)
self.decoder = nn.Linear(nhid, ntoken)
self.decoder.bias.data.zero_()
self.decoder.weight.data.uniform_(-0.1, 0.1)
def forward(self, output):
return self.decoder(self.drop(output))
使用上述子模块,我们现在可以使用 RPC 将它们组合在一起以创建 RNN 模型。 在下面的代码中,ps
代表参数服务器,该服务器托管嵌入表和解码器的参数。 构造器使用远程 API 在参数服务器上创建EmbeddingTable
对象和Decoder
对象,并在本地创建LSTM
子模块。 在前进过程中,训练器使用EmbeddingTable
RRef
查找远程子模块,然后使用 RPC 将输入数据传递到EmbeddingTable
,并获取查找结果。 然后,它通过本地LSTM
层运行嵌入,最后使用另一个 RPC 将输出发送到Decoder
子模块。 通常,要实现分布式模型并行训练,开发人员可以将模型分为多个子模块,调用 RPC 远程创建子模块实例,并在必要时使用RRef
查找它们。 正如您在下面的代码中看到的那样,它看起来与单机模型并行训练非常相似。 主要区别是用 RPC 函数替换了Tensor.to(device)
。
class RNNModel(nn.Module):
def __init__(self, ps, ntoken, ninp, nhid, nlayers, dropout=0.5):
super(RNNModel, self).__init__()
# setup embedding table remotely
self.emb_table_rref = rpc.remote(ps, EmbeddingTable, args=(ntoken, ninp, dropout))
# setup LSTM locally
self.rnn = nn.LSTM(ninp, nhid, nlayers, dropout=dropout)
# setup decoder remotely
self.decoder_rref = rpc.remote(ps, Decoder, args=(ntoken, nhid, dropout))
def forward(self, input, hidden):
# pass input to the remote embedding table and fetch emb tensor back
emb = _remote_method(EmbeddingTable.forward, self.emb_table_rref, input)
output, hidden = self.rnn(emb, hidden)
# pass output to the rremote decoder and get the decoded output back
decoded = _remote_method(Decoder.forward, self.decoder_rref, output)
return decoded, hidden
在介绍分布式优化器之前,让我们添加一个辅助函数来生成模型参数的 RRef 列表,该列表将由分布式优化器使用。 在本地训练中,应用可以调用Module.parameters()
来获取对所有参数张量的引用,并将其传递给本地优化器以进行后续更新。 但是,由于某些参数存在于远程计算机上,因此同一 API 在分布式训练方案中不起作用。 因此,分布式优化器不采用参数Tensors
的列表,而是采用RRefs
的列表,每个模型参数一个RRef
用于本地和远程模型参数。 辅助函数非常简单,只需调用Module.parameters()
并在每个参数上创建一个本地RRef
。
def _parameter_rrefs(module):
param_rrefs = []
for param in module.parameters():
param_rrefs.append(RRef(param))
return param_rrefs
然后,由于RNNModel
包含三个子模块,因此我们需要调用_parameter_rrefs
3 次,并将其包装到另一个辅助函数中。
class RNNModel(nn.Module):
...
def parameter_rrefs(self):
remote_params = []
# get RRefs of embedding table
remote_params.extend(_remote_method(_parameter_rrefs, self.emb_table_rref))
# create RRefs for local parameters
remote_params.extend(_parameter_rrefs(self.rnn))
# get RRefs of decoder
remote_params.extend(_remote_method(_parameter_rrefs, self.decoder_rref))
return remote_params
现在,我们准备实现训练循环。 初始化模型参数后,我们创建RNNModel
和DistributedOptimizer
。 分布式优化器将采用参数RRefs
的列表,查找所有不同的所有者工作器,并在每个所有者工作器上创建给定的本地优化器(即,在这种情况下,您也可以使用其他本地优化器SGD
) 使用给定的参数(即lr=0.05
)。
在训练循环中,它首先创建一个分布式 Autograd 上下文,这将帮助分布式 Autograd 引擎查找梯度和涉及的 RPC 发送/接收函数。 分布式 Autograd 引擎的设计详细信息可以在其设计说明中找到。 然后,它像本地模型一样开始正向传播,并运行分布式后向传递。 对于后向分布,您只需要指定一个根列表,在这种情况下,就是损失Tensor
。 分布式 Autograd 引擎将自动遍历分布式图并正确编写梯度。 接下来,它在分布式优化器上运行step
函数,该函数将与所有涉及的本地优化器联系以更新模型参数。 与本地训练相比,一个较小的差异是您不需要运行zero_grad()
,因为每个 Autograd 上下文都有专用的空间来存储梯度,并且在每次迭代创建上下文时,来自不同迭代的那些梯度不会累积到同一组Tensors
。
def run_trainer():
batch = 5
ntoken = 10
ninp = 2
nhid = 3
nindices = 3
nlayers = 4
hidden = (
torch.randn(nlayers, nindices, nhid),
torch.randn(nlayers, nindices, nhid)
)
model = rnn.RNNModel('ps', ntoken, ninp, nhid, nlayers)
# setup distributed optimizer
opt = DistributedOptimizer(
optim.SGD,
model.parameter_rrefs(),
lr=0.05,
)
criterion = torch.nn.CrossEntropyLoss()
def get_next_batch():
for _ in range(5):
data = torch.LongTensor(batch, nindices) % ntoken
target = torch.LongTensor(batch, ntoken) % nindices
yield data, target
# train for 10 iterations
for epoch in range(10):
for data, target in get_next_batch():
# create distributed autograd context
with dist_autograd.context() as context_id:
hidden[0].detach_()
hidden[1].detach_()
output, hidden = model(data, hidden)
loss = criterion(output, target)
# run distributed backward pass
dist_autograd.backward(context_id, [loss])
# run distributed optimizer
opt.step(context_id)
# not necessary to zero grads since they are
# accumulated into the distributed autograd context
# which is reset every iteration.
print("Training epoch {}".format(epoch))
最后,让我们添加一些粘合代码以启动参数服务器和训练器流程。
def run_worker(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
if rank == 1:
rpc.init_rpc("trainer", rank=rank, world_size=world_size)
_run_trainer()
else:
rpc.init_rpc("ps", rank=rank, world_size=world_size)
# parameter server do nothing
pass
# block until all rpcs finish
rpc.shutdown()
if __name__=="__main__":
world_size = 2
mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)