從 PyTorch DDP 到 Accelerate 再到 Trainer,輕鬆掌握分散式訓練

釋出於 2022 年 10 月 21 日
在 GitHub 上更新

概述

本教程假設您對 PyTorch 和如何訓練簡單模型有基本瞭解。它將透過分散式資料並行(DDP)過程,透過三個不同抽象級別的示例來展示在多個 GPU 上進行訓練

  • 透過 pytorch.distributed 模組實現的原生 PyTorch DDP
  • 利用 🤗 Accelerate 對 pytorch.distributed 進行的輕量級封裝,該封裝還確保程式碼可以在單個 GPU 和 TPU 上執行,無需任何程式碼更改,且對原始程式碼的修改最少
  • 利用 🤗 Transformer 的高階 Trainer API,它抽象了所有樣板程式碼,並支援各種裝置和分散式場景

什麼是“分散式”訓練,它為何重要?

以下是一些非常基本的 PyTorch 訓練程式碼,它根據 官方 MNIST 示例 設定並訓練一個基於 MNIST 的模型

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms

class BasicNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.conv1 = nn.Conv2d(1, 32, 3, 1)
        self.conv2 = nn.Conv2d(32, 64, 3, 1)
        self.dropout1 = nn.Dropout(0.25)
        self.dropout2 = nn.Dropout(0.5)
        self.fc1 = nn.Linear(9216, 128)
        self.fc2 = nn.Linear(128, 10)
        self.act = F.relu

    def forward(self, x):
        x = self.act(self.conv1(x))
        x = self.act(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = self.dropout1(x)
        x = torch.flatten(x, 1)
        x = self.act(self.fc1(x))
        x = self.dropout2(x)
        x = self.fc2(x)
        output = F.log_softmax(x, dim=1)
        return output

我們定義訓練裝置 (cuda)

device = "cuda"

構建一些 PyTorch DataLoaders

transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307), (0.3081))
])

train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
test_dset = datasets.MNIST('data', train=False, transform=transform)

train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

將模型移動到 CUDA 裝置

model = BasicNet().to(device)

構建一個 PyTorch 最佳化器

optimizer = optim.AdamW(model.parameters(), lr=1e-3)

最後建立一個簡單的訓練和評估迴圈,對資料集執行一次完整的迭代並計算測試準確率

model.train()
for batch_idx, (data, target) in enumerate(train_loader):
    data, target = data.to(device), target.to(device)
    output = model(data)
    loss = F.nll_loss(output, target)
    loss.backward()
    optimizer.step()
    optimizer.zero_grad()

model.eval()
correct = 0
with torch.no_grad():
    for data, target in test_loader:
        data, target = data.to(device), target.to(device)
        output = model(data)
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

通常,從這裡開始,可以將所有這些程式碼放入 Python 指令碼或在 Jupyter Notebook 中執行。

然而,如果這些資源可用,您如何讓這個指令碼在兩個 GPU 或多臺機器上執行,這可以透過_分散式_訓練來提高訓練速度?簡單地執行 python myscript.py 只會使用單個 GPU 執行指令碼。這就是 torch.distributed 發揮作用的地方。

PyTorch 分散式資料並行

顧名思義,torch.distributed 旨在在_分散式_設定中工作。這可以包括多節點(您有多臺機器,每臺機器都帶有一個 GPU),或多 GPU(單個系統有多個 GPU),或兩者的某種組合。

要將我們的上述程式碼轉換為在分散式設定中工作,必須首先定義一些設定配置,詳細資訊請參見 DDP 入門教程

首先,必須宣告一個 setup 和一個 cleanup 函式。這將開啟一個處理組,所有計算程序都可以透過該組進行通訊

注意:對於本教程的這一部分,應假定這些函式是在 Python 指令碼檔案中傳送的。稍後將討論使用 Accelerate 的啟動器,它消除了這種必要性。

import os
import torch.distributed as dist

def setup(rank, world_size):
    "Sets up the process group and configuration for PyTorch Distributed Data Parallelism"
    os.environ["MASTER_ADDR"] = 'localhost'
    os.environ["MASTER_PORT"] = "12355"

    # Initialize the process group
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

def cleanup():
    "Cleans up the distributed environment"
    dist.destroy_process_group()

最後一個問題是_如何將我的資料和模型傳送到另一個 GPU?_

這就是 DistributedDataParallel 模組發揮作用的地方。它會將您的模型複製到每個 GPU 上,當呼叫 loss.backward() 時,將執行反向傳播,並且所有這些模型副本上的結果梯度將被平均/歸約。這確保了在最佳化器步驟之後每個裝置都具有相同的權重。

