99re热这里只有精品视频,7777色鬼xxxx欧美色妇,国产成人精品一区二三区在线观看,内射爽无广熟女亚洲,精品人妻av一区二区三区

PyTorch 分布式 RPC 框架入門

2020-09-10 10:31 更新
原文:https://pytorch.org/tutorials/intermediate/rpc_tutorial.html

作者:申力

警告

torch.distributed.rpc 程序包是實(shí)驗(yàn)性的,隨時(shí)可能更改。 它還需要 PyTorch 1.4.0+才能運(yùn)行,因?yàn)檫@是第一個(gè)支持 RPC 的版本。

本教程使用兩個(gè)簡(jiǎn)單的示例來演示如何使用 torch.distributed.rpc 軟件包構(gòu)建分布式訓(xùn)練,該軟件包首先在 PyTorch v1.4 中作為實(shí)驗(yàn)功能引入。 這兩個(gè)示例的源代碼可以在 PyTorch 示例中找到。

先前的教程分布式數(shù)據(jù)并行入門和用 PyTorch 編寫分布式應(yīng)用程序,描述了 DistributedDataParallel ,該模型支持特定的訓(xùn)練范例,其中模型可以在多個(gè)過程中復(fù)制 每個(gè)進(jìn)程都會(huì)處理輸入數(shù)據(jù)的拆分。 有時(shí),您可能會(huì)遇到需要不同訓(xùn)練范例的場(chǎng)景。 例如:

  1. 在強(qiáng)化學(xué)習(xí)中,從環(huán)境中獲取訓(xùn)練數(shù)據(jù)可能相對(duì)昂貴,而模型本身可能很小。 在這種情況下,產(chǎn)生多個(gè)并行運(yùn)行的觀察者并共享一個(gè)代理可能會(huì)很有用。 在這種情況下,代理將在本地負(fù)責(zé)訓(xùn)練,但是應(yīng)用程序仍將需要庫(kù)在觀察者和訓(xùn)練者之間發(fā)送和接收數(shù)據(jù)。
  2. 您的模型可能太大,無(wú)法容納在一臺(tái)計(jì)算機(jī)上的 GPU 中,因此需要一個(gè)庫(kù)來幫助將模型拆分到多臺(tái)計(jì)算機(jī)上。 或者,您可能正在實(shí)現(xiàn)參數(shù)服務(wù)器訓(xùn)練框架,其中模型參數(shù)和訓(xùn)練器位于不同的機(jī)器上。

torch.distributed.rpc 程序包可以幫助解決上述情況。 在情況 1 中, RPC 和 RRef 允許將數(shù)據(jù)從一個(gè)工作程序發(fā)送到另一個(gè)工作程序,同時(shí)輕松引用遠(yuǎn)程數(shù)據(jù)對(duì)象。 在情況 2 中,分布式 autograd 和分布式優(yōu)化器使執(zhí)行反向傳遞和優(yōu)化器步驟就像本地訓(xùn)練一樣。 在接下來的兩節(jié)中,我們將使用強(qiáng)化學(xué)習(xí)示例和語(yǔ)言模型示例來演示 torch.distributed.rpc 的 API。 請(qǐng)注意,本教程并非旨在構(gòu)建最準(zhǔn)確或最有效的模型來解決給定的問題,相反,此處的主要目標(biāo)是演示如何使用 torch.distributed.rpc 包來構(gòu)建分布式訓(xùn)練 應(yīng)用程序。

使用 RPC 和 RRef 進(jìn)行分布式強(qiáng)化學(xué)習(xí)

本節(jié)介紹了使用 RPC 建立玩具分布式強(qiáng)化學(xué)習(xí)模型以解決 OpenAI Gym 中的 CartPole-v1 的步驟。 策略代碼主要是從現(xiàn)有的單線程示例中借用的,如下所示。 我們將跳過Policy設(shè)計(jì)的詳細(xì)信息,并將重點(diǎn)介紹 RPC 的用法。

import torch.nn as nn
import torch.nn.functional as F


