Transformers 文件

Transformer 中的張量並行

Hugging Face's logo
加入 Hugging Face 社群

並獲得增強的文件體驗

開始使用

Transformer 中的張量並行

張量並行將模型分片到多個 GPU 上,並並行化矩陣乘法等計算。它使更大的模型能夠適應記憶體,並且速度更快,因為每個 GPU 都可以處理一個張量切片。本文件假定您已經熟悉張量並行的基礎知識。如果還不熟悉,請參閱超大規模手冊中關於張量並行的部分。

張量並行是通訊密集型的,因此建議在具有多個 GPU 的單臺機器上使用它,利用快速的節點內通訊。對於多節點訓練,流水線或資料並行方法更高效(取決於您的用例)。

張量並行需要對模型引數進行微小的更改,因此在 transformers 中,我們開箱即用地支援一些流行的模型。

展開下面的列表,檢視支援張量並行的模型。如果模型目前不在列表中,請提交 GitHub 問題或拉取請求以新增支援。

支援的模型

使用 🤗 transformers

Transformers 提供了一個簡單的介面用於張量並行。我們提供多個類來實現不同的分割槽策略,以及一個簡單的入口點來並行化 nn.Module 例項。您不必直接與此介面互動,一切都在 PretrainedModel.from_pretrained 方法中為您完成。本節將首先討論我們支援的分割槽策略,然後是您將與之互動的使用者介面,最後將教您如何用您自己的分割槽策略擴充套件它。

分割槽策略

在 transformers 中,分割槽策略存在於 ParallelInterface 類中,它就像字串到策略實現的對映。

class ParallelInterface(MutableMapping):
    """
    Dict-like object keeping track of allowed attention functions. You can easily add a new attention function
    with a call to `register()`. If a model needs to locally overwrite an existing attention function, say `sdpa`,
    it needs to declare a new instance of this class inside the `modeling_<model>.py`, and declare it on that instance.
    """
    _global_mapping = {
        "colwise": ColwiseParallel(),
        "rowwise": RowwiseParallel(),
        "colwise_rep": ColwiseParallel(output_layouts=Replicate()),
        "rowwise_rep": RowwiseParallel(input_layouts=Replicate()),
        "local_colwise": ColwiseParallel(use_dtensor=False),
        "local_rowwise": RowwiseParallel(use_dtensor=False),
        "local": IsolatedParallel(),
        "gather": GatherParallel(),
        "local_packed_rowwise": PackedRowwiseParallel(use_dtensor=False),
        "sequence_parallel": SequenceParallel(),
        "replicate": ReplicateParallel(),
    }