下面是我們訓練設定的示例,它被重構為一個具有此功能的函式

注意:這裡的 rank 是當前 GPU 相對於所有可用 GPU 的總 rank,這意味著它們的 rank 為 0 -> n-1

from torch.nn.parallel import DistributedDataParallel as DDP

def train(model, rank, world_size):
    setup(rank, world_size)
    model = model.to(rank)
    ddp_model = DDP(model, device_ids=[rank])
    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)
    # Train for one epoch
    ddp_model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(rank), target.to(rank)
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    cleanup()

最佳化器需要根據特定裝置上的模型(即 ddp_model 而不是 model)進行宣告,以便正確計算所有梯度。

最後,為了執行指令碼,PyTorch 有一個方便的 torchrun 命令列模組可以提供幫助。只需傳入它應該使用的節點數量以及要執行的指令碼即可

torchrun --nproc_per_node=2 --nnodes=1 example_script.py

上述命令將在一臺機器上的兩個 GPU 上執行訓練指令碼,這是僅使用 PyTorch 執行分散式訓練的最低要求。

現在讓我們來談談 Accelerate,一個旨在使這個過程更無縫並幫助實現一些最佳實踐的庫

🤗 Accelerate

Accelerate 是一個旨在讓您執行我們剛剛上面所做的事情而無需大幅修改程式碼的庫。除此之外,Accelerate 內建的資料管道還可以提高您的程式碼效能。

首先,讓我們將上面執行的所有程式碼封裝到一個函式中,以幫助我們直觀地看到差異

def train_ddp(rank, world_size):
    setup(rank, world_size)
    # Build DataLoaders
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307), (0.3081))
    ])

    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)

    train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
    test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

    # Build model
    model = model.to(rank)
    ddp_model = DDP(model, device_ids=[rank])

    # Build optimizer
    optimizer = optim.AdamW(ddp_model.parameters(), lr=1e-3)

    # Train for a single epoch
    ddp_model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(rank), target.to(rank)
        output = ddp_model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
    
    # Evaluate
    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(rank), target.to(rank)
            output = ddp_model(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

接下來,我們談談 Accelerate 如何提供幫助。上面的程式碼存在一些問題

  1. 這略微低效,因為會根據每個裝置建立並推送 n 個數據載入器。
  2. 此程式碼將**僅**適用於多 GPU,因此需要進行特殊處理才能再次在單節點或 TPU 上執行。

Accelerate 透過 Accelerator 類解決了這個問題。透過它,與單節點和多節點的程式碼比較時,程式碼基本保持不變,除了三行程式碼,如下所示

def train_ddp_accelerate():
    accelerator = Accelerator()
    # Build DataLoaders
    transform = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307), (0.3081))
    ])

    train_dset = datasets.MNIST('data', train=True, download=True, transform=transform)
    test_dset = datasets.MNIST('data', train=False, transform=transform)

    train_loader = torch.utils.data.DataLoader(train_dset, shuffle=True, batch_size=64)
    test_loader = torch.utils.data.DataLoader(test_dset, shuffle=False, batch_size=64)

    # Build model
    model = BasicNet()

    # Build optimizer
    optimizer = optim.AdamW(model.parameters(), lr=1e-3)

    # Send everything through `accelerator.prepare`
    train_loader, test_loader, model, optimizer = accelerator.prepare(
        train_loader, test_loader, model, optimizer
    )

    # Train for a single epoch
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        output = model(data)
        loss = F.nll_loss(output, target)
        accelerator.backward(loss)
        optimizer.step()
        optimizer.zero_grad()
    
    # Evaluate
    model.eval()
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    print(f'Accuracy: {100. * correct / len(test_loader.dataset)}')

有了它,您的 PyTorch 訓練迴圈現在可以藉助 Accelerator 物件在任何分散式設定中執行。然後,此程式碼仍然可以透過 torchrun CLI 或透過 Accelerate 自己的 CLI 介面 accelerate launch 啟動。

因此,使用 Accelerate 進行分散式訓練變得輕而易舉,並且可以儘可能多地保持 PyTorch 骨架程式碼不變。