class Policy(nn.Module):


    def __init__(self):
        super(Policy, self).__init__()
        self.affine1 = nn.Linear(4, 128)
        self.dropout = nn.Dropout(p=0.6)
        self.affine2 = nn.Linear(128, 2)


        self.saved_log_probs = []
        self.rewards = []


    def forward(self, x):
        x = self.affine1(x)
        x = self.dropout(x)
        x = F.relu(x)
        action_scores = self.affine2(x)
        return F.softmax(action_scores, dim=1)

首先,讓我們準(zhǔn)備一個(gè)幫助程序,以在RRef的所有者工作程序上遠(yuǎn)程運(yùn)行功能。 您將在本教程的示例中的多個(gè)地方找到該功能。 理想情況下, <cite>torch.distributed.rpc</cite> 程序包應(yīng)立即提供這些幫助程序功能。 例如,如果應(yīng)用程序可以直接調(diào)用RRef.some_func(*arg),然后將其轉(zhuǎn)換為RRef所有者的 RPC,將會(huì)更容易。 在 pytorch / pytorch#31743 中跟蹤了此 API 的進(jìn)度。

from torch.distributed.rpc import rpc_sync


def _call_method(method, rref, *args, **kwargs):
    return method(rref.local_value(), *args, **kwargs)


def _remote_method(method, rref, *args, **kwargs):
    args = [method, rref] + list(args)
    return rpc_sync(rref.owner(), _call_method, args=args, kwargs=kwargs)


## to call a function on an rref, we could do the following
## _remote_method(some_func, rref, *args)

我們準(zhǔn)備介紹觀察員。 在此示例中,每個(gè)觀察者創(chuàng)建自己的環(huán)境,并等待代理的命令來運(yùn)行情節(jié)。 在每個(gè)情節(jié)中,一個(gè)觀察者最多循環(huán)n_steps個(gè)迭代,并且在每個(gè)迭代中,它使用 RPC 將其環(huán)境狀態(tài)傳遞給代理并取回操作。 然后,它將該操作應(yīng)用于其環(huán)境,并從環(huán)境中獲取獎(jiǎng)勵(lì)和下一個(gè)狀態(tài)。 之后,觀察者使用另一個(gè) RPC 向代理報(bào)告獎(jiǎng)勵(lì)。 同樣,請(qǐng)注意,這顯然不是最有效的觀察者實(shí)現(xiàn)。 例如,一個(gè)簡(jiǎn)單的優(yōu)化可能是將當(dāng)前狀態(tài)和最后的報(bào)酬打包到一個(gè) RPC 中,以減少通信開銷。 但是,目標(biāo)是演示 RPC API,而不是為 CartPole 構(gòu)建最佳的求解器。 因此,在此示例中,讓邏輯保持簡(jiǎn)單,并明確兩個(gè)步驟。

import argparse
import gym
import torch.distributed.rpc as rpc


parser = argparse.ArgumentParser(
    description="RPC Reinforcement Learning Example",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)


parser.add_argument('--world_size', default=2, help='Number of workers')
parser.add_argument('--log_interval', default=1, help='Log every log_interval episodes')
parser.add_argument('--gamma', default=0.1, help='how much to value future rewards')
parser.add_argument('--seed', default=1, help='random seed for reproducibility')
args = parser.parse_args()


class Observer:


    def __init__(self):
        self.id = rpc.get_worker_info().id
        self.env = gym.make('CartPole-v1')
        self.env.seed(args.seed)


    def run_episode(self, agent_rref, n_steps):
        state, ep_reward = self.env.reset(), 0
        for step in range(n_steps):
            # send the state to the agent to get an action
            action = _remote_method(Agent.select_action, agent_rref, self.id, state)


            # apply the action to the environment, and get the reward
            state, reward, done, _ = self.env.step(action)


            # report the reward to the agent for training purpose
            _remote_method(Agent.report_reward, agent_rref, self.id, reward)


            if done:
                break

