PyTorch 使用 DistributedDataParallel 进行分布式训练教程

PyTorch 进行分布式训练,可以使用 nn.DataParallelnn.parallel.DistributedDataParallel 两种方式嘛,但 DistributedDataParallel 拥有天然的优势,能够保证显存分配均衡,而且支持多机多卡分布式训练。

并行处理机制

DistributedDataParallel 每个进程对应一个独立的训练过程,与其他并行进程之间只进行少量的信息交换。

  1. 各进程的模型进行初始化时执行一次 broadcast 保证各进程模型初始参数一致;
  2. 每个进程读取不同的数据进行前向传播,并计算得到 loss 和梯度;
  3. 将梯度汇总并求平均,broadcast 到其他进程;
  4. 各个进程执行完整的反向传播更新参数。

由于各个进程模型初始化相同、执行相同的梯度更新,所以各个进程的模型参数永远是相同的。

使用流程

代码修改

--local_rank 参数配置

在主训练代码中必须解析 --local_rank 参数,该参数用于指定当前设备上的进程编号,使用 torch/distributed/launch.py 将会自动传入该参数。

1
2
3
4
5
6
import argparse

parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", default=-1, type=int,
help="Local rank for distributed training.")
args = parser.parse_args()

初始化后端

PyTorch 支持 NCCL 作为后端,默认使用该后端进行通信即可。可使用 torch.distributed.is_nccl_available() 检测 NCCL 后端是否可用。

目前有三种初始化方法供选择:

TCP

需要在代码中手动配置 rank 0 的地址以及当前的 rank 编号。

1
2
3
4
5
import torch.distributed as dist

# Use address of one of the machines
dist.init_process_group(backend, init_method='tcp://10.1.1.20:12024',
rank=args.rank, world_size=4)

共享文件系统

请见 https://pytorch.org/docs/stable/distributed.html#shared-file-system-initialization

1
2
3
4
5
import torch.distributed as dist

# rank should always be specified
dist.init_process_group(backend, init_method='file:///mnt/nfs/sharedfile',
world_size=4, rank=args.rank)

环境变量

该方法最简便,因此也最常用。

代码中只需写入:

1
2
3
import torch.distributed as dist

dist.init_process_group(backend, init_method="env://")

需要配置四个环境变量:

  1. MASTER_ADDR:rank 0 机器的地址;
  2. MASTER_PORT:rank 0 机器的端口;
  3. WORLD_SIZE:机器总数;
  4. RANK:当前机器 rank 号。

如果使用 torch/distributed/launch.py,则可以直接使用该脚本的参数传入这些环境变量,无需单独配置。

DataLoader 配置

使用 DistributedSampler 对数据集进行划分,使得每个进程只处理原始数据的一个子集。

1
2
3
4
from torch.utils.data.distributed import DistributedSampler

sampler = DistributedSampler(dataset) if is_distributed else None
loader = DataLoader(dataset, batch_size=args.batch_size, shuffle=(sampler is None), sampler=sampler)

注:DDPDataLoaderbatch_size 只需设置为单卡使用的即可,因为每个进程保持一个完整的读取数据、前向传播、反向传播过程。

配置模型

使用 DistributedDataParallel 包装模型,它帮助我们为不同的 GPU 上得到的梯度信息进行 all reduce,保证各个模型使用相同的参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from torch.nn.parallel import DistributedDataParallel as DDP

# 设置当前使用的 device
torch.cuda.set_device(args.local_rank)
args.device = torch.device('cuda', args.local_rank)

def set_seed(seed):
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

set_seed(20200716) # 保证各个进程的模型初始化时参数保持一致
model = MyModel()
model.to(args.device)
model = DDP(model, device_ids=[args.local_rank], output_device=args.local_rank)

在主进程保存模型

1
2
3
4
5
6
7
8
if args.local_rank == 0:
# All processes should see same parameters as they all start from same
# random parameters and gradients are synchronized in backward passes.
# Therefore, saving it in one process is sufficient.
torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

# 也可以只在主进程打印 log
logging.info("...")

运行

使用 torch/distributed/launch.py 脚本执行 DDP 可以方便很多。具体用法请参见 launch.py

单节点多进程分布式训练例子

1
2
3
4
5
CUDA_VISIBLE_DEVICES=1,2,3 python -m torch.distributed.launch \
--nproc_per_node=3 \ # 所拥有的 GPU 数
./train.py \
--arg1 1 \
--arg2 2

注:如果想同一个节点运行多个分布式训练的,需要指定不同的 --master_port 通信端口,保证互不干扰。

多节点多进程分布式训练例子

节点 0(192.168.1.1:1234)执行:

1
2
3
4
5
6
7
8
9
CUDA_VISIBLE_DEVICES=1,2,3 python -m torch.distributed.launch \
--nproc_per_node=3 \ # 单节点所拥有的 GPU 数
--nnodes=2 \ # 总节点数
--node_rank=0 \ # 节点编号,0 表示主节点
--master_add="192.168.1.1" \ # 主节点地址
--master_port=1234 \ # 主节点端口
./train.py \
--arg1 1 \
--arg2 2

节点 1 执行:

1
2
3
4
5
6
7
8
9
CUDA_VISIBLE_DEVICES=0,1,2 python -m torch.distributed.launch \
--nproc_per_node=3 \ # 单节点所拥有的 GPU 数
--nnodes=2 \ # 总节点数
--node_rank=1 \ # 节点编号,1 表示第二个节点
--master_add="192.168.1.1" \ # 主节点地址
--master_port=1234 \ # 主节点端口
./train.py \
--arg1 1 \
--arg2 2

参考

https://www.cnblogs.com/yh-blog/p/12877922.html#autoid-2-2-0