Accelerate 文件

使用 Accelerate 執行梯度累積

Hugging Face's logo
加入 Hugging Face 社群

並獲得增強的文件體驗

開始使用

使用 Accelerate 執行梯度累積

梯度累積是一種技術,它能讓你在機器記憶體無法容納的情況下,使用更大的批次大小進行訓練。這是透過在多個批次上累積梯度,並且僅在執行一定數量的批次後才更新最佳化器來實現的。

雖然從技術上講,標準的梯度累積程式碼在分散式設定中也能正常工作,但這並不是最高效的方法,你可能會遇到明顯的減速!

在本教程中,你將看到如何快速設定梯度累積,並使用 Accelerate 中提供的實用工具來執行它,這可能只需要新增一行新程式碼!

此示例將使用一個非常簡單的 PyTorch 訓練迴圈,每兩個批次執行一次梯度累積

device = "cuda"
model.to(device)

gradient_accumulation_steps = 2

for index, batch in enumerate(training_dataloader):
    inputs, targets = batch
    inputs = inputs.to(device)
    targets = targets.to(device)
    outputs = model(inputs)
    loss = loss_function(outputs, targets)
    loss = loss / gradient_accumulation_steps
    loss.backward()
    if (index + 1) % gradient_accumulation_steps == 0:
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

將其轉換為 Accelerate

首先,將前面展示的程式碼轉換為使用 Accelerate,但不使用特殊的梯度累積幫助函式

+ from accelerate import Accelerator
+ accelerator = Accelerator()

+ model, optimizer, training_dataloader, scheduler = accelerator.prepare(
+     model, optimizer, training_dataloader, scheduler
+ )

  for index, batch in enumerate(training_dataloader):
      inputs, targets = batch
-     inputs = inputs.to(device)
-     targets = targets.to(device)
      outputs = model(inputs)
      loss = loss_function(outputs, targets)
      loss = loss / gradient_accumulation_steps
+     accelerator.backward(loss)
      if (index+1) % gradient_accumulation_steps == 0:
          optimizer.step()
          scheduler.step()
          optimizer.zero_grad()

在當前狀態下,由於一個稱為梯度同步的過程,這段程式碼不會高效地執行梯度累積。請在概念教程中閱讀更多相關資訊!

讓 Accelerate 處理梯度累積

現在剩下的就是讓 Accelerate 為我們處理梯度累積。為此,你應該向 Accelerator 傳遞一個 `gradient_accumulation_steps` 引數,指定每次呼叫 `step()` 之前要執行的步數,以及如何在呼叫 backward() 期間自動調整損失

  from accelerate import Accelerator
- accelerator = Accelerator()
+ accelerator = Accelerator(gradient_accumulation_steps=2)

或者,你可以向 Accelerator 物件的 `__init__` 傳遞一個 `gradient_accumulation_plugin` 引數,這將允許你進一步自定義梯度累積行為。有關更多資訊,請參閱 GradientAccumulationPlugin 文件。

從這裡,你可以使用訓練迴圈內的 accumulate() 上下文管理器來自動為你執行梯度累積!你只需將其包裹在我們程式碼的整個訓練部分之外

- for index, batch in enumerate(training_dataloader):
+ for batch in training_dataloader:
+     with accelerator.accumulate(model):
          inputs, targets = batch
          outputs = model(inputs)

你可以刪除所有對步數和損失調整的特殊檢查

- loss = loss / gradient_accumulation_steps
  accelerator.backward(loss)
- if (index+1) % gradient_accumulation_steps == 0:
  optimizer.step()
  scheduler.step()
  optimizer.zero_grad()

如你所見,Accelerator 能夠跟蹤你所在的批次數,並會自動知道是否透過準備好的最佳化器執行 `step` 以及如何調整損失。

通常情況下,使用梯度累積時,你需要調整步數以反映你正在訓練的總批次的變化。Accelerate 預設會自動為你完成此操作。在幕後,我們例項化了一個配置為執行此操作的 `GradientAccumulationPlugin`。

state.GradientState 與正在迭代的活動資料載入器同步。因此,它天真地假設當我們到達資料載入器的末尾時,一切都會同步並執行一個 `step`。要停用此功能,請在 `GradientAccumulationPlugin` 中將 `sync_with_dataloader` 設定為 `False`

from accelerate import Accelerator
from accelerate.utils import GradientAccumulationPlugin

plugin = GradientAccumulationPlugin(sync_with_dataloader=False)
accelerator = Accelerator(..., gradient_accumulation_plugin=plugin)

完成的程式碼

下面是使用 Accelerate 執行梯度累積的完整實現

