Accelerate 文件

加速器

Hugging Face's logo
加入 Hugging Face 社群

並獲得增強的文件體驗

開始使用

Accelerator

Accelerator 是用於在任何型別的訓練設定中啟用分散式訓練的主類。請閱讀將 Accelerator 新增到您的程式碼中教程,以瞭解有關如何將 Accelerator 新增到指令碼的更多資訊。

Accelerator

class accelerate.Accelerator

< >

( device_placement: bool = True split_batches: bool = <object object at 0x7f9bc0a7a0f0> mixed_precision: PrecisionType | str | None = None gradient_accumulation_steps: int = 1 cpu: bool = False dataloader_config: DataLoaderConfiguration | None = None deepspeed_plugin: DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None = None fsdp_plugin: FullyShardedDataParallelPlugin | None = None torch_tp_plugin: TorchTensorParallelPlugin | None = None megatron_lm_plugin: MegatronLMPlugin | None = None rng_types: list[str | RNGType] | None = None log_with: str | LoggerType | GeneralTracker | list[str | LoggerType | GeneralTracker] | None = None project_dir: str | os.PathLike | None = None project_config: ProjectConfiguration | None = None gradient_accumulation_plugin: GradientAccumulationPlugin | None = None step_scheduler_with_optimizer: bool = True kwargs_handlers: list[KwargsHandler] | None = None dynamo_backend: DynamoBackend | str | None = None dynamo_plugin: TorchDynamoPlugin | None = None deepspeed_plugins: DeepSpeedPlugin | dict[str, DeepSpeedPlugin] | None = None parallelism_config: ParallelismConfig | None = None )

引數

  • device_placement (bool,可選,預設為 True) — Accelerator 是否應將物件(資料載入器生成的張量、模型等)放置在裝置上。
  • mixed_precision (str,可選) — 是否使用混合精度訓練。可選擇 'no'、'fp16'、'bf16' 或 'fp8'。將預設為環境變數 ACCELERATE_MIXED_PRECISION 中的值,該值將使用當前系統的 Accelerate 配置中的預設值或透過 accelerate.launch 命令傳遞的標誌。'fp8' 需要安裝 transformers-engine。
  • gradient_accumulation_steps (int,可選,預設為 1) — 在累積梯度之前應經過的步驟數。大於 1 的數字應與 Accelerator.accumulate 結合使用。如果未傳遞,將預設為環境變數 ACCELERATE_GRADIENT_ACCUMULATION_STEPS 中的值。也可以透過 GradientAccumulationPlugin 進行配置。
  • cpu (bool,可選) — 是否強制指令碼在 CPU 上執行。如果設定為 True,將忽略可用的 GPU,並強制在單個程序上執行。
  • dataloader_config (DataLoaderConfiguration,可選) — 用於配置在分散式場景中如何處理資料載入器的配置。
  • deepspeed_plugin (DeepSpeedPluginstr 的字典 — DeepSpeedPlugin,可選):使用此引數調整與 DeepSpeed 相關的引數。此引數是可選的,可以直接使用 *accelerate config* 進行配置。如果使用多個外掛,請使用每個外掛配置的 key 屬性從 accelerator.state.get_deepspeed_plugin(key) 訪問它們。是 deepspeed_plugins 的別名。
  • fsdp_plugin (FullyShardedDataParallelPlugin,可選) — 使用此引數調整與 FSDP 相關的引數。此引數是可選的,可以直接使用 *accelerate config* 進行配置。
  • torch_tp_plugin (TorchTensorParallelPlugin,可選) — 已棄用:請改用帶有 tp_sizeparallelism_config
  • megatron_lm_plugin (MegatronLMPlugin,可選) — 使用此引數調整與 MegatronLM 相關的引數。此引數是可選的,可以直接使用 *accelerate config* 進行配置。
  • rng_types (strRNGType 列表) — 在您準備的資料載入器的每次迭代開始時要同步的隨機數生成器列表。應為以下一項或多項:

    • "torch":基礎 torch 隨機數生成器
    • "cuda":CUDA 隨機數生成器(僅限 GPU)
    • "xla":XLA 隨機數生成器(僅限 TPU)
    • "generator":取樣器的 torch.Generator(如果資料載入器中沒有采樣器,則為批取樣器),或者如果底層資料集是該型別,則是可迭代資料集的 torch.Generator(如果存在)。

    對於 PyTorch 版本 <=1.5.1,將預設為 ["torch"],對於 PyTorch 版本 >= 1.6,將預設為 ["generator"]

  • log_with (strLoggerTypeGeneralTracker 列表,可選) — 為實驗跟蹤設定的日誌記錄器列表。應為以下一項或多項:

    • "all"
    • "tensorboard"
    • "wandb"
    • "trackio"
    • "aim"
    • "comet_ml"
    • "mlflow"
    • "dvclive"
    • "swanlab" 如果選擇 "all",將選擇環境中所有可用的跟蹤器並初始化它們。也可以接受 GeneralTracker 的實現以用於自定義跟蹤器,並且可以與 "all" 結合使用。
  • project_config (ProjectConfiguration,可選) — 用於配置如何處理狀態儲存的配置。
  • project_dir (stros.PathLike,可選) — 用於儲存資料(如本地相容日誌記錄器的日誌和可能儲存的檢查點)的目錄路徑。
  • step_scheduler_with_optimizer (bool,可選,預設為 True) — 如果學習率排程器與最佳化器同時更新,則設定為 True;如果僅在特定情況下(例如每個 epoch 結束時)更新,則設定為 False
  • kwargs_handlers (KwargsHandler 列表,可選) — KwargsHandler 列表,用於自定義如何建立與分散式訓練、效能分析或混合精度相關的物件。有關更多資訊,請參閱 kwargs
  • dynamo_backend (strDynamoBackend,可選,預設為 "no") — 設定為可能的 dynamo 後端之一,以使用 torch dynamo 最佳化您的訓練。
  • dynamo_plugin (TorchDynamoPlugin,可選) — 用於配置如何處理 torch dynamo 的配置,如果需要的調整不僅僅是 backendmode
  • gradient_accumulation_plugin (GradientAccumulationPlugin,可選) — 用於配置如何處理梯度累積的配置,如果需要的調整不僅僅是 gradient_accumulation_steps

