多GPU训练

多GPU训练的几种方式

多GPU训练的几种方式

数据并行

将同一个模型分别放到多个GPU上,每个GPU都使用一个小批量样本。

数据并行

一般来说,\(k\)个GPU并行训练过程如下:

  • 在任何一次训练迭代中,给定的随机的小批量样本都将被分成\(k\)个部分,并均匀地分配到GPU上
  • 每个GPU根据分配给它的小批量子集,计算模型参数的损失和梯度
  • \(k\)个GPU中的局部梯度聚合(因为不同GPU上的数据无法进行运算,需要先聚合到同一个设备上),以获得当前小批量的随机梯度
  • 聚合梯度被重新分发到每个GPU中
  • 每个GPU使用这个小批量随机梯度,来更新它所维护的完整的模型参数集

当在\(k\)个GPU上训练时,需要扩大小批量的大小为\(k\)的倍数,这样每个GPU都有相同的工作量,就像只在单个GPU上训练一样。

相比于单卡训练,多卡训练可以相应的提高学习率:

线性缩放法则(Linear Scaling Rule):当批量大小增加时,学习率也应该线性增加。

当批量大小增加时,为了保持相同的收敛速度和稳定性,学习率应该线性增加。这是因为较大的批量提供了更准确的梯度估计,从而允许更大的更新步长。

线性缩放法则的目的是在批量大小增加的情况下,保持训练过程中的动态相似性。这意味着,我们希望在较大的批量下以类似的速度和方式收敛,就像在较小的批量下一样。为了实现这一点,学习率需要以批量大小成比例增加。

线性缩放法则主要适用于批量大小在一定程度内的增加。当批量大小非常大时,可能需要考虑其他缩放策略或调整其他超参数。

需要注意的是,由于在同一次epoch中不同的GPU上的模型的小批量数据不同,而BatchNormalization在推理中使用的均值和方差是在训练时对每个批量的数据采用移动平均法得出来的,那么这就会造成不同GPU上的BatchNorm层的用于推理的均值和方法不同。对此有几种处理方法:

  • 同步BatchNormalization:PyTorch提供了torch.nn.SyncBatchNorm,这是一个特殊的BN层,可以在多个GPU之间同步均值和方差:

    1
    2
    3
    4
    5
    6
    7
    8
    import torch
    import torch.nn as nn
    import torch.distributed as dist

    # 假设已经初始化了分布式环境
    model = MyModel()
    model = nn.SyncBatchNorm.convert_sync_batchnorm(model)
    model = nn.parallel.DistributedDataParallel(model)

    需要注意的是,SyncBatchNorm可能会引入额外的通信开销,影响训练速度

  • 如果每块GPU上的batch_size比较大,那么每个GPU上的BN层计算的均值和方差可以认为是对整体数据分布的一个不错的估计。在这种情况下,使用第一块GPU的BN统计数据来代替整体是一个可行的简化方案

数据并行

DataParallel

torch.nn.DataParallel是PyTorch提供的一个简单易用的多GPU并行训练工具。它将数据分批分配到多个GPU上,实现了模型的并行计算。

DataParallel主要适用于单机多卡的情况。它可以在同一台机器的多个GPU之间分配数据和工作负载,实现模型的并行训练。当我们在模型上调用forward方法时,DataParallel会自动将输入数据分割成多个小批次,并将它们发送到不同的GPU上进行计算。计算完成后,它会收集并合并每个GPU的结果(如损失和梯度),以更新模型的参数。

初始化模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader


model = nn.Sequential(
nn.Linear(1, 10),
nn.Sigmoid(),
nn.Linear(10, 1)
)

device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
model.to(device)

optimizer = optim.Adam(model.parameters())
loss_function = nn.CrossEntropyLoss()

# 使用torch.cuda.device_count()判定本机中GPU的个数
if torch.cuda.device_count() > 1:
model = nn.DataParallel(model)

需要注意的是,我们需要在使用model = nn.DataParallel(model)之前完成整个模型的构建和初始化,包括模型参数的初始化,防止出现一些错误比如model的一部分参数在第一个GPU中,另一部分参数在第二个GPU中。

我们在构建模型时可以将模型全部放于第一个GPUcuda:0上,然后使用nn.DataParallel()会自动将模型分到多个GPU上。

模型训练

在训练循环中,我们只需要像往常一样进行前向传播、计算损失、反向传播和优化器步骤。DataParallel会自动处理数据的分割和结果的合并。

我们应该将样本数据放于与model初始设备相同的设备上,前向传播时,PyTorch会自动将这些数据分配到各个GPU上

1
2
3
4
5
6
7
8
for epoch in range(num_epochs):
for X, y in train_loader:
X, y = X.to(device), y.to(device)
optimizer.zero_grad()
y_hat = model(X)
loss = loss_function(y_hat, y)
loss.backward()
optimizer.step()

使用多GPU训练无法实现\(1+1=2\)的效果,因为GPU之间的通信有延时,如果GPU之间进行过多的数据传输,反而可能会降低性能