from accelerate import Accelerator
accelerator = Accelerator(gradient_accumulation_steps=2)
model, optimizer, training_dataloader, scheduler = accelerator.prepare(
    model, optimizer, training_dataloader, scheduler
)
for batch in training_dataloader:
    with accelerator.accumulate(model):
        inputs, targets = batch
        outputs = model(inputs)
        loss = loss_function(outputs, targets)
        accelerator.backward(loss)
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

重要的是,在上下文管理器 `with accelerator.accumulate(model)` 中**只能進行一次前向/後向傳播**。

要了解更多關於這個包裝器背後的魔法,請閱讀梯度同步概念指南

自包含示例

這是一個自包含的示例,你可以執行它來檢視 Accelerate 中梯度累積的實際效果

import torch
import copy
from accelerate import Accelerator
from accelerate.utils import set_seed
from torch.utils.data import TensorDataset, DataLoader

# seed
set_seed(0)

# define toy inputs and labels
x = torch.tensor([1., 2., 3., 4., 5., 6., 7., 8.])
y = torch.tensor([2., 4., 6., 8., 10., 12., 14., 16.])
gradient_accumulation_steps = 4
per_device_batch_size = len(x) // gradient_accumulation_steps

# define dataset and dataloader
dataset = TensorDataset(x, y)
dataloader = DataLoader(dataset, batch_size=per_device_batch_size)

# define model, optimizer and loss function
class SimpleLinearModel(torch.nn.Module):
    def __init__(self):
        super(SimpleLinearModel, self).__init__()
        self.weight = torch.nn.Parameter(torch.zeros((1, 1)))

    def forward(self, inputs):
        return inputs @ self.weight

model = SimpleLinearModel()
model_clone = copy.deepcopy(model)
criterion = torch.nn.MSELoss()
model_optimizer = torch.optim.SGD(model.parameters(), lr=0.02)
accelerator = Accelerator(gradient_accumulation_steps=gradient_accumulation_steps)
model, model_optimizer, dataloader = accelerator.prepare(model, model_optimizer, dataloader)
model_clone_optimizer = torch.optim.SGD(model_clone.parameters(), lr=0.02)
print(f"initial model weight is {model.weight.mean().item():.5f}")
print(f"initial model weight is {model_clone.weight.mean().item():.5f}")
for i, (inputs, labels) in enumerate(dataloader):
    with accelerator.accumulate(model):
        inputs = inputs.view(-1, 1)
        print(i, inputs.flatten())
        labels = labels.view(-1, 1)
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        accelerator.backward(loss)
        model_optimizer.step()
        model_optimizer.zero_grad()
loss = criterion(x.view(-1, 1) @ model_clone.weight, y.view(-1, 1))
model_clone_optimizer.zero_grad()
loss.backward()
model_clone_optimizer.step()
print(f"w/ accumulation, the final model weight is {model.weight.mean().item():.5f}")
print(f"w/o accumulation, the final model weight is {model_clone.weight.mean().item():.5f}")
initial model weight is 0.00000
initial model weight is 0.00000
0 tensor([1., 2.])
1 tensor([3., 4.])
2 tensor([5., 6.])
3 tensor([7., 8.])
w/ accumulation, the final model weight is 2.04000
w/o accumulation, the final model weight is 2.04000

對可變大小訓練樣本的梯度累積

正如這篇部落格文章所指出的,在對可變大小的訓練樣本進行梯度累積時,會出現一個常見錯誤

[…] 對於像因果語言模型訓練這樣的詞元級任務,正確的損失應該透過**一個梯度累積步驟中所有批次的總損失**除以**這些批次中所有非填充詞元的總數**來計算。這與每批損失值的平均值不同。

換句話說,對於基於詞元級別的損失,必須進行一些調整。

骨架程式碼

from accelerate import Accelerator
import math
import contextlib

gradient_accumulation_steps = 2
accelerator = Accelerator(gradient_accumulation_steps=gradient_accumulation_steps)
model, optimizer, training_dataloader, scheduler = accelerator.prepare(
    model, optimizer, training_dataloader, scheduler
)

training_iterator = iter(training_dataloader)
num_samples_in_epoch = len(training_dataloader)
remainder = num_samples_in_epoch % gradient_accumulation_steps
remainder = remainder if remainder != 0 else gradient_accumulation_steps
total_updates = math.ceil(num_samples_in_epoch / gradient_accumulation_steps)
        