為分散式訓練或混合精度訓練建立 accelerator 的例項。

可用屬性

  • device (torch.device) — 要使用的裝置。
  • distributed_type (DistributedType) — 分散式訓練配置。
  • local_process_index (int) — 當前機器上的程序索引。
  • mixed_precision (str) — 配置的混合精度模式。
  • num_processes (int) — 用於訓練的總程序數。
  • optimizer_step_was_skipped (bool) — 最佳化器更新是否被跳過(因為混合精度中的梯度溢位),在這種情況下學習率不應改變。
  • process_index (int) — 當前程序在所有程序中的總索引。
  • state (AcceleratorState) — 分散式設定狀態。
  • sync_gradients (bool) — 梯度當前是否在所有程序中同步。
  • use_distributed (bool) — 當前配置是否用於分散式訓練。

accumulate

< >

( *models )

引數

  • *models (torch.nn.Module 列表) — 使用 Accelerator.prepare 準備的 PyTorch 模組。傳遞給 accumulate() 的模型將在分散式訓練的反向傳播過程中跳過梯度同步。

一個上下文管理器,它將輕量級地包裝並自動執行梯度累積。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator(gradient_accumulation_steps=1)
>>> dataloader, model, optimizer, scheduler = accelerator.prepare(dataloader, model, optimizer, scheduler)

>>> for input, output in dataloader:
...     with accelerator.accumulate(model):
...         outputs = model(input)
...         loss = loss_func(outputs)
...         loss.backward()
...         optimizer.step()
...         scheduler.step()
...         optimizer.zero_grad()

autocast

< >

( autocast_handler: AutocastKwargs = None )

如果啟用了自動混合精度,將在此上下文管理器內的塊中應用它。否則不會發生任何變化。

可以傳入一個不同的 autocast_handler 來覆蓋在 Accelerator 物件中設定的那個。這在 autocast 下的塊中非常有用,當你想要恢復到 fp32 時。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator(mixed_precision="fp16")
>>> with accelerator.autocast():
...     train()

backward

< >

( loss **kwargs )

根據 GradientAccumulationPlugin 縮放梯度,並根據配置呼叫正確的 backward()

應該用來替代 loss.backward()

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator(gradient_accumulation_steps=2)
>>> outputs = model(inputs)
>>> loss = loss_fn(outputs, labels)
>>> accelerator.backward(loss)

check_trigger

< >

( )

檢查內部觸發張量在任何程序中是否被設定為 1。如果是,將返回 True 並將觸發張量重置為 0。

注意:不需要 wait_for_everyone()

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> # Assume later in the training script
>>> # `should_do_breakpoint` is a custom function to monitor when to break,
>>> # e.g. when the loss is NaN
>>> if should_do_breakpoint(loss):
...     accelerator.set_trigger()
>>> # Assume later in the training script
>>> if accelerator.check_trigger():
...     break

clear

< >

( *objects )

Accelerate.free_memory 的別名,釋放對儲存的內部物件的所有引用並呼叫垃圾回收器。您應該在兩次使用不同模型/最佳化器的訓練之間呼叫此方法。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> model, optimizer, scheduler = ...
>>> model, optimizer, scheduler = accelerator.prepare(model, optimizer, scheduler)
>>> model, optimizer, scheduler = accelerator.clear(model, optimizer, scheduler)

clip_grad_norm_

< >

( parameters max_norm norm_type = 2 ) torch.Tensor

返回

torch.Tensor

引數梯度的總範數(視為單個向量)。

應該用來代替 torch.nn.utils.clip_grad_norm_

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator(gradient_accumulation_steps=2)
>>> dataloader, model, optimizer, scheduler = accelerator.prepare(dataloader, model, optimizer, scheduler)

>>> for input, target in dataloader:
...     optimizer.zero_grad()
...     output = model(input)
...     loss = loss_func(output, target)
...     accelerator.backward(loss)
...     if accelerator.sync_gradients:
...         accelerator.clip_grad_norm_(model.parameters(), max_grad_norm)
...     optimizer.step()

clip_grad_value_

< >

( parameters clip_value )

應該用來代替 torch.nn.utils.clip_grad_value_

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator(gradient_accumulation_steps=2)
>>> dataloader, model, optimizer, scheduler = accelerator.prepare(dataloader, model, optimizer, scheduler)

>>> for input, target in dataloader:
...     optimizer.zero_grad()
...     output = model(input)
...     loss = loss_func(output, target)
...     accelerator.backward(loss)
...     if accelerator.sync_gradients:
...         accelerator.clip_grad_value_(model.parameters(), clip_value)
...     optimizer.step()