实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
from d2l import torch as d2l
import time

class Bottleneck(nn.Module):
def __init__(self, in_channels, out_channels, resnet = True):
super().__init__()
self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False)
self.bn1 = nn.BatchNorm2d(out_channels)
self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False)
self.bn2 = nn.BatchNorm2d(out_channels)
self.conv3 = nn.Conv2d(out_channels, out_channels, kernel_size=1, bias=False)
self.bn3 = nn.BatchNorm2d(out_channels)
self.relu = nn.ReLU(inplace=True)
self.identity = nn.Conv2d(in_channels, out_channels, kernel_size=1)
self.resnet = resnet

def forward(self, x):
identity = x
out = self.relu(self.bn1(self.conv1(x)))
out = self.relu(self.bn2(self.conv2(out)))
out = self.bn3(self.conv3(out))

if self.resnet:
identity = self.identity(x)
out += identity
return self.relu(out)

class Net(nn.Module):
def __init__(self, layers, num_classes=10, resnet=True):
super().__init__()

self.conv1 = nn.Conv2d(3, 64, kernel_size=7, stride=1, padding=3, bias=False)
self.bn1 = nn.BatchNorm2d(64)
self.relu = nn.ReLU(inplace=True)
self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
self.layers1 = self._make_layers(layers[0], 64, 128, resnet)
self.layers2 = self._make_layers(layers[1], 128, 256, resnet)
self.layers3 = self._make_layers(layers[2], 256, 512, resnet)
self.layers4 = self._make_layers(layers[3], 512, 512, resnet)
self.flatten = nn.Flatten()
self.avgpool = nn.AdaptiveAvgPool2d(output_size=(1, 1))
self.fc = nn.Linear(512, num_classes)




def _make_layers(self, blocks, in_channels, out_channels, resnet):
# blocks:该层中残差块的数量
layers = []
for _ in range(blocks):
layers.append(Bottleneck(in_channels, out_channels, resnet))
in_channels = out_channels
return nn.Sequential(*layers)



def forward(self, x):
x = self.relu(self.bn1(self.conv1(x)))
x = self.maxpool(x)
x = self.layers1(x)
# x = self.maxpool(x)
x = self.layers2(x)
x = self.maxpool(x)
x = self.layers3(x)
# x = self.maxpool(x)
x = self.layers4(x)
x = self.maxpool(x)

x = self.avgpool(x)
x = self.flatten(x)
x = self.fc(x)

return x


def train(data_loader, model, device, optimizer, loss_function):
model.train()
start_time = time.time()
num = len(data_loader.dataset)
L = 0
for x, y in data_loader:
x, y = x.to(device), y.to(device)
y_hat = model(x)
loss = loss_function(y_hat, y)
L += loss.item() * len(y)
optimizer.zero_grad()
loss.backward()
optimizer.step()
return L / num, num / (time.time() - start_time)

def test(data_loader, model, device):
num = len(data_loader.dataset)
right_num = 0
model.eval()
with torch.no_grad():
for x, y in data_loader:
x, y = x.to(device), y.to(device)
y_hat = model(x).argmax(dim=1)
right_num += (y_hat == y).sum().item()
return right_num / num

transform_train = transforms.Compose([
transforms.RandomHorizontalFlip(),
transforms.RandomVerticalFlip(),
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])

])
transform_test = transforms.Compose([
transforms.ToTensor(),
transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])
train_data = datasets.CIFAR10(root='./data', transform=transform_train, train=True, download=False)
test_data = datasets.CIFAR10(root='./data', transform=transform_test, train=False, download=False)
train_loader = DataLoader(train_data, shuffle=True, batch_size=256, num_workers=4)
test_loader = DataLoader(test_data, shuffle=False, batch_size=256, num_workers=4)

model = Net([1, 2, 2, 2], num_classes=10, resnet=True)
device = torch.device('cuda:0')
def init_weight(m):
if isinstance(m, nn.Conv2d) or isinstance(m, nn.Linear):
nn.init.kaiming_normal_(m.weight)
model.apply(init_weight)
model.to(device)

optimizer = optim.Adam(model.parameters(), lr=0.001)
loss_function = nn.CrossEntropyLoss()

model = nn.SyncBatchNorm.convert_sync_batchnorm(model)
if torch.cuda.device_count() > 1:
model = nn.DataParallel(model)

num_epochs = 10
start_time = time.time()
for epoch in range(num_epochs):
loss, per_time = train(train_loader, model, device, optimizer, loss_function)
print(f'Epoch {epoch + 1}, Loss {loss:.5f}, {per_time:.2f} examples/sec, Test Acc {test(test_loader, model, device)*100:.2f}%')
print(f'用时:{time.time() - start_time}秒')

多GPU训练
https://blog.shinebook.net/2025/03/11/人工智能/pytorch/多GPU训练/
作者
X
发布于
2025年3月11日
许可协议