total_batched_samples = 0
for update_step in range(total_updates):
        # In order to correctly the total number of non-padded tokens on which we'll compute the cross-entropy loss
        # we need to pre-load the full local batch - i.e the next per_device_batch_size * accumulation_steps samples
        batch_samples = []
        num_batches_in_step = gradient_accumulation_steps if update_step != (total_updates - 1) else remainder
        for _ in range(num_batches_in_step):
            batch_samples += [next(training_iterator)]
            
        # get local num items in batch 
        num_items_in_batch = sum([(batch["labels"].ne(-100)).sum() for batch in batch_samples])
        # to compute it correctly in a multi-device DDP training, we need to gather the total number of items in the full batch.
        num_items_in_batch = accelerator.gather(num_items_in_batch).sum().item()
            
        for i, batch in enumerate(batch_samples):
            # if we perform gradient accumulation in a multi-devices set-up, we want to avoid unnecessary communications when accumulating
            # cf: https://muellerzr.github.io/blog/gradient_accumulation.html
            if (i < len(batch_samples) - 1 and accelerator.num_processes > 1):
                ctx = model.no_sync
            else:
                ctx = contextlib.nullcontext
            
            total_batched_samples += 1

            with ctx():
                inputs, targets = batch
                outputs = model(inputs)
                loss = loss_function(outputs, targets) # the loss function should sum over samples rather than averaging
                
                # We multiply by num_processes because the DDP calculates the average gradient across all devices whereas dividing by num_items_in_batch already takes into account all devices
                # Same reason for gradient_accumulation_steps, but this times it's Accelerate that calculate the average gradient across the accumulated steps
                loss = (loss * gradient_accumulation_steps * accelerator.num_processes) / num_items_in_batch
                
                accelerator.backward(loss)

        # Sync gradients and perform optimization steps once every gradient_accumulation_steps
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()

自包含的因果語言模型示例

import torch
import copy
from accelerate import Accelerator
from accelerate.utils import set_seed
from accelerate.logging import  get_logger
from torch.utils.data import Dataset, DataLoader
import math
import contexlib

# seed
set_seed(0)
logger = get_logger(__name__)

class MyDataset(Dataset):
    def __init__(self, num_samples):
        super().__init__()
        self.len = num_samples

    def __getitem__(self, index):
        input_ids = torch.arange(1, index+2, dtype=torch.float32)
        labels = torch.remainder(input_ids, 2)
        return {"input_ids": input_ids, "labels": labels}

    def __len__(self):
        return self.len
    
def collate_fn(features):
    input_ids = torch.nn.utils.rnn.pad_sequence([f["input_ids"] for f in features], batch_first=True, padding_value=-100)
    labels = torch.nn.utils.rnn.pad_sequence([f["labels"] for f in features], batch_first=True, padding_value=-100)
    return {"input_ids": input_ids[..., None], "labels": labels[..., None]}

# define toy inputs and labels
gradient_accumulation_steps = 2
per_device_batch_size = 4

# define accelerator
accelerator = Accelerator(gradient_accumulation_steps=gradient_accumulation_steps)

# define dataset and dataloader
# for this toy example, we'll compute gradient descent over one single global batch
dataset = MyDataset(per_device_batch_size*gradient_accumulation_steps*accelerator.num_processes)
dataloader = DataLoader(dataset, batch_size=per_device_batch_size, collate_fn=collate_fn)

# define model, model_optimizer and loss function
model = torch.nn.Linear(1, 2, bias=False)
model_clone = copy.deepcopy(model)
criterion = torch.nn.CrossEntropyLoss(reduction="sum") # must sum over samples rather than averaging
model_optimizer = torch.optim.SGD(model.parameters(), lr=0.08)


logger.warning(f"initial model weight is {model.weight.detach().cpu().squeeze()}")
logger.warning(f"initial model clone weight is {model_clone.weight.detach().cpu().squeeze()}")

# prepare artifacts - accelerator handles device placement and dataloader splitting
model, model_optimizer = accelerator.prepare(model, model_optimizer)
dataloader = accelerator.prepare_data_loader(dataloader, device_placement=True)
training_iterator = iter(dataloader)

num_samples_in_epoch = len(dataloader)
remainder = num_samples_in_epoch % gradient_accumulation_steps
remainder = remainder if remainder != 0 else gradient_accumulation_steps
total_gradient_updates = math.ceil(num_samples_in_epoch / gradient_accumulation_steps)