end_training

< >

( )

執行任何特殊的訓練結束行為,例如僅在主程序上停止跟蹤器或銷燬程序組。如果使用實驗跟蹤,應始終在指令碼結束時呼叫。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator(log_with="tensorboard")
>>> accelerator.init_trackers("my_project")
>>> # Do training
>>> accelerator.end_training()

free_memory

< >

( *objects )

將釋放對儲存的內部物件的所有引用並呼叫垃圾回收器。您應該在兩次使用不同模型/最佳化器的訓練之間呼叫此方法。同時會將 Accelerator.step 重置為 0。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> model, optimizer, scheduler = ...
>>> model, optimizer, scheduler = accelerator.prepare(model, optimizer, scheduler)
>>> model, optimizer, scheduler = accelerator.free_memory(model, optimizer, scheduler)

gather

< >

( tensor ) torch.Tensor,或巢狀的元組/列表/字典的 torch.Tensor

引數

  • tensor (torch.Tensor,或巢狀的元組/列表/字典的 torch.Tensor) — 要在所有程序中收集的張量。

返回

torch.Tensor,或巢狀的元組/列表/字典的 torch.Tensor

收集到的張量。注意,結果的第一維是 *num_processes* 乘以輸入張量的第一維。

在所有程序中收集 *tensor* 中的值,並在第一個維度上進行連線。在進行評估時,用於重新組合所有程序的預測非常有用。

注意:此收集操作在所有程序中進行。

示例

>>> # Assuming four processes
>>> import torch
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> process_tensor = torch.tensor([accelerator.process_index])
>>> gathered_tensor = accelerator.gather(process_tensor)
>>> gathered_tensor
tensor([0, 1, 2, 3])

gather_for_metrics

< >

( input_data use_gather_object = False )

引數

  • input (torch.Tensorobject、巢狀的元組/列表/字典的 torch.Tensor,或巢狀的元組/列表/字典的 object) — 用於在所有程序中計算指標的張量或物件。
  • use_gather_object(bool) — 是否強制使用 gather_object 而不是 gather(如果傳遞的所有物件都不包含張量,則已經這樣做了)。此標誌對於收集不同大小的張量非常有用,因為我們不想沿著第一個維度進行填充和連線。與 GPU 張量一起使用時支援不佳且效率低下,因為它會因張量被 pickle 化而導致 GPU -> CPU 的傳輸。

收集 input_data,並在分散式系統上可能刪除最後一個批次中的重複項。應用於收集用於指標計算的輸入和目標。

示例

>>> # Assuming two processes, with a batch size of 5 on a dataset with 9 samples
>>> import torch
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> dataloader = torch.utils.data.DataLoader(range(9), batch_size=5)
>>> dataloader = accelerator.prepare(dataloader)
>>> batch = next(iter(dataloader))
>>> gathered_items = accelerator.gather_for_metrics(batch)
>>> len(gathered_items)
9

get_state_dict

< >

( model unwrap = True ) dict

引數

  • model (torch.nn.Module) — 透過 Accelerator.prepare() 傳遞的 PyTorch 模型。
  • unwrap (bool,可選,預設為 True) — 是否返回 model 的原始底層 state_dict,還是返回包裝後的 state_dict。

返回

字典

可能不包含完整精度的模型的 state_dict。

返回透過 Accelerator.prepare() 傳遞的模型的 state_dict,可能不包含完整精度。

示例

>>> import torch
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> net = torch.nn.Linear(2, 2)
>>> net = accelerator.prepare(net)
>>> state_dict = accelerator.get_state_dict(net)

get_tracker

< >

( name: str unwrap: bool = False ) GeneralTracker

引數

  • name (str) — 跟蹤器的名稱,對應於 .name 屬性。
  • unwrap (bool) — 是返回內部跟蹤機制還是返回包裝後的跟蹤器(推薦)。

返回

GeneralTracker

如果存在,則返回與 name 對應的跟蹤器。

僅在主程序中,根據 nameself.trackers 返回一個 tracker

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator(log_with="tensorboard")
>>> accelerator.init_trackers("my_project")
>>> tensorboard_tracker = accelerator.get_tracker("tensorboard")

join_uneven_inputs

< >

( joinables even_batches = None )

引數

  • joinables (list[torch.distributed.algorithms.Joinable]) — 一個模型或最佳化器的列表,它們是 torch.distributed.algorithms.Joinable 的子類。最常見的是一個使用 Accelerator.prepare 準備用於 DistributedDataParallel 訓練的 PyTorch Module。
  • even_batches (bool, 可選) — 如果設定,將覆蓋在 Accelerator 中設定的 even_batches 的值。如果未提供,將使用預設的 Accelerator 值。

一個上下文管理器,它有助於在不均勻輸入上進行分散式訓練或評估,作為 torch.distributed.algorithms.join 的包裝器。當總批次大小不能被資料集長度整除時,這很有用。

join_uneven_inputs 僅支援在多個 GPU 上進行分散式資料並行訓練。對於任何其他配置,此方法將沒有效果。

覆蓋 even_batches 不會影響可迭代式資料載入器。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator(even_batches=True)
>>> ddp_model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)