前面提到過,Accelerate 還可以提高 DataLoaders 的效率。這是透過自定義 Sampler 實現的,它可以在訓練期間自動將批次的部分發送到不同的裝置,從而允許一次只知道一份資料副本,而不是根據配置一次將四份資料副本載入到記憶體中。此外,記憶體中總共只有一份原始資料集的完整副本。此資料集的子集在所有用於訓練的節點之間進行拆分,從而允許在單個例項上訓練更大的資料集,而不會導致記憶體使用量爆炸式增長。

使用 notebook_launcher

前面提到過,您可以直接從 Jupyter Notebook 中啟動分散式程式碼。這來自 Accelerate 的 notebook_launcher 實用程式,它允許根據 Jupyter Notebook 中的程式碼啟動多 GPU 訓練。

使用它就像匯入啟動器一樣簡單

from accelerate import notebook_launcher

並將我們之前宣告的訓練函式、要傳遞的任何引數以及要使用的程序數(例如 TPU 上的 8 個或兩個 GPU 上的 2 個)傳遞進去。上述兩個訓練函式都可以執行,但請注意,在您啟動一次之後,例項需要重新啟動才能再次生成。

notebook_launcher(train_ddp, args=(), num_processes=2)

notebook_launcher(train_ddp_accelerate, args=(), num_processes=2)

使用 🤗 Trainer

最後,我們來到了最高級別的 API -- Hugging Face Trainer

這儘可能地封裝了訓練過程,同時仍然能夠在分散式系統上進行訓練,而使用者無需進行任何操作。

首先我們需要匯入 Trainer

from transformers import Trainer

然後我們定義一些 TrainingArguments 來控制所有常用的超引數。Trainer 也透過字典工作,因此需要建立一個自定義的 collate 函式。

最後,我們對 Trainer 進行子類化並編寫我們自己的 compute_loss

之後,這段程式碼也將在分散式設定中工作,無需編寫任何訓練程式碼!

from transformers import Trainer, TrainingArguments

model = BasicNet()

training_args = TrainingArguments(
    "basic-trainer",
    per_device_train_batch_size=64,
    per_device_eval_batch_size=64,
    num_train_epochs=1,
    evaluation_strategy="epoch",
    remove_unused_columns=False
)

def collate_fn(examples):
    pixel_values = torch.stack([example[0] for example in examples])
    labels = torch.tensor([example[1] for example in examples])
    return {"x":pixel_values, "labels":labels}

class MyTrainer(Trainer):
    def compute_loss(self, model, inputs, return_outputs=False):
        outputs = model(inputs["x"])
        target = inputs["labels"]
        loss = F.nll_loss(outputs, target)
        return (loss, outputs) if return_outputs else loss

trainer = MyTrainer(
    model,
    training_args,
    train_dataset=train_dset,
    eval_dataset=test_dset,
    data_collator=collate_fn,
)
trainer.train()
    ***** Running training *****
      Num examples = 60000
      Num Epochs = 1
      Instantaneous batch size per device = 64
      Total train batch size (w. parallel, distributed & accumulation) = 64
      Gradient Accumulation steps = 1
      Total optimization steps = 938
輪次 訓練損失 驗證損失
1 0.875700 0.282633

與上面使用 notebook_launcher 的示例類似,這裡也可以透過將其全部放入訓練函式中來實現。

def train_trainer_ddp():
    model = BasicNet()

    training_args = TrainingArguments(
        "basic-trainer",
        per_device_train_batch_size=64,
        per_device_eval_batch_size=64,
        num_train_epochs=1,
        evaluation_strategy="epoch",
        remove_unused_columns=False
    )

    def collate_fn(examples):
        pixel_values = torch.stack([example[0] for example in examples])
        labels = torch.tensor([example[1] for example in examples])
        return {"x":pixel_values, "labels":labels}

    class MyTrainer(Trainer):
        def compute_loss(self, model, inputs, return_outputs=False):
            outputs = model(inputs["x"])
            target = inputs["labels"]
            loss = F.nll_loss(outputs, target)
            return (loss, outputs) if return_outputs else loss

    trainer = MyTrainer(
        model,
        training_args,
        train_dataset=train_dset,
        eval_dataset=test_dset,
        data_collator=collate_fn,
    )

    trainer.train()

notebook_launcher(train_trainer_ddp, args=(), num_processes=2)

資源

要了解更多關於 PyTorch 分散式資料並行,請檢視這裡的文件 這裡

要了解更多關於 🤗 Accelerate 的資訊,請檢視這裡的文件 這裡

要了解更多關於 🤗 Transformers 的資訊,請檢視這裡的文件 這裡

社群

註冊登入 發表評論

© . This site is unofficial and not affiliated with Hugging Face, Inc.