我們支援以下策略:

  • ColwiseParallel - 簡單的列式分割槽,能夠同時處理權重和偏差,功能與我們之前討論的完全一致。
  • RowwiseParallel - 再次,行式分割槽,如前所述,支援權重和偏差,此外還支援 nn.Embedding 模組。
  • SequenceParallel - 序列並行實現,用於支援 LayerNormDropout 層。還支援 RMSNorm 的 Python 實現(參見此處
  • PackedColwiseParallel - 列式分割槽的一種變體,但它作用於打包的權重(即 up_projgate_proj 打包在一起)。有關更多詳細資訊,請參閱此註釋
  • PackedRowwiseParallel - 行式分割槽的一種變體,作用於打包的權重,有關更多詳細資訊請檢視上面連結的註釋。
  • GatherParallel - 一個非常簡單的類,只將模組的輸出在裝置之間進行收集。
  • IsolatedParallel - 這是一個特殊情況,我們希望將模組與其餘裝置(世界)隔離。這用於 MoE 層中的專家,基本上建立了一種專家並行性。
  • ReplicateParallel - 許多 torch.distributed API 在模型部分分片時會中斷,因此此類別用於在所有裝置上覆制模組。

模型分片

我們提供兩種分片模型的方式,第一種是使用 auto 張量並行計劃,它將根據我們預定義的配置自動分片模型。這需要模型在 transformers 中具有預定義的張量並行計劃。

from transformers import AutoModelForCausalLM

# model_id = "meta-llama/Meta-Llama-3-8B-Instruct" # better for smaller number of GPUs
model_id = "meta-llama/Llama-4-Scout-17B-16E-Instruct" # better to visualize all the possible strategies

model = AutoModelForCausalLM.from_pretrained(model_id, torch_dtype=torch.bfloat16, tp_plan="auto")

print(model._tp_plan)

有關支援張量並行的模型列表,請參閱上面的支援的模型部分。

第二種方法是手動指定您自己的分割槽計劃。

from transformers import AutoModelForCausalLM

tp_plan = {
    "model.layers.*.self_attn.q_proj": "colwise",
    "model.layers.*.self_attn.k_proj": "colwise",
    "model.layers.*.self_attn.v_proj": "colwise",
    "model.layers.*.self_attn.o_proj": "rowwise",
    ...
}

model = AutoModelForCausalLM.from_pretrained(model_id, torch_dtype=torch.bfloat16, tp_plan=tp_plan)

print(model._tp_plan)

您可能已經注意到 ParallelInterface 對映中存在一些特殊情況,現在讓我們來討論它們。這將幫助您理解它們的用途,並有助於擴充套件到其他策略。

PackedRowwiseParallel

這個類是 RowwiseParallel 的一個特例,它用於分片打包的權重。權重打包是模型中常用的一種技術。它是一種將多個線性層打包成一個更大的層的方法。

例如,在 Llama4 模型中,我們將 up_projgate_proj 打包到一個 gate_up_proj 模組中。

class Llama4TextExperts(nn.Module):
    ...
    self.gate_up_proj = nn.Parameter(torch.empty(self.num_experts, self.hidden_size, 2 * self.expert_dim))

然後在前向傳播中,我們可以使用批次矩陣乘法來計算 gate_up_proj 模組的輸出。

def forward(self, hidden_states):
    ...
    gate_up = torch.bmm(hidden_states, self.gate_up_proj) # Compute the output of the gate_up_proj module
    gate, up = gate_up.chunk(2, dim=-1) # Split the output into gate and up

在這種情況下,我們需要使用 PackedRowwiseParallel 策略來分片 gate_up_proj 模組,因為使用簡單的 RowwiseParallel 將錯誤地分片層。

如果這有點難以理解,請檢視此註釋,其中提供了為什麼需要使用 Packed* 的精彩視覺表示。

local* 策略

您可能已經注意到存在 local* 策略,它們使用與 * 策略相同的層,但根本不使用 DTensor。這是因為 DTensor 不支援某些操作,例如 torch.chunk。因此,有時我們需要使用 local* 策略,它們使用普通的 torch.Tensor 並手動完成一些分散式邏輯。

手動指定您自己的分割槽計劃需要對模型架構以及分割槽策略如何相互作用有很好的理解。如果您對此不確定,最終的模型可能會非常慢,甚至出現故障或不正確。再次,請參閱超大規模手冊,它可以教您所需的一切。

使用您自己的分割槽策略擴充套件介面

這是一個非常高階的話題,需要對分散式集合和模型架構有很好的理解。您的自定義分割槽策略應該繼承自integrations/tensor_parallel.py中定義的 TensorParallelLayer,並實現:partition_tensor_prepare_input_fn_prepare_output_fn。然後,它應該在 ParallelInterface 對映中註冊,以便我們的排程邏輯在 tp_plan 中指定時可以找到它。

讓我們以一個現有示例 ColwiseParallel 為例,逐步講解此工作流程。

  1. 繼承自 TensorParallelLayer 並進行初始化
class ColwiseParallel(TensorParallelLayer):
    def __init__(
        self,
        *,
        input_layouts: Optional[Placement] = None, # The input layout coming from the previous layer
        output_layouts: Optional[Placement] = None, # The output layout we want to achieve
        use_local_output: bool = True, # Whether to use local output or not
        use_dtensor=True, # Whether to use DTensor or not
    ):
        self.input_layouts = (input_layouts or Replicate(),) # The input sharding coming from the previous layer
        self.output_layouts = (output_layouts or Shard(-1),) # Desired output sharding
        self.desired_input_layouts = (Replicate(),) # Desired input sharding, inputs should be replicated across GPUs
        self.use_local_output = use_local_output
        self.use_dtensor = use_dtensor

__init__ 方法中,我們定義這些屬性,其中 input_layoutsoutput_layouts 描述了輸入和輸出張量應如何放置在裝置上。desired_input_layouts 用於指定輸入應該如何放置在裝置上。

2a. 實現 partition_tensor 方法

def partition_tensor(
    self,
    param, # Full tensor of the parameter
    empty_param, # Empty tensor of the parameter, will be filled with the partitioned tensor
    param_type, # Type of the parameter, `bias` or `weight`
    param_casting_dtype, # The type to cast the parameter to
    to_contiguous, # Whether to convert the tensor to a contiguous memory layout
    rank, # The rank of the current device
    device_mesh, # The device mesh
) -> nn.Parameter: # Return the partitioned parameter
    ...

此方法用於對張量進行分割槽,並用分割槽後的張量填充 empty_param。我們提供了一些實用函式來幫助您完成此操作,例如 get_tensor_shard,它將為您獲取此 rank 的原始引數的正確分片,或 get_packed_weights 以幫助處理打包權重。

2b. 實現 _prepare_input_fn_prepare_output_fn 方法

這些方法分別用作 pre-forwardforward 鉤子。它們的作用是將輸入和輸出重新分配到 __init__ 方法中傳入的所需佈局。

def _prepare_input_fn(input_layouts, desired_input_layouts, mod, inputs, device_mesh):
    ...
    # Do some custom logic, cast to DTensor etc.
    ...
    return inputs.redistribute(placements=desired_input_layouts, device_mesh=device_mesh)

def _prepare_output_fn(output_layouts, use_local_output, mod, outputs, device_mesh):
    ...
    # Do some custom logic, cast to DTensor etc.
    ...
    return outputs.redistribute(placements=output_layouts, device_mesh=device_mesh)
  1. 註冊策略 恭喜!您已經實現了自己的分割槽策略。現在,要將其與您自己的 tp_plan 一起使用,您需要將其註冊到 ParallelInterface 對映中。
from transformers.integrations.tensor_parallel import ParallelInterface

ParallelInterface.register_strategy("colwise_custom", ColwiseParallel)

現在您可以在 tp_plan 中使用它,如下所示:

tp_plan = {
    "model.layers.*.self_attn.q_proj": "colwise_custom",
    ...
}

model = AutoModelForCausalLM.from_pretrained(model_id, torch_dtype=torch.bfloat16, tp_plan=tp_plan)

完整示例

讓我們透過一個完整的張量並行推理示例。

import os
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer


# enable tensor parallelism
model = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Meta-Llama-3-8B-Instruct",
    tp_plan="auto",
)