>>> with accelerator.join_uneven_inputs([ddp_model], even_batches=False):
...     for input, output in dataloader:
...         outputs = model(input)
...         loss = loss_func(outputs)
...         loss.backward()
...         optimizer.step()
...         optimizer.zero_grad()

load_state

< >

( input_dir: str = None load_kwargs: dict | None = None **load_model_func_kwargs )

引數

  • input_dir (stros.PathLike) — 所有相關權重和狀態儲存的資料夾名稱。如果使用了 automatic_checkpoint_naming,則可以為 None,並將從最新的檢查點載入。
  • load_kwargs (dict, 可選) — 傳遞給底層 load 函式的附加關鍵字引數,例如 state_dict 和 optimizer 的可選引數。
  • load_model_func_kwargs (dict, 可選) — 用於載入模型的附加關鍵字引數,可以傳遞給底層載入函式,例如 DeepSpeed 的 load_checkpoint 函式的可選引數或用於載入模型和最佳化器的 map_location

載入模型、最佳化器、縮放器、RNG 生成器和已註冊物件的當前狀態。

應僅與 Accelerator.save_state() 結合使用。如果某個檔案未註冊用於檢查點,則即使它儲存在目錄中也不會被載入。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> model, optimizer, lr_scheduler = ...
>>> model, optimizer, lr_scheduler = accelerator.prepare(model, optimizer, lr_scheduler)
>>> accelerator.load_state("my_checkpoint")

local_main_process_first

< >

( )

讓本地主程序先進入 with 塊。

其他程序將在主程序退出後進入 with 塊。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> with accelerator.local_main_process_first():
...     # This will be printed first by local process 0 then in a seemingly
...     # random order by the other processes.
...     print(f"This will be printed by process {accelerator.local_process_index}")

lomo_backward

< >

( loss: torch.Tensor learning_rate: float )

在 LOMO 最佳化器上執行反向傳播。

main_process_first

< >

( )

讓主程序先進入 with 塊。

其他程序將在主程序退出後進入 with 塊。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> with accelerator.main_process_first():
...     # This will be printed first by process 0 then in a seemingly
...     # random order by the other processes.
...     print(f"This will be printed by process {accelerator.process_index}")

maybe_context_parallel

< >

( buffers: list[torch.Tensor] | None = None buffer_seq_dims: list[int] | None = None no_restore_buffers: set[torch.Tensor] | None = None )

引數

  • buffers (list[torch.Tensor], 可選) — 將沿序列維度進行分片的緩衝區。常見示例包括輸入、標籤或位置嵌入緩衝區。此上下文管理器將就地修改這些緩衝區,退出上下文後,緩衝區將恢復到其原始狀態。為避免不必要的恢復,您可以使用 no_restore_buffers 指定哪些緩衝區不需要恢復。
  • buffer_seq_dims (list[int], 可選) — buffers 的序列維度。
  • no_restore_buffers (set[torch.Tensor], 可選) — 此集合必須是 buffers 的子集。指定在上下文退出後,buffers 引數中的哪些緩衝區將不會被恢復。這些緩衝區將保持分片狀態。

一個啟用上下文並行訓練的上下文管理器。

context_parallel 目前僅支援與 FSDP2 一起使用,並且需要 parallelism_config.cp_size >

  1. 如果這些條件中的任何一個不滿足,此上下文管理器將沒有效果,但為了減少程式碼更改,它不會引發異常。

此上下文管理器必須在每個訓練步驟中重新建立,如下面的示例所示。

示例

>>> for batch in dataloader:
...     with accelerator.maybe_context_parallel(
...         buffers=[batch["input_ids"], batch["attention_mask"]],
...         buffer_seq_dims=[1, 1],
...         no_restore_buffers={batch["input_ids"]},
...     ):
...         outputs = model(batch)
...         ...

no_sync

< >

( model )

引數

  • model (torch.nn.Module) — 使用 Accelerator.prepare 準備的 PyTorch Module

一個上下文管理器,透過呼叫 torch.nn.parallel.DistributedDataParallel.no_sync 來停用 DDP 程序間的梯度同步。

如果 model 不處於 DDP 模式,此上下文管理器不執行任何操作。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> dataloader, model, optimizer = accelerator.prepare(dataloader, model, optimizer)
>>> input_a = next(iter(dataloader))
>>> input_b = next(iter(dataloader))

>>> with accelerator.no_sync():
...     outputs = model(input_a)
...     loss = loss_func(outputs)
...     accelerator.backward(loss)
...     # No synchronization across processes, only accumulate gradients
>>> outputs = model(input_b)
>>> accelerator.backward(loss)
>>> # Synchronization across all processes
>>> optimizer.step()
>>> optimizer.zero_grad()

on_last_process

< >

( function: Callable[..., Any] )

引數

  • function (Callable) — 要裝飾的函式。

一個裝飾器,它將僅在最後一個程序上執行被裝飾的函式。也可以使用 PartialState 類呼叫。

示例

# Assume we have 4 processes.
from accelerate import Accelerator

accelerator = Accelerator()


@accelerator.on_last_process
def print_something():
    print(f"Printed on process {accelerator.process_index}")


print_something()
"Printed on process 3"

on_local_main_process

< >

( function: Callable[..., Any] = None )

引數

  • function (Callable) — 要裝飾的函式。

一個裝飾器,它將僅在本地主程序上執行被裝飾的函式。也可以使用 PartialState 類呼叫。

示例

# Assume we have 2 servers with 4 processes each.
from accelerate import Accelerator

