石巖小學網站建設品牌推廣策劃方案案例
深度學習中的并行策略概述:2 Data Parallelism
數據并行(Data Parallelism)的核心在于將模型的數據處理過程并行化。具體來說,面對大規模數據批次時,將其拆分為較小的子批次,并在多個計算設備上同時進行處理。每個設備負責處理一個子批次,實現并行計算。處理完成后,將各個設備上的計算結果(梯度)匯總,以便對模型進行統一更新。由于其在深度學習中的普遍應用,數據并行成為了一種廣泛支持的并行計算策略,并在主流框架中得到了良好的實現。
以下代碼展示了如何在PyTorch中使用nn.DataParallel和DistributedDataParallel實現數據并行,以加速模型的訓練過程。
使用nn.DataParallel實現數據并行
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader# 假設(shè)我們有一個簡單的數(shù)據(jù)集類
class SimpleDataset(Dataset):
    """Map-style dataset that pairs each sample with its target.

    Args:
        data: Sized, indexable collection of samples.
        target: Sized, indexable collection of targets, aligned with ``data``
            position by position.

    Raises:
        ValueError: If ``data`` and ``target`` do not have the same length.
    """

    def __init__(self, data, target):
        # Fail at construction time: a length mismatch would otherwise only
        # surface as an IndexError (or silently ignored targets) mid-training.
        if len(data) != len(target):
            raise ValueError(
                f"data and target must have the same length, "
                f"got {len(data)} and {len(target)}"
            )
        self.data = data
        self.target = target

    def __len__(self):
        """Return the number of samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(sample, target)`` pair stored at position ``idx``."""
        return self.data[idx], self.target[idx]


# Assume we have a simple neural network model
class SimpleModel(nn.Module):
    """Single-layer binary classifier: one linear layer followed by a sigmoid.

    Args:
        input_dim: Number of input features per sample.
    """

    def __init__(self, input_dim):
        super().__init__()
        # A single output unit; the sigmoid in ``forward`` squashes it into
        # (0, 1) so it can be fed to ``nn.BCELoss``.
        self.fc = nn.Linear(input_dim, 1)

    def forward(self, x):
        """Return ``sigmoid(fc(x))``; the last dimension of the result is 1."""
        return torch.sigmoid(self.fc(x))


# Assume we have some data
# Synthetic binary-classification data: n_sample rows of n_dim features with
# random 0/1 labels (cast to float because nn.BCELoss expects float targets).
n_sample = 100
n_dim = 10
batch_size = 10
X = torch.randn(n_sample, n_dim)
Y = torch.randint(0, 2, (n_sample,)).float()
dataset = SimpleDataset(X, Y)
data_loader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Initialize the model.
# GPU ids we would like to use. Ids that do not exist on this machine are
# dropped so the script does not crash on hosts with fewer than three GPUs.
requested_device_ids = [0, 1, 2]
device_ids = [i for i in requested_device_ids if i < torch.cuda.device_count()]
# nn.DataParallel requires parameters and inputs to live on device_ids[0];
# with no GPU at all we fall back to plain single-device CPU training.
device = torch.device(f'cuda:{device_ids[0]}') if device_ids else torch.device('cpu')
model = SimpleModel(n_dim).to(device)
if device_ids:
    model = nn.DataParallel(model, device_ids=device_ids)

# Define the optimizer and the loss function.
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)
criterion = nn.BCELoss()

# Train the model.
for epoch in range(10):
    for batch_idx, (inputs, targets) in enumerate(data_loader):
        # Move the batch to the same device the model lives on.
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = model(inputs)
        # Targets are (batch,), outputs are (batch, 1): add the trailing axis
        # so the shapes match for BCELoss.
        loss = criterion(outputs, targets.unsqueeze(1))
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print(f'Epoch {epoch}, Batch {batch_idx}, Loss: {loss.item()}')
使用DistributedDataParallel實現數據并行
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP# 假設(shè)我們有一個簡單的數(shù)據(jù)集類
class SimpleDataset(Dataset):
    """Map-style dataset that pairs each sample with its target.

    Args:
        data: Sized, indexable collection of samples.
        target: Sized, indexable collection of targets, aligned with ``data``
            position by position.

    Raises:
        ValueError: If ``data`` and ``target`` do not have the same length.
    """

    def __init__(self, data, target):
        # Fail at construction time: a length mismatch would otherwise only
        # surface as an IndexError (or silently ignored targets) mid-training.
        if len(data) != len(target):
            raise ValueError(
                f"data and target must have the same length, "
                f"got {len(data)} and {len(target)}"
            )
        self.data = data
        self.target = target

    def __len__(self):
        """Return the number of samples."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return the ``(sample, target)`` pair stored at position ``idx``."""
        return self.data[idx], self.target[idx]


