A detailed walkthrough of implementing model parallelism across physical machines with PyTorch RPC

Updated: 2023-05-20 09:09:49   Author: UnknownBody
This article gives a detailed walkthrough of how to use PyTorch RPC to run a model in parallel across physical machines. It should be a useful reference for study or work; interested readers are welcome to follow along.

Because of a business requirement, we were recently handed the task of implementing model parallelism and distributed training with PyTorch. We combed through a lot of material online and read the official PyTorch tutorials, but found no ready-made example to follow: most write-ups cover data parallelism, and there is little to be found on model parallelism.
Browsing the official PyTorch site, we did find one example that performs model parallelism.
Official tutorial: DISTRIBUTED PIPELINE PARALLELISM USING RPC
Official example code: Distributed Pipeline Parallel Example
Reading the code shows that it takes a ResNet-50 model as its example, splits the model into two parts, and assigns the two parts to different workers; however, the script achieves this on a single machine by spawning multiple processes to run the split model. For a detailed walkthrough of that code, search for "pytorch RPC distributed pipeline parallelism"; we will not repeat it here.
Running the code locally confirmed that it does not satisfy the requirement of running on multiple machines. What follows is how our thinking evolved.

1. First, we noticed that when the program is launched with python main.py there is no way to specify a rank, so when running across machines, how would each machine know whether it is worker1 or worker2? Our first suspicion was that we had to modify the workers and hard-code their IP addresses in the code, e.g. change line 191 of main.py.
Before:
model = DistResNet50(split_size, ["worker1", "worker2"])
After:
model = DistResNet50(split_size, ["worker1@xxx.xxx.xxx.xxx", "worker2@xxx.xxx.xxx.xxx"])
That promptly raised an error: a worker name in this form cannot be resolved, and specifying the address directly in the name is not supported, so this path was a dead end.
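Why this cannot work: in PyTorch RPC, "worker1" and "worker2" are purely logical names. Each process announces its own name when it calls rpc.init_rpc, and the only place a real network address enters the picture is the MASTER_ADDR/MASTER_PORT rendezvous that every rank connects to. The sketch below (address, port, rank and world_size are placeholder values) shows roughly how a name gets bound; there is no "name@ip" syntax for addressing a machine.

import os
import torch.distributed.rpc as rpc

os.environ['MASTER_ADDR'] = 'XXX.XXX.XXX.XXX'  # rendezvous address: the only place an IP belongs
os.environ['MASTER_PORT'] = '29500'

# This process registers *itself* as "worker1"; other ranks later address it
# by this logical name. Ranks discover each other through the rendezvous
# above, never through anything embedded in the name.
rpc.init_rpc("worker1", rank=1, world_size=3)
rpc.shutdown()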

2. That left us re-reading the code. Near the end, at line 251, we found
mp.spawn(run_worker, args=(world_size, num_split), nprocs=world_size, join=True)
and it is precisely the mp.spawn call in this line that raised our suspicion: this is multiprocessing, i.e. the program in essence still runs as several processes on one machine and cannot span machines, which does not meet our requirement.
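To make the concern concrete: mp.spawn(fn, args=..., nprocs=N) starts N child processes on the local machine and calls fn(rank, *args) in each of them, so "master", "worker1" and "worker2" in the original example are all children of one Python interpreter on one physical host. A stripped-down sketch of that behaviour (run_worker here is only a placeholder, not the example's real function):

import torch.multiprocessing as mp

def run_worker(rank, world_size, num_split):
    # in the upstream example this is where rpc.init_rpc is called with a
    # name derived from `rank`; here we only print to show the local fan-out
    print(f"local process rank={rank} of world_size={world_size}")

if __name__ == "__main__":
    world_size = 3
    # three processes, all on this one machine -- hence no cross-machine parallelism
    mp.spawn(run_worker, args=(world_size, 8), nprocs=world_size, join=True)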

3. Finally, we went back to PyTorch's RPC mechanism and wrote a simple test program in which two machines communicate: one machine issues the compute request and sends the raw data, the other receives the data and performs the computation. This test succeeded on two physical machines, which showed that getting machines to talk over RPC is not complicated. Combining this with the example code above, we decided to split the worker1 and worker2 code into separate scripts that are run separately, and to assign each worker its rank explicitly in the code; in principle that turns the original single-machine code into per-machine RPC programs.
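Our smoke test looked roughly like the following pair of scripts (a reconstruction rather than the exact code we ran; the names "caller"/"callee", the function remote_add and the port number are illustrative). Note that the callee script defines the same remote_add function: RPC serializes functions by qualified name, so the callee must be able to resolve it locally, which is also why the worker script further below repeats the ResNetShard2 class definition.

# caller.py -- run on machine A (rank 0), which also hosts the rendezvous
import os
import torch
import torch.distributed.rpc as rpc

os.environ['MASTER_ADDR'] = 'XXX.XXX.XXX.XXX'  # IP of machine A
os.environ['MASTER_PORT'] = '7856'

def remote_add(x, y):
    return x + y

if __name__ == "__main__":
    rpc.init_rpc("caller", rank=0, world_size=2)
    # ship two tensors to "callee"; the addition itself runs on machine B
    result = rpc.rpc_sync("callee", remote_add,
                          args=(torch.ones(2, 2), torch.ones(2, 2)))
    print(result)
    rpc.shutdown()

# callee.py -- run on machine B (rank 1)
import os
import torch
import torch.distributed.rpc as rpc

os.environ['MASTER_ADDR'] = 'XXX.XXX.XXX.XXX'  # same address/port as in caller.py
os.environ['MASTER_PORT'] = '7856'

def remote_add(x, y):  # must exist here so the pickled reference resolves
    return x + y

if __name__ == "__main__":
    rpc.init_rpc("callee", rank=1, world_size=2)
    rpc.shutdown()  # blocks until the caller has finished and shut down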
The above was our thought process; without further ado, here is the code.
Experimental environment: two machines, both CPU-only; the conda environments on the two were also kept identical.
Code for the master machine:

# https://github.com/pytorch/examples/blob/main/distributed/rpc/pipeline/main.py
import os
import threading
import time
import torch
import torch.nn as nn
import torch.distributed.autograd as dist_autograd
import torch.distributed.rpc as rpc
import torch.optim as optim
from torch.distributed.optim import DistributedOptimizer
from torch.distributed.rpc import RRef
from torchvision.models.resnet import Bottleneck
os.environ['MASTER_ADDR'] = 'XXX.XXX.XXX.XXX' # master IP address
os.environ['MASTER_PORT'] = '7856' # master port
#########################################################
#           Define Model Parallel ResNet50              #
#########################################################
# In order to split the ResNet50 and place it on two different workers, we
# implement it in two model shards. The ResNetBase class defines common
# attributes and methods shared by two shards. ResNetShard1 and ResNetShard2
# contain two partitions of the model layers respectively.
num_classes = 1000
def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)
class ResNetBase(nn.Module):
    def __init__(self, block, inplanes, num_classes=1000,
                 groups=1, width_per_group=64, norm_layer=None):
        super(ResNetBase, self).__init__()
        self._lock = threading.Lock()
        self._block = block
        self._norm_layer = nn.BatchNorm2d
        self.inplanes = inplanes
        self.dilation = 1
        self.groups = groups
        self.base_width = width_per_group
    def _make_layer(self, planes, blocks, stride=1):
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if stride != 1 or self.inplanes != planes * self._block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * self._block.expansion, stride),
                norm_layer(planes * self._block.expansion),
            )
        layers = []
        layers.append(self._block(self.inplanes, planes, stride, downsample, self.groups,
                                  self.base_width, previous_dilation, norm_layer))
        self.inplanes = planes * self._block.expansion
        for _ in range(1, blocks):
            layers.append(self._block(self.inplanes, planes, groups=self.groups,
                                      base_width=self.base_width, dilation=self.dilation,
                                      norm_layer=norm_layer))
        return nn.Sequential(*layers)
    def parameter_rrefs(self):
        r"""
        Create one RRef for each parameter in the given local module, and return a
        list of RRefs.
        """
        return [RRef(p) for p in self.parameters()]
class ResNetShard1(ResNetBase):
    """
    The first part of ResNet.
    """
    def __init__(self, device, *args, **kwargs):
        super(ResNetShard1, self).__init__(
            Bottleneck, 64, num_classes=num_classes, *args, **kwargs)
        self.device = device
        self.seq = nn.Sequential(
            nn.Conv2d(3, self.inplanes, kernel_size=7, stride=2, padding=3, bias=False),
            self._norm_layer(self.inplanes),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1),
            self._make_layer(64, 3),
            self._make_layer(128, 4, stride=2)
        ).to(self.device)
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.ones_(m.weight)
                nn.init.zeros_(m.bias)
    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device)
        with self._lock:
            out =  self.seq(x)
        return out.cpu()
class ResNetShard2(ResNetBase):
    """
    The second part of ResNet.
    """
    def __init__(self, device, *args, **kwargs):
        super(ResNetShard2, self).__init__(
            Bottleneck, 512, num_classes=num_classes, *args, **kwargs)
        self.device = device
        self.seq = nn.Sequential(
            self._make_layer(256, 6, stride=2),
            self._make_layer(512, 3, stride=2),
            nn.AdaptiveAvgPool2d((1, 1)),
        ).to(self.device)
        self.fc =  nn.Linear(512 * self._block.expansion, num_classes).to(self.device)
    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device)
        with self._lock:
            out = self.fc(torch.flatten(self.seq(x), 1))
        return out.cpu()
class DistResNet50(nn.Module):
    """
    Assemble two parts as an nn.Module and define pipelining logic
    """
    def __init__(self, split_size, workers, *args, **kwargs):
        super(DistResNet50, self).__init__()
        self.split_size = split_size
        # Put the first part of the ResNet50 on workers[0]
        self.p1_rref = rpc.remote(
            workers[0],
            ResNetShard1,
            args = ("cuda:0",) + args,
            kwargs = kwargs
        )
        # Put the second part of the ResNet50 on workers[1]
        self.p2_rref = rpc.remote(
            workers[1],
            ResNetShard2,
            args = ("cpu",) + args,
            kwargs = kwargs
        )
    def forward(self, xs):
        # Split the input batch xs into micro-batches, and collect async RPC
        # futures into a list
        out_futures = []
        for x in iter(xs.split(self.split_size, dim=0)):
            x_rref = RRef(x)
            y_rref = self.p1_rref.remote().forward(x_rref)
            print(y_rref)
            z_fut = self.p2_rref.rpc_async().forward(y_rref)
            print(z_fut)
            out_futures.append(z_fut)
        # collect and cat all output tensors into one tensor.
        return torch.cat(torch.futures.wait_all(out_futures))
    def parameter_rrefs(self):
        remote_params = []
        remote_params.extend(self.p1_rref.remote().parameter_rrefs().to_here())
        remote_params.extend(self.p2_rref.remote().parameter_rrefs().to_here())
        return remote_params
#########################################################
#                   Run RPC Processes                   #
#########################################################
num_batches = 3
batch_size = 8
image_w = 128
image_h = 128
if __name__=="__main__":
    options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=256, rpc_timeout=300)
    # initialize the RPC connection for the master node
    rpc.init_rpc("master", rank=0, world_size=2, rpc_backend_options=options)
    for num_split in [1,2]:
        tik = time.time()
        model = DistResNet50(num_split, ["master", "worker"])
        loss_fn = nn.MSELoss()
        opt = DistributedOptimizer(
            optim.SGD,
            model.parameter_rrefs(),
            lr=0.05,
        )
        one_hot_indices = torch.LongTensor(batch_size) \
            .random_(0, num_classes) \
            .view(batch_size, 1)
        for i in range(num_batches):
            print(f"Processing batch {i}")
            # generate random inputs and labels
            inputs = torch.randn(batch_size, 3, image_w, image_h)
            labels = torch.zeros(batch_size, num_classes) \
                .scatter_(1, one_hot_indices, 1)
            with dist_autograd.context() as context_id:
                outputs = model(inputs)
                dist_autograd.backward(context_id, [loss_fn(outputs, labels)])
                opt.step(context_id)
        tok = time.time()
        print(f"number of splits = {num_split}, execution time = {tok - tik}")
    # shut down the RPC connection
    rpc.shutdown()

Code for the worker machine:

# https://github.com/pytorch/examples/blob/main/distributed/rpc/pipeline/main.py
import os
import threading
import time
from functools import wraps
import torch
import torch.nn as nn
import torch.distributed.rpc as rpc
from torch.distributed.rpc import RRef
from torchvision.models.resnet import Bottleneck
os.environ['MASTER_ADDR'] = 'XXX.XXX.XXX.XXX' # master IP address (same as in master.py)
os.environ['MASTER_PORT'] = '7856' # master port (same as in master.py)
#########################################################
#           Define Model Parallel ResNet50              #
#########################################################
# In order to split the ResNet50 and place it on two different workers, we
# implement it in two model shards. The ResNetBase class defines common
# attributes and methods shared by two shards. ResNetShard1 and ResNetShard2
# contain two partitions of the model layers respectively.
num_classes = 1000
def conv1x1(in_planes, out_planes, stride=1):
    """1x1 convolution"""
    return nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)
class ResNetBase(nn.Module):
    def __init__(self, block, inplanes, num_classes=1000,
                 groups=1, width_per_group=64, norm_layer=None):
        super(ResNetBase, self).__init__()
        self._lock = threading.Lock()
        self._block = block
        self._norm_layer = nn.BatchNorm2d
        self.inplanes = inplanes
        self.dilation = 1
        self.groups = groups
        self.base_width = width_per_group
    def _make_layer(self, planes, blocks, stride=1):
        norm_layer = self._norm_layer
        downsample = None
        previous_dilation = self.dilation
        if stride != 1 or self.inplanes != planes * self._block.expansion:
            downsample = nn.Sequential(
                conv1x1(self.inplanes, planes * self._block.expansion, stride),
                norm_layer(planes * self._block.expansion),
            )
        layers = []
        layers.append(self._block(self.inplanes, planes, stride, downsample, self.groups,
                                  self.base_width, previous_dilation, norm_layer))
        self.inplanes = planes * self._block.expansion
        for _ in range(1, blocks):
            layers.append(self._block(self.inplanes, planes, groups=self.groups,
                                      base_width=self.base_width, dilation=self.dilation,
                                      norm_layer=norm_layer))
        return nn.Sequential(*layers)
    def parameter_rrefs(self):
        r"""
        Create one RRef for each parameter in the given local module, and return a
        list of RRefs.
        """
        return [RRef(p) for p in self.parameters()]
class ResNetShard2(ResNetBase):
    """
    The second part of ResNet.
    """
    def __init__(self, device, *args, **kwargs):
        super(ResNetShard2, self).__init__(
            Bottleneck, 512, num_classes=num_classes, *args, **kwargs)
        self.device = device
        self.seq = nn.Sequential(
            self._make_layer(256, 6, stride=2),
            self._make_layer(512, 3, stride=2),
            nn.AdaptiveAvgPool2d((1, 1)),
        ).to(self.device)
        self.fc =  nn.Linear(512 * self._block.expansion, num_classes).to(self.device)
    def forward(self, x_rref):
        x = x_rref.to_here().to(self.device)
        print(x)
        with self._lock:
            out = self.fc(torch.flatten(self.seq(x), 1))
        return out.cpu()
#########################################################
#                   Run RPC Processes                   #
#########################################################
if __name__=="__main__":
    options = rpc.TensorPipeRpcBackendOptions(num_worker_threads=256, rpc_timeout=300)
    # initialize the RPC connection for the worker node
    rpc.init_rpc("worker", rank=1, world_size=2, rpc_backend_options=options)
    # wait for calls from the master node; shutdown blocks until the master finishes
    rpc.shutdown()

MASTER_ADDR and MASTER_PORT must be set to the same values in both scripts. Run python master.py on the master machine and python worker.py on the worker machine, and the ResNet-50 model then runs model-parallel across the two physical machines.
Notes

  • Make sure the physical machines can ping each other, and disable their firewalls
  • Both machines should preferably run Linux; in our experiments PyTorch's distributed features did not run on Windows
  • The Python environments on the two machines must be kept identical (a small version-check sketch follows this list)
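As a quick way to verify the last point, something like the following can be run on both machines and the printed versions compared (a trivial sketch, nothing project-specific):

# env_check.py -- run on both machines; the outputs should match
import sys
import torch
import torchvision

print("python      :", sys.version.split()[0])
print("torch       :", torch.__version__)
print("torchvision :", torchvision.__version__)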

This concludes the article on implementing model parallelism across physical machines with PyTorch RPC.
