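Performing gradient accumulation with Accelerate
Gradient accumulation is a technique that lets you train with a larger batch size than your machine could otherwise fit in memory. It works by accumulating gradients over several batches and stepping the optimizer only after a set number of batches has been processed.
Standard gradient accumulation code technically works in a distributed setup, but it is not the most efficient approach, and you may see considerable slowdowns!
In this tutorial you will see how to quickly set up gradient accumulation and run it with the utilities provided in Accelerate, which can come down to adding just one new line of code!
This example uses a very simple PyTorch training loop that performs gradient accumulation every two batches: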
device = "cuda"
model.to(device)
gradient_accumulation_steps = 2
for index, batch in enumerate(training_dataloader):
    inputs, targets = batch
    inputs = inputs.to(device)
    targets = targets.to(device)
    outputs = model(inputs)
    loss = loss_function(outputs, targets)
    loss = loss / gradient_accumulation_steps
    loss.backward()
    if (index + 1) % gradient_accumulation_steps == 0:
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()
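Why divide the loss by `gradient_accumulation_steps`? The plain-Python sketch below (no PyTorch; the one-parameter model, the data, and the helper name `mse_grad` are made up for illustration) suggests the reason: summing the gradients of the scaled micro-batch losses reproduces the gradient of the mean loss over the full batch, provided the micro-batches have equal size.

```python
# Toy model: prediction = w * x, loss = mean((w * x - y) ** 2).
# All names and values here are illustrative assumptions.

def mse_grad(w, xs, ys):
    """Gradient of the mean squared error with respect to w."""
    return sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / len(xs)

xs = [1.0, 2.0, 3.0, 4.0]
ys = [3.0, 5.0, 7.0, 9.0]
w = 0.5
gradient_accumulation_steps = 2
micro_batch_size = len(xs) // gradient_accumulation_steps

# Gradient over the full batch in one go.
full_grad = mse_grad(w, xs, ys)

# Accumulate gradients of (loss / gradient_accumulation_steps) per micro-batch.
accumulated_grad = 0.0
for step in range(gradient_accumulation_steps):
    start = step * micro_batch_size
    micro_xs = xs[start:start + micro_batch_size]
    micro_ys = ys[start:start + micro_batch_size]
    accumulated_grad += mse_grad(w, micro_xs, micro_ys) / gradient_accumulation_steps

print(full_grad, accumulated_grad)
```

Both printed values should agree up to floating-point rounding, which is what makes the accumulated update equivalent to a single large-batch update.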
Converting it to Accelerate
First, convert the code shown above to use Accelerate, without the dedicated gradient accumulation helper:
+ from accelerate import Accelerator
+ accelerator = Accelerator()
+ model, optimizer, training_dataloader, scheduler = accelerator.prepare(
+     model, optimizer, training_dataloader, scheduler
+ )
  for index, batch in enumerate(training_dataloader):
      inputs, targets = batch
-     inputs = inputs.to(device)
-     targets = targets.to(device)
      outputs = model(inputs)
      loss = loss_function(outputs, targets)
      loss = loss / gradient_accumulation_steps
-     loss.backward()
+     accelerator.backward(loss)
      if (index+1) % gradient_accumulation_steps == 0:
          optimizer.step()
          scheduler.step()
          optimizer.zero_grad()
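In its current state, this code will not perform gradient accumulation efficiently, because of a process called gradient synchronization. Read more about it in the Concepts tutorial!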
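To get a feel for the cost, here is a toy count in plain Python. It assumes a DistributedDataParallel-style setup in which every backward pass triggers a gradient all-reduce unless it is explicitly skipped; the function name and the numbers are illustrative only and do not measure real communication time.

```python
def count_gradient_syncs(num_batches, gradient_accumulation_steps, skip_when_accumulating):
    """Count backward passes that would trigger a cross-device gradient sync."""
    syncs = 0
    for index in range(num_batches):
        is_update_step = (index + 1) % gradient_accumulation_steps == 0
        # Without skipping, every backward pass communicates gradients,
        # even though the optimizer only uses them on update steps.
        if is_update_step or not skip_when_accumulating:
            syncs += 1
    return syncs

naive = count_gradient_syncs(100, 4, skip_when_accumulating=False)
efficient = count_gradient_syncs(100, 4, skip_when_accumulating=True)
print(naive, efficient)  # 100 25
```

Under these assumptions, three out of every four synchronizations in the naive loop are wasted work.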
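Letting Accelerate handle gradient accumulation
All that is left is to let Accelerate handle the gradient accumulation for us. To do so, pass a `gradient_accumulation_steps` parameter to Accelerator. It sets how many steps to perform before each call to `step()` and how the loss is automatically adjusted during the call to backward():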
from accelerate import Accelerator
- accelerator = Accelerator()
+ accelerator = Accelerator(gradient_accumulation_steps=2)
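Alternatively, you can pass a `gradient_accumulation_plugin` parameter to the `__init__` of the Accelerator object, which lets you customize the gradient accumulation behavior further. See the GradientAccumulationPlugin docs for more information.
From here you can use the accumulate() context manager inside your training loop to perform gradient accumulation automatically! Just wrap it around the entire training part of the code: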
- for index, batch in enumerate(training_dataloader):
+ for batch in training_dataloader:
+     with accelerator.accumulate(model):
          inputs, targets = batch
          outputs = model(inputs)
You can remove all the special checks for the step number and the loss adjustment:
-         loss = loss / gradient_accumulation_steps
          accelerator.backward(loss)
-         if (index+1) % gradient_accumulation_steps == 0:
          optimizer.step()
          scheduler.step()
          optimizer.zero_grad()
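As you can see, the Accelerator keeps track of which batch you are on, and it automatically knows whether to `step` through the prepared optimizer and how to adjust the loss.
Typically, with gradient accumulation you would need to adjust the number of steps to reflect the change in the total number of batches you train on. Accelerate does this for you automatically by default. Behind the scenes, we instantiate a `GradientAccumulationPlugin` configured to do this.
state.GradientState is synced with the active dataloader being iterated over. It therefore naively assumes that when the end of the dataloader is reached, everything will sync and a `step` will be performed. To disable this, set `sync_with_dataloader` to `False` in the `GradientAccumulationPlugin`: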
from accelerate import Accelerator
from accelerate.utils import GradientAccumulationPlugin
plugin = GradientAccumulationPlugin(sync_with_dataloader=False)
accelerator = Accelerator(..., gradient_accumulation_plugin=plugin)
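The effect of `sync_with_dataloader` can be pictured with a small plain-Python model of the step bookkeeping. This is a simplified stand-in written for illustration, not Accelerate's actual implementation; the class `ToyAccumulationTracker` and the helper `update_batches` are hypothetical names.

```python
class ToyAccumulationTracker:
    """Simplified stand-in for the step bookkeeping; not Accelerate's actual code."""

    def __init__(self, num_steps, sync_with_dataloader=True):
        self.num_steps = num_steps
        self.sync_with_dataloader = sync_with_dataloader
        self.step = 0

    def should_sync(self, end_of_dataloader):
        if self.sync_with_dataloader and end_of_dataloader:
            # Force an update on the last batch and restart the count.
            self.step = 0
            return True
        self.step += 1
        return self.step % self.num_steps == 0


def update_batches(num_batches, num_steps, sync_with_dataloader):
    """Return the batch indices on which an optimizer update would happen."""
    tracker = ToyAccumulationTracker(num_steps, sync_with_dataloader)
    return [
        index
        for index in range(num_batches)
        if tracker.should_sync(end_of_dataloader=(index == num_batches - 1))
    ]


with_sync = update_batches(5, 2, sync_with_dataloader=True)
without_sync = update_batches(5, 2, sync_with_dataloader=False)
print(with_sync)     # [1, 3, 4]
print(without_sync)  # [1, 3]
```

With five batches and two accumulation steps, the last batch triggers an extra update only when the tracker is tied to the end of the dataloader; otherwise its gradients stay accumulated.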
The finished code
Below is the finished implementation for performing gradient accumulation with Accelerate:
from accelerate import Accelerator
accelerator = Accelerator(gradient_accumulation_steps=2)
model, optimizer, training_dataloader, scheduler = accelerator.prepare(
    model, optimizer, training_dataloader, scheduler
)
for batch in training_dataloader:
    with accelerator.accumulate(model):
        inputs, targets = batch
        outputs = model(inputs)
        loss = loss_function(outputs, targets)
        accelerator.backward(loss)
        optimizer.step()
        scheduler.step()
        optimizer.zero_grad()
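It is important that **only one forward/backward pass** is done inside the context manager `with accelerator.accumulate(model)`.
To learn more about the magic behind this wrapper, read the Gradient Synchronization concept guide.
Self-contained example
Here is a self-contained example that you can run to see gradient accumulation in action with Accelerate: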
import torch
import copy
from accelerate import Accelerator
from accelerate.utils import set_seed
from torch.utils.data import TensorDataset, DataLoader

# seed
set_seed(0)

# define toy inputs and labels
x = torch.tensor([1., 2., 3., 4., 5., 6., 7., 8.])
y = torch.tensor([2., 4., 6., 8., 10., 12., 14., 16.])
gradient_accumulation_steps = 4
per_device_batch_size = len(x) // gradient_accumulation_steps

# define dataset and dataloader
dataset = TensorDataset(x, y)
dataloader = DataLoader(dataset, batch_size=per_device_batch_size)

# define model, optimizer and loss function
class SimpleLinearModel(torch.nn.Module):
    def __init__(self):
        super(SimpleLinearModel, self).__init__()
        self.weight = torch.nn.Parameter(torch.zeros((1, 1)))

    def forward(self, inputs):
        return inputs @ self.weight

model = SimpleLinearModel()
model_clone = copy.deepcopy(model)
criterion = torch.nn.MSELoss()
model_optimizer = torch.optim.SGD(model.parameters(), lr=0.02)
accelerator = Accelerator(gradient_accumulation_steps=gradient_accumulation_steps)
model, model_optimizer, dataloader = accelerator.prepare(model, model_optimizer, dataloader)
model_clone_optimizer = torch.optim.SGD(model_clone.parameters(), lr=0.02)
print(f"initial model weight is {model.weight.mean().item():.5f}")
print(f"initial model weight is {model_clone.weight.mean().item():.5f}")

# train with gradient accumulation over four micro-batches
for i, (inputs, labels) in enumerate(dataloader):
    with accelerator.accumulate(model):
        inputs = inputs.view(-1, 1)
        print(i, inputs.flatten())
        labels = labels.view(-1, 1)
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        accelerator.backward(loss)
        model_optimizer.step()
        model_optimizer.zero_grad()

# train the clone on the full batch in a single step, without accumulation
loss = criterion(x.view(-1, 1) @ model_clone.weight, y.view(-1, 1))
model_clone_optimizer.zero_grad()
loss.backward()
model_clone_optimizer.step()
print(f"w/ accumulation, the final model weight is {model.weight.mean().item():.5f}")
print(f"w/o accumulation, the final model weight is {model_clone.weight.mean().item():.5f}")
initial model weight is 0.00000
initial model weight is 0.00000
0 tensor([1., 2.])
1 tensor([3., 4.])
2 tensor([5., 6.])
3 tensor([7., 8.])
w/ accumulation, the final model weight is 2.04000
w/o accumulation, the final model weight is 2.04000
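The reported value of 2.04 can be checked by hand. The derivation below is a worked sketch for the reference run without accumulation, using the script's data, a zero initial weight, the MSE loss, and a learning rate of 0.02.

```latex
\begin{aligned}
L(w) &= \frac{1}{8}\sum_{i=1}^{8}(w x_i - y_i)^2, \qquad x_i = i,\; y_i = 2 x_i,\; w_0 = 0 \\
\frac{\partial L}{\partial w}\Big|_{w_0} &= \frac{2}{8}\sum_{i=1}^{8}(w_0 x_i - y_i)\,x_i = -\frac{1}{2}\sum_{i=1}^{8} x_i^2 = -\frac{204}{2} = -102 \\
w_1 &= w_0 - 0.02 \cdot (-102) = 2.04
\end{aligned}
```

With accumulation, each of the four micro-batches of two samples contributes its mean gradient scaled by 1/4, which sums to the same -102, so both runs land on the same weight.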
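Gradient accumulation on training samples of variable size
As this blog post points out, a common error occurs when performing gradient accumulation on training samples of variable size:
[…] for token-level tasks such as causal LM training, the correct loss should be computed as the **total loss across all batches in a gradient accumulation step** divided by the **total number of non-padding tokens in those batches**. This is not the same as the average of the per-batch loss values.
In other words, losses that operate at the token level need some adjustment.
Skeleton code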
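A tiny numeric illustration of the difference, in plain Python. The per-token loss values are made up; the point is only that the two ways of averaging disagree as soon as batches hold different numbers of non-padding tokens.

```python
# Per-token losses for two micro-batches of very different lengths (made-up values).
micro_batches = [
    [1.0, 1.0],                      # 2 non-padding tokens
    [4.0, 4.0, 4.0, 4.0, 4.0, 4.0],  # 6 non-padding tokens
]

# Naive approach: average the per-batch mean losses.
per_batch_means = [sum(losses) / len(losses) for losses in micro_batches]
mean_of_means = sum(per_batch_means) / len(per_batch_means)

# Token-level approach: total loss divided by the total number of tokens.
total_loss = sum(sum(losses) for losses in micro_batches)
total_tokens = sum(len(losses) for losses in micro_batches)
token_level_mean = total_loss / total_tokens

print(mean_of_means)     # 2.5
print(token_level_mean)  # 3.25
```

The naive average gives the short batch the same weight as the long one, so its tokens count three times as much as they should.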
from accelerate import Accelerator
import math
import contextlib

gradient_accumulation_steps = 2
accelerator = Accelerator(gradient_accumulation_steps=gradient_accumulation_steps)
model, optimizer, training_dataloader, scheduler = accelerator.prepare(
    model, optimizer, training_dataloader, scheduler
)

training_iterator = iter(training_dataloader)
# note: len(training_dataloader) is the number of batches in the epoch
num_samples_in_epoch = len(training_dataloader)
remainder = num_samples_in_epoch % gradient_accumulation_steps
remainder = remainder if remainder != 0 else gradient_accumulation_steps
total_updates = math.ceil(num_samples_in_epoch / gradient_accumulation_steps)

total_batched_samples = 0
for update_step in range(total_updates):
    # In order to correctly compute the total number of non-padded tokens on which we'll compute the cross-entropy loss
    # we need to pre-load the full local batch - i.e. the next per_device_batch_size * accumulation_steps samples
    batch_samples = []
    num_batches_in_step = gradient_accumulation_steps if update_step != (total_updates - 1) else remainder
    for _ in range(num_batches_in_step):
        batch_samples += [next(training_iterator)]

    # get local num items in batch
    num_items_in_batch = sum([(batch["labels"].ne(-100)).sum() for batch in batch_samples])
    # to compute it correctly in a multi-device DDP training, we need to gather the total number of items in the full batch.
    num_items_in_batch = accelerator.gather(num_items_in_batch).sum().item()

    for i, batch in enumerate(batch_samples):
        # if we perform gradient accumulation in a multi-device set-up, we want to avoid unnecessary communications when accumulating
        # cf: https://muellerzr.github.io/blog/gradient_accumulation.html
        if (i < len(batch_samples) - 1 and accelerator.num_processes > 1):
            ctx = model.no_sync
        else:
            ctx = contextlib.nullcontext

        total_batched_samples += 1

        with ctx():
            inputs, targets = batch
            outputs = model(inputs)
            loss = loss_function(outputs, targets) # the loss function should sum over samples rather than averaging

            # We multiply by num_processes because DDP calculates the average gradient across all devices whereas dividing by num_items_in_batch already takes into account all devices
            # Same reason for gradient_accumulation_steps, but this time it's Accelerate that calculates the average gradient across the accumulated steps
            loss = (loss * gradient_accumulation_steps * accelerator.num_processes) / num_items_in_batch

            accelerator.backward(loss)

    # Sync gradients and perform optimization steps once every gradient_accumulation_steps
    optimizer.step()
    scheduler.step()
    optimizer.zero_grad()
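The `remainder` and `total_updates` bookkeeping in the skeleton decides how many batches feed each optimizer update. The sketch below isolates that logic in a standalone helper (the function name `batches_per_update` is hypothetical) so the last, possibly shorter, update is easy to see.

```python
import math

def batches_per_update(num_batches, gradient_accumulation_steps):
    """Return how many batches each optimizer update consumes."""
    remainder = num_batches % gradient_accumulation_steps
    # A remainder of zero means the last update is a full one.
    remainder = remainder if remainder != 0 else gradient_accumulation_steps
    total_updates = math.ceil(num_batches / gradient_accumulation_steps)
    return [
        gradient_accumulation_steps if update_step != total_updates - 1 else remainder
        for update_step in range(total_updates)
    ]

print(batches_per_update(5, 2))  # [2, 2, 1]
print(batches_per_update(4, 2))  # [2, 2]
```

The counts always add up to the number of batches in the dataloader, so every call to `next(training_iterator)` in the skeleton has a batch to return.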
Self-contained causal LM example
import torch
import copy
from accelerate import Accelerator
from accelerate.utils import set_seed
from accelerate.logging import get_logger
from torch.utils.data import Dataset, DataLoader
import math
import contextlib

# seed
set_seed(0)
logger = get_logger(__name__)

class MyDataset(Dataset):
    def __init__(self, num_samples):
        super().__init__()
        self.len = num_samples

    def __getitem__(self, index):
        input_ids = torch.arange(1, index+2, dtype=torch.float32)
        labels = torch.remainder(input_ids, 2)
        return {"input_ids": input_ids, "labels": labels}

    def __len__(self):
        return self.len

def collate_fn(features):
    input_ids = torch.nn.utils.rnn.pad_sequence([f["input_ids"] for f in features], batch_first=True, padding_value=-100)
    labels = torch.nn.utils.rnn.pad_sequence([f["labels"] for f in features], batch_first=True, padding_value=-100)
    return {"input_ids": input_ids[..., None], "labels": labels[..., None]}

# define toy inputs and labels
gradient_accumulation_steps = 2
per_device_batch_size = 4

# define accelerator
accelerator = Accelerator(gradient_accumulation_steps=gradient_accumulation_steps)

# define dataset and dataloader
# for this toy example, we'll compute gradient descent over one single global batch
dataset = MyDataset(per_device_batch_size*gradient_accumulation_steps*accelerator.num_processes)
dataloader = DataLoader(dataset, batch_size=per_device_batch_size, collate_fn=collate_fn)

# define model, model_optimizer and loss function
model = torch.nn.Linear(1, 2, bias=False)
model_clone = copy.deepcopy(model)
criterion = torch.nn.CrossEntropyLoss(reduction="sum") # must sum over samples rather than averaging
model_optimizer = torch.optim.SGD(model.parameters(), lr=0.08)

logger.warning(f"initial model weight is {model.weight.detach().cpu().squeeze()}")
logger.warning(f"initial model clone weight is {model_clone.weight.detach().cpu().squeeze()}")

# prepare artifacts - accelerator handles device placement and dataloader splitting
model, model_optimizer = accelerator.prepare(model, model_optimizer)
dataloader = accelerator.prepare_data_loader(dataloader, device_placement=True)
training_iterator = iter(dataloader)

# note: len(dataloader) is the number of batches in the epoch
num_samples_in_epoch = len(dataloader)
remainder = num_samples_in_epoch % gradient_accumulation_steps
remainder = remainder if remainder != 0 else gradient_accumulation_steps
total_gradient_updates = math.ceil(num_samples_in_epoch / gradient_accumulation_steps)

total_batched_samples = 0
for update_step in range(total_gradient_updates):
    # In order to correctly compute the total number of non-padded tokens on which we'll compute the cross-entropy loss
    # we need to pre-load the full local batch - i.e. the next per_device_batch_size * accumulation_steps samples
    batch_samples = []
    num_batches_in_step = gradient_accumulation_steps if update_step != (total_gradient_updates - 1) else remainder
    for _ in range(num_batches_in_step):
        batch_samples += [next(training_iterator)]

    # get local num items in batch
    local_num_items_in_batch = sum([(batch["labels"].ne(-100)).sum() for batch in batch_samples])
    logger.warning(f"Step {update_step} - Device {accelerator.process_index} - num items in the local batch {local_num_items_in_batch}", main_process_only=False)

    # to compute it correctly in a multi-device DDP training, we need to gather the total number of items in the full batch.
    num_items_in_batch = accelerator.gather(local_num_items_in_batch).sum().item()
    logger.warning(f"Total num items {num_items_in_batch}")

    for i, batch in enumerate(batch_samples):
        inputs, labels = batch["input_ids"], batch["labels"]
        total_batched_samples += 1
        # if we perform gradient accumulation in a multi-device set-up, we want to avoid unnecessary communications when accumulating
        # cf: https://muellerzr.github.io/blog/gradient_accumulation.html
        if (i < len(batch_samples) - 1 and accelerator.num_processes > 1):
            ctx = model.no_sync
        else:
            ctx = contextlib.nullcontext
        with ctx():
            outputs = model(inputs)
            loss = criterion(outputs.view(-1, 2), labels.view(-1).to(torch.int64))

            # We multiply by num_processes because DDP calculates the average gradient across all devices whereas dividing by num_items_in_batch already takes into account all devices
            # Same reason for gradient_accumulation_steps, but this time it's Accelerate that calculates the average gradient across the accumulated steps
            loss = (loss * gradient_accumulation_steps * accelerator.num_processes) / num_items_in_batch
            accelerator.backward(loss)

    model_optimizer.step()
    model_optimizer.zero_grad()

logger.warning(f"Device {accelerator.process_index} - w/ accumulation, the final model weight is {accelerator.unwrap_model(model).weight.detach().cpu().squeeze()}", main_process_only=False)

# We now do the same operation but on a single device and without gradient accumulation
if accelerator.is_main_process:
    # prepare one single entire batch
    dataloader = DataLoader(dataset, batch_size=len(dataset), collate_fn=collate_fn)
    full_batch_without_accum = next(iter(dataloader))
    total_inputs, total_labels = full_batch_without_accum["input_ids"], full_batch_without_accum["labels"]
    model_clone_optimizer = torch.optim.SGD(model_clone.parameters(), lr=0.08)

    # train the cloned model
    loss = torch.nn.CrossEntropyLoss(reduction="mean")(model_clone(total_inputs).view(-1, 2), total_labels.view(-1).to(torch.int64))
    model_clone_optimizer.zero_grad()
    loss.backward()
    model_clone_optimizer.step()

    # We should have the same final weights.
    logger.warning(f"w/o accumulation, the final model weight is {model_clone.weight.detach().cpu().squeeze()}")
Results on a single device, with gradient accumulation steps set to 1 and batch size set to 8:
initial model weight is tensor([-0.0075, 0.5364])
initial model clone weight is tensor([-0.0075, 0.5364])
Step 0 - Device 0 - num items in the local batch 36
Total num items 36
Device 0 - w/ accumulation, the final model weight is tensor([0.0953, 0.4337])
w/o accumulation, the final model weight is tensor([0.0953, 0.4337])
Results on a two-device setup, with gradient accumulation steps set to 2 and batch size set to 4:
initial model weight is tensor([-0.0075, 0.5364])
initial model clone weight is tensor([-0.0075, 0.5364])
Step 0 - Device 0 - num items in the local batch 52
Step 0 - Device 1 - num items in the local batch 84
Total num items 136
Device 1 - w/ accumulation, the final model weight is tensor([0.2117, 0.3172])
Device 0 - w/ accumulation, the final model weight is tensor([0.2117, 0.3172])
w/o accumulation, the final model weight is tensor([0.2117, 0.3172])
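The item counts in both logs can be reproduced with a few lines of plain Python. This sketch relies on two assumptions: sample `index` holds `index + 1` tokens (as in `MyDataset`), and batches are dealt to processes in round-robin order, which is consistent with the per-device counts logged above. The helper name `local_token_counts` is hypothetical.

```python
def local_token_counts(num_processes, per_device_batch_size, gradient_accumulation_steps):
    """Non-padding token count seen by each process for one global batch."""
    num_samples = per_device_batch_size * gradient_accumulation_steps * num_processes
    lengths = [index + 1 for index in range(num_samples)]
    batches = [
        lengths[start:start + per_device_batch_size]
        for start in range(0, num_samples, per_device_batch_size)
    ]
    # Assumed sharding: process p receives batches p, p + num_processes, ...
    return [
        sum(sum(batch) for batch in batches[process::num_processes])
        for process in range(num_processes)
    ]

single = local_token_counts(1, 8, 1)
double = local_token_counts(2, 4, 2)
print(single, sum(single))  # [36] 36
print(double, sum(double))  # [52, 84] 136
```

The two devices see very different token counts (52 versus 84), which is why the loss is normalized by the gathered total rather than by each device's local count.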
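To go further:
A complete example script for a real-world training run is available in the examples folder at accelerate/examples/by_feature/gradient_accumulation_for_autoregressive_models.py.
Running it on several training configurations with a constant global batch size of 32 gives the following graph:

Note that the training losses are exactly the same up to training step 20. The small deviation after that step occurs at the very end of the first epoch because, by default, the dataloader duplicates samples from the beginning of the dataset when the total batch size does not evenly divide the dataset.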
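The wrap-around behaviour described above can be pictured with a short plain-Python sketch. It only illustrates the idea of reusing samples from the start of the dataset to complete the last batch; the helper `pad_indices_by_wrapping` is hypothetical and is not how Accelerate implements it.

```python
def pad_indices_by_wrapping(dataset_size, total_batch_size):
    """Extend the index list with indices from the start until it divides evenly."""
    indices = list(range(dataset_size))
    shortfall = -dataset_size % total_batch_size
    # Reuse samples from the beginning of the dataset to fill the last batch.
    return indices + [i % dataset_size for i in range(shortfall)]

padded = pad_indices_by_wrapping(10, 4)
print(padded)  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 0, 1]
```

Because the duplicated samples differ between configurations with different per-device batch sizes, the last update of an epoch can see slightly different data, which matches the small deviation noted above.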