accelerator = Accelerator()


@accelerator.on_local_main_process
def print_something():
    print("This will be printed by process 0 only on each server.")


print_something()
# On server 1:
"This will be printed by process 0 only"
# On server 2:
"This will be printed by process 0 only"

on_local_process

< >

( function: Callable[..., Any] = None local_process_index: int = None )

引數

  • function (Callable, 可選) — 要裝飾的函式。
  • local_process_index (int, 可選) — 執行函式的本地程序的索引。

一個裝飾器,它將僅在給定的本地程序索引上執行被裝飾的函式。也可以使用 PartialState 類呼叫。

示例

# Assume we have 2 servers with 4 processes each.
from accelerate import Accelerator

accelerator = Accelerator()


@accelerator.on_local_process(local_process_index=2)
def print_something():
    print(f"Printed on process {accelerator.local_process_index}")


print_something()
# On server 1:
"Printed on process 2"
# On server 2:
"Printed on process 2"

on_main_process

< >

( function: Callable[..., Any] = None )

引數

  • function (Callable) — 要裝飾的函式。

一個裝飾器,它將僅在主程序上執行被裝飾的函式。也可以使用 PartialState 類呼叫。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()


>>> @accelerator.on_main_process
... def print_something():
...     print("This will be printed by process 0 only.")


>>> print_something()
"This will be printed by process 0 only"

on_process

< >

( function: Callable[..., Any] = None process_index: int = None )

引數

  • function (Callable, 可選) — 要裝飾的函式。
  • process_index (int, 可選) — 執行函式的程序的索引。

一個裝飾器,它將僅在給定的程序索引上執行被裝飾的函式。也可以使用 PartialState 類呼叫。

示例

# Assume we have 4 processes.
from accelerate import Accelerator

accelerator = Accelerator()


@accelerator.on_process(process_index=2)
def print_something():
    print(f"Printed on process {accelerator.process_index}")


print_something()
"Printed on process 2"

pad_across_processes

< >

( tensor dim = 0 pad_index = 0 pad_first = False ) torch.Tensor 或巢狀的元組/列表/字典的 torch.Tensor

引數

  • tensor (巢狀的列表/元組/字典的 torch.Tensor) — 要收集的資料。
  • dim (int, 可選, 預設為 0) — 要填充的維度。
  • pad_index (int, 可選, 預設為 0) — 用來填充的值。
  • pad_first (bool, 可選, 預設為 False) — 是在開頭還是結尾填充。

返回

torch.Tensor,或巢狀的元組/列表/字典的 torch.Tensor

填充後的張量。

遞迴地將巢狀的列表/元組/字典中的張量從所有裝置填充到相同的大小,以便它們可以安全地被收集。

示例

>>> # Assuming two processes, with the first processes having a tensor of size 1 and the second of size 2
>>> import torch
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> process_tensor = torch.arange(accelerator.process_index + 1).to(accelerator.device)
>>> padded_tensor = accelerator.pad_across_processes(process_tensor)
>>> padded_tensor.shape
torch.Size([2])

prepare

< >

( *args device_placement = None )

引數

  • *args (物件列表) — 以下任何型別的物件:

    • torch.utils.data.DataLoader: PyTorch Dataloader
    • torch.nn.Module: PyTorch Module
    • torch.optim.Optimizer: PyTorch Optimizer
    • torch.optim.lr_scheduler.LRScheduler: PyTorch LR Scheduler
  • device_placement (list[bool], 可選) — 用於自定義是否為每個傳遞的物件執行自動裝置放置。需要是一個與 args 長度相同的列表。與 DeepSpeed 或 FSDP 不相容。

為分散式訓練和混合精度準備 args 中傳遞的所有物件,然後按相同順序返回它們。

如果模型僅用於推理且不使用任何混合精度,則無需準備。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> # Assume a model, optimizer, data_loader and scheduler are defined
>>> model, optimizer, data_loader, scheduler = accelerator.prepare(model, optimizer, data_loader, scheduler)
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> # Assume a model, optimizer, data_loader and scheduler are defined
>>> device_placement = [True, True, False, False]
>>> # Will place the first two items passed in automatically to the right device but not the last two.
>>> model, optimizer, data_loader, scheduler = accelerator.prepare(
...     model, optimizer, data_loader, scheduler, device_placement=device_placement
... )

prepare_data_loader

< >

( data_loader: torch.utils.data.DataLoader device_placement = None slice_fn_for_dispatch = None )