agent 的代碼稍微復(fù)雜一點(diǎn),我們將其分為多部分。 在此示例中,代理既充當(dāng)訓(xùn)練者又充當(dāng)主人,因此它向多個(gè)分布式觀察者發(fā)送命令以運(yùn)行情節(jié),并且還記錄所有本地行為和獎(jiǎng)勵(lì),這些行為和獎(jiǎng)賞將在每個(gè)情節(jié)之后的訓(xùn)練階段中使用。 下面的代碼顯示了Agent構(gòu)造函數(shù),其中大多數(shù)行都在初始化各種組件。 最后的循環(huán)在其他工作者上遠(yuǎn)程初始化觀察者,并在本地將RRefs保留給這些觀察者。 代理稍后將使用那些觀察者RRefs發(fā)送命令。 應(yīng)用程序無(wú)需擔(dān)心RRefs的壽命。 每個(gè)RRef的所有者維護(hù)一個(gè)參考計(jì)數(shù)圖以跟蹤其生命周期,并保證只要該RRef的任何活動(dòng)用戶都不會(huì)刪除遠(yuǎn)程數(shù)據(jù)對(duì)象。 有關(guān)詳細(xì)信息,請(qǐng)參考RRef 設(shè)計(jì)文檔

import gym
import numpy as np


import torch
import torch.distributed.rpc as rpc
import torch.optim as optim
from torch.distributed.rpc import RRef, rpc_async, remote
from torch.distributions import Categorical


class Agent:
    def __init__(self, world_size):
        self.ob_rrefs = []
        self.agent_rref = RRef(self)
        self.rewards = {}
        self.saved_log_probs = {}
        self.policy = Policy()
        self.optimizer = optim.Adam(self.policy.parameters(), lr=1e-2)
        self.eps = np.finfo(np.float32).eps.item()
        self.running_reward = 0
        self.reward_threshold = gym.make('CartPole-v1').spec.reward_threshold
        for ob_rank in range(1, world_size):
            ob_info = rpc.get_worker_info(OBSERVER_NAME.format(ob_rank))
            self.ob_rrefs.append(remote(ob_info, Observer))
            self.rewards[ob_info.id] = []
            self.saved_log_probs[ob_info.id] = []

接下來,代理向觀察者公開兩個(gè) API,以供他們選擇動(dòng)作和報(bào)告獎(jiǎng)勵(lì)。 這些功能僅在代理上本地運(yùn)行,但是將由觀察者通過 RPC 觸發(fā)。

class Agent:
    ...
    def select_action(self, ob_id, state):
        state = torch.from_numpy(state).float().unsqueeze(0)
        probs = self.policy(state)
        m = Categorical(probs)
        action = m.sample()
        self.saved_log_probs[ob_id].append(m.log_prob(action))
        return action.item()


    def report_reward(self, ob_id, reward):
        self.rewards[ob_id].append(reward)

讓我們?cè)诖砩咸砑?code>run_episode函數(shù),該函數(shù)告訴所有觀察者執(zhí)行片段。 在此函數(shù)中,它首先創(chuàng)建一個(gè)列表,以從異步 RPC 收集期貨,然后在所有觀察者RRefs上循環(huán)以生成異步 RPC。 在這些 RPC 中,代理還將自身的RRef傳遞給觀察者,以便觀察者也可以在代理上調(diào)用函數(shù)。 如上所示,每個(gè)觀察者都將 RPC 返回給代理,它們是嵌套的 RPC。 在每個(gè)情節(jié)之后,saved_log_probsrewards將包含記錄的動(dòng)作概率和獎(jiǎng)勵(lì)。

class Agent:
    ...
    def run_episode(self, n_steps=0):
        futs = []
        for ob_rref in self.ob_rrefs:
            # make async RPC to kick off an episode on all observers
            futs.append(
                rpc_async(
                    ob_rref.owner(),
                    _call_method,
                    args=(Observer.run_episode, ob_rref, self.agent_rref, n_steps)
                )
            )


        # wait until all obervers have finished this episode
        for fut in futs:
            fut.wait()

最后,在一集之后,代理需要訓(xùn)練模型,該模型在下面的finish_episode函數(shù)中實(shí)現(xiàn)。 此函數(shù)中沒有 RPC,并且大多數(shù)是從單線程示例中借用的。 因此,我們跳過描述其內(nèi)容。