total_batched_samples = 0
for update_step in range(total_gradient_updates):
        # In order to correctly the total number of non-padded tokens on which we'll compute the cross-entropy loss
        # we need to pre-load the full local batch - i.e the next per_device_batch_size * accumulation_steps samples
        batch_samples = []
        num_batches_in_step = gradient_accumulation_steps if update_step != (total_gradient_updates - 1) else remainder
        for _ in range(num_batches_in_step):
            batch_samples += [next(training_iterator)]
            
        # get local num items in batch 
        local_num_items_in_batch = sum([(batch["labels"].ne(-100)).sum() for batch in batch_samples])
        logger.warning(f"Step {update_step} - Device {accelerator.process_index} - num items in the local batch {local_num_items_in_batch}", main_process_only=False)

        # to compute it correctly in a multi-device DDP training, we need to gather the total number of items in the full batch.
        num_items_in_batch = accelerator.gather(local_num_items_in_batch).sum().item()
        logger.warning(f"Total num items {num_items_in_batch}")

        for i, batch in enumerate(batch_samples):
            inputs, labels = batch["input_ids"], batch["labels"]
            total_batched_samples += 1
            # if we perform gradient accumulation in a multi-devices set-up, we want to avoid unnecessary communications when accumulating
            # cf: https://muellerzr.github.io/blog/gradient_accumulation.html
            if (i < len(batch_samples) - 1 and accelerator.num_processes > 1):
                ctx = model.no_sync
            else:
                ctx = contextlib.nullcontext
            with ctx():

                outputs = model(inputs)
                loss = criterion(outputs.view(-1, 2), labels.view(-1).to(torch.int64))
                
                # We multiply by num_processes because the DDP calculates the average gradient across all devices whereas dividing by num_items_in_batch already takes into account all devices
                # Same reason for gradient_accumulation_steps, but this times it's Accelerate that calculate the average gradient across the accumulated steps 
                loss = (loss * gradient_accumulation_steps * accelerator.num_processes) / num_items_in_batch
                accelerator.backward(loss)
        model_optimizer.step()
        model_optimizer.zero_grad()
                

logger.warning(f"Device {accelerator.process_index} - w/ accumulation, the final model weight is {accelerator.unwrap_model(model).weight.detach().cpu().squeeze()}", main_process_only=False)

# We know do the same operation but on a single device and without gradient accumulation

if accelerator.is_main_process:
    # prepare one single entire batch
    dataloader = DataLoader(dataset, batch_size=len(dataset), collate_fn=collate_fn)
    full_batch_without_accum = next(iter(dataloader))
    total_inputs, total_labels = full_batch_without_accum["input_ids"], full_batch_without_accum["labels"]
    model_clone_optimizer = torch.optim.SGD(model_clone.parameters(), lr=0.08)
    
    # train the cloned model
    loss = torch.nn.CrossEntropyLoss(reduction="mean")(model_clone(total_inputs).view(-1, 2), total_labels.view(-1).to(torch.int64))
    model_clone_optimizer.zero_grad()
    loss.backward()
    model_clone_optimizer.step()
    
    # We should have the same final weights.
    logger.warning(f"w/o accumulation, the final model weight is {model_clone.weight.detach().cpu().squeeze()}")

在單個裝置上的結果 - 梯度累積步數設定為 1,批次大小設定為 8

initial model weight is tensor([-0.0075,  0.5364])
initial model clone weight is tensor([-0.0075,  0.5364])
Step 0 - Device 0 - num items in the local batch 36
Total num items 36
Device 0 - w/ accumulation, the final model weight is tensor([0.0953, 0.4337])
w/o accumulation, the final model weight is tensor([0.0953, 0.4337])

在雙裝置設定上的結果 - 梯度累積步數設定為 2,批次大小設定為 4。

initial model weight is tensor([-0.0075,  0.5364])
initial model clone weight is tensor([-0.0075,  0.5364])
Step 0 - Device 0 - num items in the local batch 52
Step 0 - Device 1 - num items in the local batch 84
Total num items 136
Device 1 - w/ accumulation, the final model weight is tensor([0.2117, 0.3172])
Device 0 - w/ accumulation, the final model weight is tensor([0.2117, 0.3172])
w/o accumulation, the final model weight is tensor([0.2117, 0.3172])

更進一步:

請在示例資料夾中的路徑 accelerate/examples/by_feature/gradient_accumulation_for_autoregressive_models.py 下找到一個真實世界訓練執行的完整示例指令碼。

在多個訓練配置上執行它,全域性批次大小恆定為 32,得到以下圖表

請注意,在訓練步驟 20 之前,訓練損失完全相同。在此訓練步驟之後出現的微小偏差發生在第一個 epoch 的末尾,因為預設情況下,當總批次大小不能整除資料集時,資料載入器會預設複製資料集開頭的樣本。

< > 在 GitHub 上更新

© . This site is unofficial and not affiliated with Hugging Face, Inc.