快速开始-数据并行
数据并行(data parallelism)是大规模深度学习训练中常用的并行模式,它会在每个进程(设备)中维护完整的模型和参数, 但在每个进程上处理不同的数据。因此,数据并行非常适合单卡已经能够放得下完整的模型和参数,但希望通过并行来增大 全局数据(global batch)大小来提升训练的吞吐量。
本节将采用自定义卷积网络和 Paddle 内置的 CIFAR-10 数据集来介绍如何使用 paddle.distributed (paddle.distributed.fleet) 进行数据并行训练。
1.1 版本要求
在编写分布式训练程序之前,用户需要确保已经安装 GPU 版的 PaddlePaddle 2.3.0 及以上版本。
1.2 具体步骤
与单机单卡的普通模型训练相比,数据并行训练只需要按照如下 5 个步骤对代码进行简单调整即可:
导入分布式训练依赖包
初始化 Fleet 环境
构建分布式训练使用的网络模型
构建分布式训练使用的优化器
构建分布式训练使用的数据加载器
下面将逐一进行讲解。
1.2.1 导入分布式训练依赖包
导入飞桨分布式训练专用包 Fleet。
# 导入分布式专用 Fleet API
from paddle.distributed import fleet
# 导入分布式训练数据所需 API
from paddle.io import DataLoader, DistributedBatchSampler
# 设置 GPU 环境
paddle.set_device('gpu')
          1.2.2 初始化 Fleet 环境
分布式初始化需要:
设置 is_collective 为 True,表示分布式训练采用 Collective 模式。
[可选] 设置分布式策略 cn_api_distributed_fleet_DistributedStrategy ,跳过将使用缺省配置。
# 选择不设置分布式策略
fleet.init(is_collective=True)
# 选择设置分布式策略
strategy = fleet.DistributedStrategy()
fleet.init(is_collective=True, strategy=strategy)
          1.2.3 构建分布式训练使用的网络模型
只需要使用 fleet.distributed_model 对原始串行网络模型进行封装。
# 等号右边 model 为原始串行网络模型
model = fleet.distributed_model(model)
          1.2.4 构建分布式训练使用的优化器
只需要使用 fleet.distributed_optimizer 对原始串行优化器进行封装。
# 等号右边 optimizer 为原始串行网络模型
optimizer = fleet.distributed_optimizer(optimizer)
          1.2.5 构建分布式训练使用的数据加载器
由于分布式训练过程中每个进程可能读取不同数据,所以需要对数据集进行合理拆分后再进行加载。这里只需要在构建 cn_api_fluid_io_DataLoader 时, 设置分布式数据采样器 cn_api_io_cn_DistributedBatchSampler 即可。
# 构建分布式数据采样器
# 注意:需要保证 batch 中每个样本数据 shape 相同,若原尺寸不一,需进行预处理
train_sampler = DistributedBatchSampler(train_dataset, 32, shuffle=True, drop_last=True)
val_sampler = DistributedBatchSampler(val_dataset, 32)
# 构建分布式数据加载器
train_loader = DataLoader(train_dataset, batch_sampler=train_sampler, num_workers=2)
valid_loader = DataLoader(val_dataset, batch_sampler=val_sampler, num_workers=2)
          1.3 完整示例代码
# -*- coding: UTF-8 -*-
import numpy as np
import matplotlib.pyplot as plt
import paddle
import paddle.nn.functional as F
from paddle.vision.transforms import ToTensor
# 一、导入分布式专用 Fleet API
from paddle.distributed import fleet
# 构建分布式数据加载器所需 API
from paddle.io import DataLoader, DistributedBatchSampler
# 设置 GPU 环境
paddle.set_device('gpu')
class MyNet(paddle.nn.Layer):
    def __init__(self, num_classes=1):
        super().__init__()
        self.conv1 = paddle.nn.Conv2D(in_channels=3, out_channels=32, kernel_size=(3, 3))
        self.pool1 = paddle.nn.MaxPool2D(kernel_size=2, stride=2)
        self.conv2 = paddle.nn.Conv2D(in_channels=32, out_channels=64, kernel_size=(3,3))
        self.pool2 = paddle.nn.MaxPool2D(kernel_size=2, stride=2)
        self.conv3 = paddle.nn.Conv2D(in_channels=64, out_channels=64, kernel_size=(3,3))
        self.flatten = paddle.nn.Flatten()
        self.linear1 = paddle.nn.Linear(in_features=1024, out_features=64)
        self.linear2 = paddle.nn.Linear(in_features=64, out_features=num_classes)
    def forward(self, x):
        x = self.conv1(x)
        x = F.relu(x)
        x = self.pool1(x)
        x = self.conv2(x)
        x = F.relu(x)
        x = self.pool2(x)
        x = self.conv3(x)
        x = F.relu(x)
        x = self.flatten(x)
        x = self.linear1(x)
        x = F.relu(x)
        x = self.linear2(x)
        return x