引數

  • data_loader (torch.utils.data.DataLoader) — 一個普通的 PyTorch DataLoader,用於準備
  • device_placement (bool, 可選) — 是否在準備好的 dataloader 中將批次放置在正確的裝置上。預設為 self.device_placement
  • slice_fn_for_dispatch (Callable, 可選) -- 如果傳遞,此函式將用於在 num_processes 之間切分張量。預設為 [slice_tensors()](/docs/accelerate/v1.10.0/en/package_reference/utilities#accelerate.utils.slice_tensors)。此引數僅在 dispatch_batches 設定為 True` 時使用,否則將被忽略。

為任何分散式設定中的訓練準備一個 PyTorch DataLoader。建議改用 Accelerator.prepare()

示例

>>> import torch
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> data_loader = torch.utils.data.DataLoader(...)
>>> data_loader = accelerator.prepare_data_loader(data_loader, device_placement=True)

prepare_model

< >

( model: torch.nn.Module device_placement: bool = None evaluation_mode: bool = False )

引數

  • model (torch.nn.Module) — 一個 PyTorch 模型,用於準備。如果模型僅用於推理且不使用任何混合精度,則無需準備
  • device_placement (bool, 可選) — 是否將模型放置在正確的裝置上。預設為 self.device_placement
  • evaluation_mode (bool, 可選, 預設為 False) — 是否僅為評估設定模型,只應用混合精度和 torch.compile(如果在 Accelerator 物件中配置)。

為任何分散式設定中的訓練準備一個 PyTorch 模型。建議改用 Accelerator.prepare()

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> # Assume a model is defined
>>> model = accelerator.prepare_model(model)

prepare_optimizer

< >

( optimizer: torch.optim.Optimizer device_placement = None )

引數

  • optimizer (torch.optim.Optimizer) — 一個普通的 PyTorch 最佳化器,用於準備
  • device_placement (bool, optional) — 是否將最佳化器放置在正確的裝置上。將預設為 self.device_placement

為任何分散式設定中的訓練準備 PyTorch 最佳化器。建議改用 Accelerator.prepare()

示例

>>> import torch
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> optimizer = torch.optim.Adam(...)
>>> optimizer = accelerator.prepare_optimizer(optimizer, device_placement=True)

prepare_scheduler

< >

( scheduler: LRScheduler )

引數

  • scheduler (torch.optim.lr_scheduler.LRScheduler) — 一個待準備的原生 PyTorch 排程器

為任何分散式設定中的訓練準備 PyTorch 排程器。建議改用 Accelerator.prepare()

示例

>>> import torch
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> optimizer = torch.optim.Adam(...)
>>> scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, ...)
>>> scheduler = accelerator.prepare_scheduler(scheduler)

print

< >

( *args **kwargs )

print() 的直接替代品,每臺伺服器只打印一次。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> accelerator.print("Hello world!")

profile

< >

( profile_handler: ProfileKwargs | None = None )

引數

  • profile_handler (ProfileKwargs, optional) — 用於此上下文管理器的效能分析處理器。如果未傳入,將使用 Accelerator 物件中設定的處理器。

將對上下文管理器內的程式碼進行效能分析。如果設定了 profile_handler.output_trace_dir,效能分析結果將儲存到 Chrome Trace 檔案中。

可以傳入一個不同的 profile_handler 來覆蓋 Accelerator 物件中設定的處理器。

示例

# Profile with default settings
from accelerate import Accelerator
from accelerate.utils import ProfileKwargs

accelerator = Accelerator()
with accelerator.profile() as prof:
    train()
accelerator.print(prof.key_averages().table())


# Profile with the custom handler
def custom_handler(prof):
    print(prof.key_averages().table(sort_by="self_cpu_time_total", row_limit=10))


kwargs = ProfileKwargs(schedule_option=dict(wait=1, warmup=1, active=1), on_trace_ready=custom_handler)
accelerator = Accelerator(kwarg_handler=[kwargs])
with accelerator.profile() as prof:
    for _ in range(10):
        train_iteration()
        prof.step()


# Profile and export to Chrome Trace
kwargs = ProfileKwargs(output_trace_dir="output_trace")
accelerator = Accelerator(kwarg_handler=[kwargs])
with accelerator.profile():
    train()

reduce

< >

( tensor reduction = 'sum' scale = 1.0 ) torch.Tensor,或 torch.Tensor 的巢狀元組/列表/字典

引數

  • tensor (torch.Tensortorch.Tensor 的巢狀元組/列表/字典) — 需要在所有程序間進行規約的張量。
  • reduction (str, optional, 預設為 “sum”) — 規約型別,可以是 ‘sum’、‘mean’ 或 ‘none’ 之一。如果為 ‘none’,將不執行任何操作。
  • scale (float, optional, 預設為 1.0) — 規約後應用的預設縮放值,僅在 XLA 上有效。

返回

torch.Tensor,或巢狀的元組/列表/字典的 torch.Tensor

規約後的張量。

根據 reduction 在所有程序間規約 tensor 中的值。

注意:所有程序都會獲得規約後的值。

示例

>>> # Assuming two processes
>>> import torch
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> process_tensor = torch.arange(accelerator.num_processes) + 1 + (2 * accelerator.process_index)
>>> process_tensor = process_tensor.to(accelerator.device)
>>> reduced_tensor = accelerator.reduce(process_tensor, reduction="sum")
>>> reduced_tensor
tensor([4, 6])

register_for_checkpointing

< >

( *objects )

記錄 objects,並在 save_stateload_state 期間儲存或載入它們。

當在同一指令碼中載入或儲存狀態時應使用此功能。它不適用於在不同指令碼中使用。

每個 object 必須具有 load_state_dictstate_dict 函式才能被儲存。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> # Assume `CustomObject` has a `state_dict` and `load_state_dict` function.
>>> obj = CustomObject()
>>> accelerator.register_for_checkpointing(obj)
>>> accelerator.save_state("checkpoint.pt")

register_load_state_pre_hook

< >

( hook: Callable[..., None] ) torch.utils.hooks.RemovableHandle

引數

返回

torch.utils.hooks.RemovableHandle

一個控制代碼,可以透過呼叫 `handle.remove()` 來移除新增的鉤子

註冊一個預處理鉤子,在 Accelerator.load_state() 中呼叫 load_checkpoint 之前執行。