class Agent:
    ...
    def finish_episode(self):
      # joins probs and rewards from different observers into lists
      R, probs, rewards = 0, [], []
      for ob_id in self.rewards:
          probs.extend(self.saved_log_probs[ob_id])
          rewards.extend(self.rewards[ob_id])


      # use the minimum observer reward to calculate the running reward
      min_reward = min([sum(self.rewards[ob_id]) for ob_id in self.rewards])
      self.running_reward = 0.05 * min_reward + (1 - 0.05) * self.running_reward


      # clear saved probs and rewards
      for ob_id in self.rewards:
          self.rewards[ob_id] = []
          self.saved_log_probs[ob_id] = []


      policy_loss, returns = [], []
      for r in rewards[::-1]:
          R = r + args.gamma * R
          returns.insert(0, R)
      returns = torch.tensor(returns)
      returns = (returns - returns.mean()) / (returns.std() + self.eps)
      for log_prob, R in zip(probs, returns):
          policy_loss.append(-log_prob * R)
      self.optimizer.zero_grad()
      policy_loss = torch.cat(policy_loss).sum()
      policy_loss.backward()
      self.optimizer.step()
      return min_reward

使用Policy,ObserverAgent類,我們準(zhǔn)備啟動(dòng)多個(gè)進(jìn)程來執(zhí)行分布式訓(xùn)練。 在此示例中,所有進(jìn)程都運(yùn)行相同的run_worker函數(shù),并且它們使用等級(jí)來區(qū)分其角色。 等級(jí) 0 始終是代理,其他所有等級(jí)都是觀察者。 代理通過重復(fù)調(diào)用run_episodefinish_episode充當(dāng)主控,直到運(yùn)行的獎(jiǎng)勵(lì)超過環(huán)境指定的獎(jiǎng)勵(lì)閾值為止。 所有觀察者都被動(dòng)地等待來自代理的命令。 該代碼由 rpc.init_rpc 和 rpc.shutdown 包裝,它們分別初始化和終止 RPC 實(shí)例。 API 頁(yè)面中提供了更多詳細(xì)信息。

import os
from itertools import count


import torch.multiprocessing as mp


AGENT_NAME = "agent"
OBSERVER_NAME="obs"
TOTAL_EPISODE_STEP = 100


def run_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 0:
        # rank0 is the agent
        rpc.init_rpc(AGENT_NAME, rank=rank, world_size=world_size)


        agent = Agent(world_size)
        for i_episode in count(1):
            n_steps = int(TOTAL_EPISODE_STEP / (args.world_size - 1))
            agent.run_episode(n_steps=n_steps)
            last_reward = agent.finish_episode()


            if i_episode % args.log_interval == 0:
                print('Episode {}\tLast reward: {:.2f}\tAverage reward: {:.2f}'.format(
                      i_episode, last_reward, agent.running_reward))


            if agent.running_reward > agent.reward_threshold:
                print("Solved! Running reward is now {}!".format(agent.running_reward))
                break
    else:
        # other ranks are the observer
        rpc.init_rpc(OBSERVER_NAME.format(rank), rank=rank, world_size=world_size)
        # observers passively waiting for instructions from the agent


    # block until all rpcs finish, and shutdown the RPC instance
    rpc.shutdown()


mp.spawn(
    run_worker,
    args=(args.world_size, ),
    nprocs=args.world_size,
    join=True
)

以下是使用 <cite>world_size = 2</cite> 進(jìn)行訓(xùn)練時(shí)的一些示例輸出。