epoch_num = 10
batch_size = 32
learning_rate = 0.001
val_acc_history = []
val_loss_history = []
def train():
    # 二、初始化 Fleet 环境
    fleet.init(is_collective=True)
    model = MyNet(num_classes=10)
    # 三、构建分布式训练使用的网络模型
    model = fleet.distributed_model(model)
    opt = paddle.optimizer.Adam(learning_rate=learning_rate,parameters=model.parameters())
    # 四、构建分布式训练使用的优化器
    opt = fleet.distributed_optimizer(opt)
    transform = ToTensor()
    cifar10_train = paddle.vision.datasets.Cifar10(mode='train',
                                           transform=transform)
    cifar10_test = paddle.vision.datasets.Cifar10(mode='test',
                                          transform=transform)
    # 五、构建分布式训练使用的数据集
    train_sampler = DistributedBatchSampler(cifar10_train, 32, shuffle=True, drop_last=True)
    train_loader = DataLoader(cifar10_train, batch_sampler=train_sampler, num_workers=2)
    valid_sampler = DistributedBatchSampler(cifar10_test, 32, drop_last=True)
    valid_loader = DataLoader(cifar10_test, batch_sampler=valid_sampler, num_workers=2)
    for epoch in range(epoch_num):
        model.train()
        for batch_id, data in enumerate(train_loader()):
            x_data = data[0]
            y_data = paddle.to_tensor(data[1])
            y_data = paddle.unsqueeze(y_data, 1)
            logits = model(x_data)
            loss = F.cross_entropy(logits, y_data)
            if batch_id % 1000 == 0:
                print("epoch: {}, batch_id: {}, loss is: {}".format(epoch, batch_id, loss.numpy()))
            loss.backward()
            opt.step()
            opt.clear_grad()
        model.eval()
        accuracies = []
        losses = []
        for batch_id, data in enumerate(valid_loader()):
            x_data = data[0]
            y_data = paddle.to_tensor(data[1])
            y_data = paddle.unsqueeze(y_data, 1)
            logits = model(x_data)
            loss = F.cross_entropy(logits, y_data)
            acc = paddle.metric.accuracy(logits, y_data)
            accuracies.append(acc.numpy())
            losses.append(loss.numpy())
        avg_acc, avg_loss = np.mean(accuracies), np.mean(losses)
        print("[validation] accuracy/loss: {}/{}".format(avg_acc, avg_loss))
        val_acc_history.append(avg_acc)
        val_loss_history.append(avg_loss)
if __name__ == "__main__":
    train()
         1.4 分布式启动
准备好分布式训练脚本后,就可以通过 paddle.distributed.launch 在集群上启动分布式训练:
- 
          - 单机多卡训练
- 
            假设只使用集群的一个节点,节点上可使用的 GPU 卡数为 4,那么只需要在节点终端运行如下命令: python -m paddle.distributed.launch --gpus=0,1,2,3 train_with_fleet.py 
 
- 
          - 多机多卡训练
- 
            假设集群包含两个节点,每个节点上可使用的 GPU 卡数为 4,IP 地址分别为 192.168.1.2 和 192.168.1.3,那么需要在两个节点的终端上分别运行如下命令: 在 192.168.1.2 节点运行: python -m paddle.distributed.launch \ --gpus=0,1,2,3 \ --ips=192.168.1.2,192.168.1.3 \ train_with_fleet.py 在 192.168.1.3 节点运行相同命令: python -m paddle.distributed.launch \ --gpus=0,1,2,3 \ --ips=192.168.1.2,192.168.1.3 \ train_with_fleet.py 
 
相关启动问题,可参考 cn_api_distributed_launch 。