# Assume we have a simple neural network model
class SimpleModel(nn.Module):
    """Single-layer binary classifier: one linear layer followed by a sigmoid.

    Args:
        input_dim: Number of input features per sample.
    """

    def __init__(self, input_dim):
        super().__init__()
        # A single output unit; the sigmoid in ``forward`` squashes it into
        # (0, 1) so it can be fed to ``nn.BCELoss``.
        self.fc = nn.Linear(input_dim, 1)

    def forward(self, x):
        """Return ``sigmoid(fc(x))``; the last dimension of the result is 1."""
        return torch.sigmoid(self.fc(x))


# Initialize the process group
def init_process(rank, world_size, backend='nccl'):
    """Join the default ``torch.distributed`` process group.

    Args:
        rank: Index of the calling process within the group.
        world_size: Total number of processes in the group.
        backend: Communication backend name passed to
            ``dist.init_process_group`` (defaults to ``'nccl'``).
    """
    import os

    # The default env:// rendezvous reads MASTER_ADDR / MASTER_PORT, which
    # torch.multiprocessing.spawn does not set. Provide single-node defaults
    # but keep any values a launcher has already exported.
    os.environ.setdefault('MASTER_ADDR', 'localhost')
    os.environ.setdefault('MASTER_PORT', '29500')
    dist.init_process_group(backend, rank=rank, world_size=world_size)


# Training function
def train(rank, world_size):
    """Run the DistributedDataParallel training loop for one process.

    Intended as the target of ``torch.multiprocessing.spawn``, which supplies
    ``rank`` as the first positional argument.

    Args:
        rank: Index of this process; also used as the CUDA device index.
        world_size: Total number of participating processes.
    """
    init_process(rank, world_size)
    try:
        torch.cuda.set_device(rank)
        model = SimpleModel(10).to(rank)
        model = DDP(model, device_ids=[rank])

        # DistributedSampler only hands out index shards, so every rank must
        # build the *same* dataset; a fixed-seed generator guarantees that.
        data_rng = torch.Generator().manual_seed(0)
        features = torch.randn(100, 10, generator=data_rng)
        labels = torch.randint(0, 2, (100,), generator=data_rng).float()
        dataset = SimpleDataset(features, labels)

        sampler = torch.utils.data.distributed.DistributedSampler(
            dataset, num_replicas=world_size, rank=rank
        )
        data_loader = DataLoader(dataset, batch_size=10, sampler=sampler)
        optimizer = optim.SGD(model.parameters(), lr=0.01)
        criterion = nn.BCELoss()

        for epoch in range(10):
            # Without set_epoch the sampler reuses the same shuffle order
            # in every epoch.
            sampler.set_epoch(epoch)
            for inputs, targets in data_loader:
                inputs, targets = inputs.to(rank), targets.to(rank)
                optimizer.zero_grad()
                outputs = model(inputs)
                # Targets are (batch,), outputs are (batch, 1).
                loss = criterion(outputs, targets.unsqueeze(1))
                loss.backward()
                optimizer.step()
    finally:
        # Release the process group even if training raised.
        dist.destroy_process_group()


if __name__ == "__main__":
    # Launch one process per GPU, up to four; the NCCL backend cannot run
    # more ranks than there are CUDA devices.
    max_world_size = 4
    available_gpus = torch.cuda.device_count()
    if available_gpus == 0:
        raise SystemExit("DistributedDataParallel with the NCCL backend requires at least one CUDA GPU.")
    world_size = min(max_world_size, available_gpus)
    torch.multiprocessing.spawn(train, args=(world_size,), nprocs=world_size, join=True)