鉤子應具有以下簽名:

hook(models: list[torch.nn.Module], input_dir: str) -> None

models 引數是儲存在加速器狀態 accelerator._models 下的模型,input_dir 引數是傳遞給 Accelerator.load_state()input_dir 引數。

應僅與 Accelerator.register_save_state_pre_hook() 結合使用。這對於載入模型權重之外的配置很有用。也可用於使用自定義方法覆蓋模型載入。在這種情況下,請確保從模型列表中移除已載入的模型。

register_save_state_pre_hook

< >

( hook: Callable[..., None] ) torch.utils.hooks.RemovableHandle

引數

返回

torch.utils.hooks.RemovableHandle

一個控制代碼,可以透過呼叫 `handle.remove()` 來移除新增的鉤子

註冊一個預處理鉤子,在 Accelerator.save_state() 中呼叫 save_checkpoint 之前執行。

鉤子應具有以下簽名:

hook(models: list[torch.nn.Module], weights: list[dict[str, torch.Tensor]], input_dir: str) -> None

models 引數是儲存在加速器狀態 accelerator._models 下的模型,weights 引數是 models 的狀態字典,而 input_dir 引數是傳遞給 Accelerator.load_state()input_dir 引數。

應僅與 Accelerator.register_load_state_pre_hook() 結合使用。這對於儲存模型權重之外的配置很有用。也可用於使用自定義方法覆蓋模型儲存。在這種情況下,請確保從權重列表中移除已載入的權重。

save

< >

( obj f safe_serialization = False )

引數

  • obj (object) — 要儲存的物件。
  • f (str or os.PathLike) — 儲存 obj 內容的位置。
  • safe_serialization (bool, optional, 預設為 False) — 是否使用 safetensors 儲存 obj

每臺機器只將傳遞的物件儲存到磁碟一次。用於替代 torch.save

注意:如果在 ProjectConfiguration 中傳入了 save_on_each_node,將在每個節點上儲存一次物件,而不僅僅是在主節點上儲存一次。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> arr = [0, 1, 2, 3]
>>> accelerator.save(arr, "array.pkl")

save_model

< >

( model: torch.nn.Module save_directory: Union[str, os.PathLike] max_shard_size: Union[int, str] = '10GB' safe_serialization: bool = True )

引數

  • model — (torch.nn.Module):要儲存的模型。模型可以是包裝過的或未包裝的。
  • save_directory (stros.PathLike) — 要儲存到的目錄。如果不存在,將會建立。
  • max_shard_size (int or str, optional, 預設為 "10GB") — 檢查點在分片前的最大大小。檢查點分片後的大小將小於此值。如果以字串表示,需要是數字後跟單位(如 "5MB")。

    如果模型的單個權重比 max_shard_size 大,它將被放在自己的檢查點分片中,該分片將大於 max_shard_size

  • safe_serialization (bool, optional, 預設為 True) — 是使用 safetensors 還是傳統的 PyTorch 方式(使用 pickle)來儲存模型。

儲存模型,以便可以使用 load_checkpoint_in_model 重新載入

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> model = ...
>>> accelerator.save_model(model, save_directory)

save_state

< >

( output_dir: str = None safe_serialization: bool = True **save_model_func_kwargs )

引數

  • output_dir (stros.PathLike) — 用於儲存所有相關權重和狀態的資料夾名稱。
  • safe_serialization (bool, optional, 預設為 True) — 是使用 safetensors 還是傳統的 PyTorch 方式(使用 pickle)來儲存模型。
  • save_model_func_kwargs (dict, optional) — 用於儲存模型的額外關鍵字引數,可以傳遞給底層的儲存函式,例如 DeepSpeed 的 save_checkpoint 函式的可選引數。

將模型、最佳化器、縮放器、RNG 生成器和已註冊物件的當前狀態儲存到一個資料夾中。

如果向 Accelerator 物件傳遞了啟用了 automatic_checkpoint_namingProjectConfiguration,則檢查點將儲存到 self.project_dir/checkpoints。如果當前儲存的數量大於 total_limit,則會刪除最舊的儲存。每個檢查點儲存在名為 checkpoint_<iteration> 的獨立資料夾中。

否則,它們只會被儲存到 output_dir

僅當希望在訓練期間儲存檢查點並在相同環境中恢復狀態時使用。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> model, optimizer, lr_scheduler = ...
>>> model, optimizer, lr_scheduler = accelerator.prepare(model, optimizer, lr_scheduler)
>>> accelerator.save_state(output_dir="my_checkpoint")

set_trigger

< >

( )

在當前程序上將內部觸發張量設定為 1。後續應使用此張量進行檢查,該檢查將在所有程序間進行。

注意:不需要 wait_for_everyone()

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> # Assume later in the training script
>>> # `should_do_breakpoint` is a custom function to monitor when to break,
>>> # e.g. when the loss is NaN
>>> if should_do_breakpoint(loss):
...     accelerator.set_trigger()
>>> # Assume later in the training script
>>> if accelerator.check_breakpoint():
...     break

skip_first_batches

< >

( dataloader num_batches: int = 0 )

引數

  • dataloader (torch.utils.data.DataLoader) — 要跳過批次的資料載入器。
  • num_batches (int, optional, 預設為 0) — 要跳過的批次數