# prepare input tokens
tokenizer = AutoTokenizer.from_pretrained("meta-llama/Meta-Llama-3-8B-Instruct")
prompt = "Can I help"
inputs = tokenizer(prompt, return_tensors="pt").input_ids.to(model.device)

# distributed run
outputs = model(inputs)

torchrun 上啟動上述推理指令碼,每個 GPU 執行 4 個程序。

torchrun --nproc-per-node 4 demo.py

您可以從推理中獲得顯著的加速,特別是對於批處理大小或序列長度較大的輸入。

對於序列長度為 512 且批處理大小不同的 Llama 的單個前向傳播,您可以預期以下加速。

深入瞭解張量並行

我們的張量並行實現設計上是與框架無關的,但我們開發的具體實現依賴於 torch.distributed 包。我們大量利用 DeviceMeshDTensor 等抽象來為使用者提供簡單且可擴充套件的介面。

DeviceMesh

DeviceMesh 想象成一個相互通訊的多維裝置網格。不同的並行化策略需要不同型別的通訊模式,因此我們可以建立具有多個子網格的 DeviceMesh

from torch.distributed.device_mesh import init_device_mesh

# Create a 1D mesh of 4 GPUs
device_mesh = init_device_mesh("cuda", (4,), mesh_dim_names=["tp"])

然後,大多數 torch.distributed 定義的並行化策略都可以應用於網格本身或其子網格,從而自動處理通訊模式。

DTensor

分散式張量(Distributed Tensor)的縮寫,DTensor 是一種張量子類,它在常規張量操作之上處理分散式邏輯。在張量並行的情況下,大多數模型權重都儲存為 DTensor(有一些例外,稍後詳述)。DTensor 最重要且必須理解的部分是 placement 屬性。此屬性告訴 PyTorch 張量如何放置在 DeviceMesh 的裝置上。

它可以具有以下值:

  • Shard(dimension) - 註釋此 DTensor 在其構建的 DeviceMesh 上按給定維度進行分片。例如,如果我們想對列式分割槽進行權重分片,我們會這樣做:
weight = ...
weight = DTensor.from_local(weight, device_mesh["tp"], placements=[Shard(0)]) # Shard across the 1st (column-wise) dimension
bias = ...
bias = DTensor.from_local(bias, device_mesh["tp"], placements=[Shard(-1)]) # Shard across the ONLY dimension

再舉一個例子,對於行式分割槽,我們會這樣做:

weight = ...
weight = DTensor.from_local(weight, device_mesh["tp"], placements=[Shard(1)]) # Shard across the 2nd (row-wise) dimension
bias = ...
bias = DTensor.from_local(bias, device_mesh["tp"], placements=[Replicate()]) # Replicate bias across all GPUs
  • Replicate() - 註釋此 DTensorDeviceMesh 上被複制。非常直接,只在每個裝置上建立張量的完整副本。
  • Partial() - 此 placement 對我們來說大部分無關緊要,它用於註釋此張量正在等待歸約操作。
< > 在 GitHub 上更新

© . This site is unofficial and not affiliated with Hugging Face, Inc.