Episode 10      Last reward: 26.00      Average reward: 10.01
Episode 20      Last reward: 16.00      Average reward: 11.27
Episode 30      Last reward: 49.00      Average reward: 18.62
Episode 40      Last reward: 45.00      Average reward: 26.09
Episode 50      Last reward: 44.00      Average reward: 30.03
Episode 60      Last reward: 111.00     Average reward: 42.23
Episode 70      Last reward: 131.00     Average reward: 70.11
Episode 80      Last reward: 87.00      Average reward: 76.51
Episode 90      Last reward: 86.00      Average reward: 95.93
Episode 100     Last reward: 13.00      Average reward: 123.93
Episode 110     Last reward: 33.00      Average reward: 91.39
Episode 120     Last reward: 73.00      Average reward: 76.38
Episode 130     Last reward: 137.00     Average reward: 88.08
Episode 140     Last reward: 89.00      Average reward: 104.96
Episode 150     Last reward: 97.00      Average reward: 98.74
Episode 160     Last reward: 150.00     Average reward: 100.87
Episode 170     Last reward: 126.00     Average reward: 104.38
Episode 180     Last reward: 500.00     Average reward: 213.74
Episode 190     Last reward: 322.00     Average reward: 300.22
Episode 200     Last reward: 165.00     Average reward: 272.71
Episode 210     Last reward: 168.00     Average reward: 233.11
Episode 220     Last reward: 184.00     Average reward: 195.02
Episode 230     Last reward: 284.00     Average reward: 208.32
Episode 240     Last reward: 395.00     Average reward: 247.37
Episode 250     Last reward: 500.00     Average reward: 335.42
Episode 260     Last reward: 500.00     Average reward: 386.30
Episode 270     Last reward: 500.00     Average reward: 405.29
Episode 280     Last reward: 500.00     Average reward: 443.29
Episode 290     Last reward: 500.00     Average reward: 464.65
Solved! Running reward is now 475.3163778435275!

在此示例中,我們展示了如何使用 RPC 作為通信工具來跨工作人員傳遞數(shù)據(jù),以及如何使用 RRef 引用遠(yuǎn)程對(duì)象。 的確,您可以直接在ProcessGroup sendrecv API 之上構(gòu)建整個(gè)結(jié)構(gòu),也可以使用其他通信/ RPC 庫(kù)。 但是,通過使用 <cite>torch.distributed.rpc</cite> ,您可以在后臺(tái)獲得本機(jī)支持并不斷優(yōu)化性能。

接下來,我們將展示如何將 RPC 和 RRef 與分布式 autograd 和分布式優(yōu)化器結(jié)合起來執(zhí)行分布式模型并行訓(xùn)練。

使用 Distributed Autograd 和 Distributed Optimizer 的 Distributed RNN

在本節(jié)中,我們將使用 RNN 模型來展示如何使用 RPC API 構(gòu)建分布式模型并行訓(xùn)練。 示例 RNN 模型非常小,可以輕松地放入單個(gè) GPU 中,但是我們?nèi)詫⑵鋵觿澐譃閮蓚€(gè)不同的工作人員來演示這一想法。 開發(fā)人員可以應(yīng)用類似的技術(shù)在多個(gè)設(shè)備和機(jī)器上分布更大的模型。

RNN 模型設(shè)計(jì)是從 PyTorch 示例存儲(chǔ)庫(kù)中的詞語(yǔ)言模型中借用的,該存儲(chǔ)庫(kù)包含三個(gè)主要組件,一個(gè)嵌入表,一個(gè)LSTM層和一個(gè)解碼器。 下面的代碼將嵌入表和解碼器包裝到子模塊中,以便它們的構(gòu)造函數(shù)可以傳遞給 RPC API。 在EmbeddingTable子模塊中,我們有意將Embedding層放在 GPU 上以涵蓋用例。 在 v1.4 中,RPC 始終在目標(biāo)工作線程上創(chuàng)建 CPU 張量參數(shù)或返回值。 如果函數(shù)使用 GPU 張量,則需要將其顯式移動(dòng)到適當(dāng)?shù)脑O(shè)備。