建立一個新的 torch.utils.data.DataLoader,它將高效地跳過前 num_batches 個批次。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> dataloader, model, optimizer, scheduler = accelerator.prepare(dataloader, model, optimizer, scheduler)
>>> skipped_dataloader = accelerator.skip_first_batches(dataloader, num_batches=2)
>>> # for the first epoch only
>>> for input, target in skipped_dataloader:
...     optimizer.zero_grad()
...     output = model(input)
...     loss = loss_func(output, target)
...     accelerator.backward(loss)
...     optimizer.step()

>>> # subsequent epochs
>>> for input, target in dataloader:
...     optimizer.zero_grad()
...     ...

split_between_processes

< >

( inputs: list | tuple | dict | torch.Tensor apply_padding: bool = False )

引數

  • inputs (listtupletorch.Tensorlist/tuple/torch.Tensordict) — 要在程序間分割的輸入。
  • apply_padding (bool, optional, 預設為 False) — 是否透過重複輸入的最後一個元素來應用填充,以使所有程序具有相同數量的元素。這在嘗試對輸出執行 Accelerator.gather() 等操作或傳入的輸入少於程序數時非常有用。如果使用,只需記得之後丟棄填充的元素。

self.num_processes 之間快速分割 input,然後可以在該程序上使用。在進行分散式推理(例如使用不同提示)時非常有用。

注意,當使用 dict 時,所有鍵都需要有相同數量的元素。

示例

# Assume there are two processes
from accelerate import Accelerator

accelerator = Accelerator()
with accelerator.split_between_processes(["A", "B", "C"]) as inputs:
    print(inputs)
# Process 0
["A", "B"]
# Process 1
["C"]

with accelerator.split_between_processes(["A", "B", "C"], apply_padding=True) as inputs:
    print(inputs)
# Process 0
["A", "B"]
# Process 1
["C", "C"]

trigger_sync_in_backward

< >

( model )

引數

  • model (torch.nn.Module) — 需要觸發梯度同步的模型。

在 `Accelerator.no_sync` 下多次前向傳播後,在模型的下一次反向傳播中觸發梯度同步(僅適用於多 GPU 場景)。

如果指令碼不是在分散式模式下啟動,此上下文管理器不執行任何操作。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> dataloader, model, optimizer = accelerator.prepare(dataloader, model, optimizer)

>>> with accelerator.no_sync():
...     loss_a = loss_func(model(input_a))  # first forward pass
...     loss_b = loss_func(model(input_b))  # second forward pass
>>> accelerator.backward(loss_a)  # No synchronization across processes, only accumulate gradients
>>> with accelerator.trigger_sync_in_backward(model):
...     accelerator.backward(loss_b)  # Synchronization across all processes
>>> optimizer.step()
>>> optimizer.zero_grad()

unscale_gradients

< >

( optimizer = None )

引數

  • optimizer (torch.optim.Optimizerlist[torch.optim.Optimizer], optional) — 需要取消梯度縮放的最佳化器。如果未設定,將對所有傳遞給 prepare() 的最佳化器取消梯度縮放。

在 AMP 混合精度訓練中取消梯度縮放。在所有其他設定中,這是一個空操作。

可能應透過 Accelerator.clip_grad_norm_()Accelerator.clip_grad_value_() 呼叫。

示例

>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> model, optimizer = accelerator.prepare(model, optimizer)
>>> outputs = model(inputs)
>>> loss = loss_fn(outputs, labels)
>>> accelerator.backward(loss)
>>> accelerator.unscale_gradients(optimizer=optimizer)

unwrap_model

< >

( model keep_fp32_wrapper: bool = True keep_torch_compile: bool = True ) torch.nn.Module

引數

  • model (torch.nn.Module) — 要解包的模型。
  • keep_fp32_wrapper (bool, optional, 預設為 True) — 如果添加了混合精度鉤子,是否不移除它。
  • keep_torch_compile (bool, optional, 預設為 True) — 如果模型已編譯,是否不解包編譯後的模型。

返回

torch.nn.Module

解包後的模型。

prepare() 可能新增的額外層中解包 model。在儲存模型之前很有用。

示例

>>> # Assuming two GPU processes
>>> from torch.nn.parallel import DistributedDataParallel
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> model = accelerator.prepare(MyModel())
>>> print(model.__class__.__name__)
DistributedDataParallel

>>> model = accelerator.unwrap_model(model)
>>> print(model.__class__.__name__)
MyModel

verify_device_map

< >

( model: torch.nn.Module )

驗證 model 未使用類似於 auto 的裝置對映進行大模型推理的準備。

wait_for_everyone

< >

( )

將停止當前程序的執行,直到所有其他程序都到達該點(因此當指令碼僅在一個程序中執行時,此操作無效)。在儲存模型之前很有用。

示例

>>> # Assuming two GPU processes
>>> import time
>>> from accelerate import Accelerator

>>> accelerator = Accelerator()
>>> if accelerator.is_main_process:
...     time.sleep(2)
>>> else:
...     print("I'm waiting for the main process to finish its sleep...")
>>> accelerator.wait_for_everyone()
>>> # Should print on every process at the same time
>>> print("Everyone is here")

實用工具

accelerate.utils.gather_object

< >

( object: typing.Any )

引數

  • object (可序列化物件的巢狀列表/元組/字典) — 要收集的資料。

從所有裝置遞迴地收集巢狀列表/元組/字典中的物件。

< > 在 GitHub 上更新

© . This site is unofficial and not affiliated with Hugging Face, Inc.