class EmbeddingTable(nn.Module):
    r"""
    Encoding layers of the RNNModel
    """
    def __init__(self, ntoken, ninp, dropout):
        super(EmbeddingTable, self).__init__()
        self.drop = nn.Dropout(dropout)
        self.encoder = nn.Embedding(ntoken, ninp).cuda()
        self.encoder.weight.data.uniform_(-0.1, 0.1)


    def forward(self, input):
        return self.drop(self.encoder(input.cuda()).cpu()


class Decoder(nn.Module):
    def __init__(self, ntoken, nhid, dropout):
        super(Decoder, self).__init__()
        self.drop = nn.Dropout(dropout)
        self.decoder = nn.Linear(nhid, ntoken)
        self.decoder.bias.data.zero_()
        self.decoder.weight.data.uniform_(-0.1, 0.1)


    def forward(self, output):
        return self.decoder(self.drop(output))

使用上述子模塊,我們現(xiàn)在可以使用 RPC 將它們組合在一起以創(chuàng)建 RNN 模型。 在下面的代碼中,ps代表參數(shù)服務(wù)器,該服務(wù)器托管嵌入表和解碼器的參數(shù)。 構(gòu)造函數(shù)使用遠(yuǎn)程 API 在參數(shù)服務(wù)器上創(chuàng)建EmbeddingTable對(duì)象和Decoder對(duì)象,并在本地創(chuàng)建LSTM子模塊。 在正向傳遞過程中,訓(xùn)練師使用EmbeddingTable RRef查找遠(yuǎn)程子模塊,然后使用 RPC 將輸入數(shù)據(jù)傳遞到EmbeddingTable,并獲取查找結(jié)果。 然后,它通過本地LSTM層運(yùn)行嵌入,最后使用另一個(gè) RPC 將輸出發(fā)送到Decoder子模塊。 通常,要實(shí)施分布式模型并行訓(xùn)練,開發(fā)人員可以將模型劃分為子模塊,調(diào)用 RPC 遠(yuǎn)程創(chuàng)建子模塊實(shí)例,并在必要時(shí)使用RRef查找它們。 如下面的代碼所示,它看起來與單機(jī)模型并行訓(xùn)練非常相似。 主要區(qū)別是用 RPC 功能替換了Tensor.to(device)

class RNNModel(nn.Module):
    def __init__(self, ps, ntoken, ninp, nhid, nlayers, dropout=0.5):
        super(RNNModel, self).__init__()


        # setup embedding table remotely
        self.emb_table_rref = rpc.remote(ps, EmbeddingTable, args=(ntoken, ninp, dropout))
        # setup LSTM locally
        self.rnn = nn.LSTM(ninp, nhid, nlayers, dropout=dropout)
        # setup decoder remotely
        self.decoder_rref = rpc.remote(ps, Decoder, args=(ntoken, nhid, dropout))


    def forward(self, input, hidden):
        # pass input to the remote embedding table and fetch emb tensor back
        emb = _remote_method(EmbeddingTable.forward, self.emb_table_rref, input)
        output, hidden = self.rnn(emb, hidden)
        # pass output to the rremote decoder and get the decoded output back
        decoded = _remote_method(Decoder.forward, self.decoder_rref, output)
        return decoded, hidden

在介紹分布式優(yōu)化器之前,讓我們添加一個(gè)輔助函數(shù)來生成模型參數(shù)的 RRef 列表,這些列表將由分布式優(yōu)化器使用。 在本地訓(xùn)練中,應(yīng)用程序可以調(diào)用Module.parameters()來獲取對(duì)所有參數(shù)張量的引用,并將其傳遞給本地優(yōu)化器以進(jìn)行后續(xù)更新。 但是,由于某些參數(shù)存在于遠(yuǎn)程計(jì)算機(jī)上,因此同一 API 在分布式訓(xùn)練方案中不起作用。 因此,分布式優(yōu)化器不采用參數(shù)Tensors的列表,而是采用RRefs的列表,對(duì)于本地和遠(yuǎn)程模型參數(shù),每個(gè)模型參數(shù)一個(gè)RRef。 輔助函數(shù)非常簡(jiǎn)單,只需調(diào)用Module.parameters()并在每個(gè)參數(shù)上創(chuàng)建一個(gè)本地RRef。

def _parameter_rrefs(module):
    param_rrefs = []
    for param in module.parameters():
        param_rrefs.append(RRef(param))
    return param_rrefs

然后,由于RNNModel包含三個(gè)子模塊,因此我們需要調(diào)用_parameter_rrefs三次,并將其包裝到另一個(gè)輔助函數(shù)中。

class RNNModel(nn.Module):
    ...
    def parameter_rrefs(self):
        remote_params = []
        # get RRefs of embedding table
        remote_params.extend(_remote_method(_parameter_rrefs, self.emb_table_rref))
        # create RRefs for local parameters
        remote_params.extend(_parameter_rrefs(self.rnn))
        # get RRefs of decoder
        remote_params.extend(_remote_method(_parameter_rrefs, self.decoder_rref))
        return remote_params

現(xiàn)在,我們準(zhǔn)備實(shí)施訓(xùn)練循環(huán)。 初始化模型參數(shù)后,我們創(chuàng)建RNNModelDistributedOptimizer。 分布式優(yōu)化器將采用參數(shù)RRefs的列表,查找所有不同的所有者工作器,并在每個(gè)所有者工作器上創(chuàng)建給定的本地優(yōu)化器(即,在這種情況下,您也可以使用其他本地優(yōu)化器SGD) 使用給定的參數(shù)(即lr=0.05)。

在訓(xùn)練循環(huán)中,它首先創(chuàng)建一個(gè)分布式 autograd 上下文,這將幫助分布式 autograd 引擎查找漸變和涉及的 RPC 發(fā)送/接收功能。 分布式 autograd 引擎的設(shè)計(jì)詳細(xì)信息可以在其設(shè)計(jì)說明中找到。 然后,它像本地模型一樣開始前進(jìn),并運(yùn)行分布式后退。 對(duì)于后向分布,您只需要指定一個(gè)根列表,在這種情況下,就是損失Tensor。 分布式 autograd 引擎將自動(dòng)遍歷分布式圖形并正確編寫漸變。 接下來,它在分布式優(yōu)化器上運(yùn)行step函數(shù),該函數(shù)將與所有涉及的本地優(yōu)化器聯(lián)系以更新模型參數(shù)。 與本地訓(xùn)練相比,一個(gè)較小的差異是您不需要運(yùn)行zero_grad(),因?yàn)槊總€(gè) autograd 上下文都有專用的空間來存儲(chǔ)梯度,并且在每次迭代創(chuàng)建上下文時(shí),來自不同迭代的那些梯度不會(huì)累積到 同一組Tensors。

def run_trainer():
    batch = 5
    ntoken = 10
    ninp = 2


    nhid = 3
    nindices = 3
    nlayers = 4
    hidden = (
        torch.randn(nlayers, nindices, nhid),
        torch.randn(nlayers, nindices, nhid)
    )


    model = rnn.RNNModel('ps', ntoken, ninp, nhid, nlayers)


    # setup distributed optimizer
    opt = DistributedOptimizer(
        optim.SGD,
        model.parameter_rrefs(),
        lr=0.05,
    )


    criterion = torch.nn.CrossEntropyLoss()


    def get_next_batch():
        for _ in range(5):
            data = torch.LongTensor(batch, nindices) % ntoken
            target = torch.LongTensor(batch, ntoken) % nindices
            yield data, target


    # train for 10 iterations
    for epoch in range(10):
        # create distributed autograd context
        for data, target in get_next_batch():
            with dist_autograd.context():
                hidden[0].detach_()
                hidden[1].detach_()
                output, hidden = model(data, hidden)
                loss = criterion(output, target)
                # run distributed backward pass
                dist_autograd.backward([loss])
                # run distributed optimizer
                opt.step()
                # not necessary to zero grads as each iteration creates a different
                # distributed autograd context which hosts different grads
        print("Training epoch {}".format(epoch))

最后,讓我們添加一些粘合代碼以啟動(dòng)參數(shù)服務(wù)器和訓(xùn)練師流程。

def run_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    if rank == 1:
        rpc.init_rpc("trainer", rank=rank, world_size=world_size)
        _run_trainer()
    else:
        rpc.init_rpc("ps", rank=rank, world_size=world_size)
        # parameter server do nothing
        pass


    # block until all rpcs finish
    rpc.shutdown()


if __name__=="__main__":
    world_size = 2
    mp.spawn(run_worker, args=(world_size, ), nprocs=world_size, join=True)



以上內(nèi)容是否對(duì)您有幫助:
在線筆記
App下載
App下載

掃描二維碼

下載編程獅App

公眾號(hào)
微信公眾號(hào)

編程獅公眾號(hào)