Accelerate 文件

分散式推理

Hugging Face's logo
加入 Hugging Face 社群

並獲得增強的文件體驗

開始使用

分散式推理

分散式推理可以分為三類

  1. 將整個模型載入到每個 GPU 上,然後一次性將一個批次的資料塊傳送到每個 GPU 的模型副本中
  2. 將模型的不同部分載入到每個 GPU 上,一次處理一個輸入
  3. 將模型的不同部分載入到每個 GPU 上,並使用所謂的排程流水線並行來結合前兩種技術。

我們將介紹第一類和最後一類,展示如何實現它們,因為它們是更現實的場景。

自動將批次資料塊傳送到每個載入的模型

這是最耗費記憶體的解決方案,因為它要求每個 GPU 在任何給定時間都在記憶體中保留模型的完整副本。

通常在這樣做時,使用者會將模型傳送到特定裝置以從 CPU 載入,然後將每個提示移動到不同的裝置。

使用 diffusers 庫的基本流程可能如下所示:

import torch
import torch.distributed as dist
from diffusers import DiffusionPipeline

pipe = DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)

然後根據具體的提示進行推理

def run_inference(rank, world_size):
    dist.init_process_group("nccl", rank=rank, world_size=world_size)
    pipe.to(rank)

    if torch.distributed.get_rank() == 0:
        prompt = "a dog"
    elif torch.distributed.get_rank() == 1:
        prompt = "a cat"

    result = pipe(prompt).images[0]
    result.save(f"result_{rank}.png")

人們會注意到,我們必須檢查程序的排名(rank)才能知道傳送哪個提示,這可能有點繁瑣。

使用者可能還會認為,使用 Accelerate 中的 Accelerator 為這樣的任務準備一個 dataloader 也是一種簡單的管理方式。(要了解更多資訊,請檢視快速入門中的相關部分)

它能處理嗎?是的。但它是否會新增不必要的額外程式碼:也是的。

使用 Accelerate,我們可以透過使用 Accelerator.split_between_processes() 上下文管理器來簡化這個過程(這個管理器也存在於 PartialStateAcceleratorState 中)。這個函式會自動將您傳遞給它的任何資料(無論是提示、一組張量、還是包含前述資料的字典等)在所有程序中進行拆分(可能會進行填充),以便您立即使用。

讓我們用這個上下文管理器重寫上面的例子

import torch
from accelerate import PartialState  # Can also be Accelerator or AcceleratorState
from diffusers import DiffusionPipeline

pipe = DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)
distributed_state = PartialState()
pipe.to(distributed_state.device)

# Assume two processes
with distributed_state.split_between_processes(["a dog", "a cat"]) as prompt:
    result = pipe(prompt).images[0]
    result.save(f"result_{distributed_state.process_index}.png")

然後要啟動程式碼,我們可以使用 Accelerate

如果您已經使用 accelerate config 生成了要使用的配置檔案

accelerate launch distributed_inference.py

如果您想使用特定的配置檔案

accelerate launch --config_file my_config.json distributed_inference.py

或者如果您不想建立任何配置檔案,只想在兩個 GPU 上啟動

注意:您會收到一些關於根據您的系統猜測值的警告。要消除這些警告,您可以執行 accelerate config default 或透過 accelerate config 來建立配置檔案。

accelerate launch --num_processes 2 distributed_inference.py

我們現在已經將拆分這些資料所需的樣板程式碼輕鬆地減少到了幾行。

但是,如果我們有一個提示與 GPU 的奇數分佈呢?例如,如果我們有 3 個提示,但只有 2 個 GPU 怎麼辦?

在上下文管理器下,第一個 GPU 將接收前兩個提示,第二個 GPU 將接收第三個,確保所有提示都被拆分,且不需要額外開銷。

然而,如果我們想對所有 GPU 的結果進行某些操作呢?(比如將它們全部收集起來並進行某種後處理)您可以傳入 apply_padding=True 來確保提示列表被填充到相同的長度,額外的資料取自最後一個樣本。這樣所有 GPU 都將擁有相同數量的提示,然後您就可以收集結果了。

這僅在嘗試執行諸如收集結果之類的操作時才需要,因為每個裝置上的資料需要具有相同的長度。基本的推理不需要這個。

例如

import torch
from accelerate import PartialState  # Can also be Accelerator or AcceleratorState
from diffusers import DiffusionPipeline

pipe = DiffusionPipeline.from_pretrained("runwayml/stable-diffusion-v1-5", torch_dtype=torch.float16)
distributed_state = PartialState()
pipe.to(distributed_state.device)

# Assume two processes
with distributed_state.split_between_processes(["a dog", "a cat", "a chicken"], apply_padding=True) as prompt:
    result = pipe(prompt).images

在第一個 GPU 上,提示將是 ["a dog", "a cat"],而在第二個 GPU 上,它將是 ["a chicken", "a chicken"]。請確保丟棄最後一個樣本,因為它將是前一個樣本的副本。

您可以在這裡找到更復雜的示例,例如如何將其與大語言模型(LLM)一起使用。

記憶體高效的流水線並行(實驗性)

下一部分將討論使用流水線並行。這是一個實驗性 API,它利用了 torch.distributed.pipelining 作為原生解決方案。

流水線並行的一般思想是:假設你有 4 個 GPU 和一個足夠大的模型,可以使用 device_map="auto" 將其分割到四個 GPU 上。透過這種方法,你可以一次傳送 4 個輸入(例如這裡,任何數量都可以),每個模型塊將處理一個輸入,然後在前一個塊完成後接收下一個輸入,使其比之前描述的方法效率高得多 並且速度更快。這裡有一個來自 PyTorch 倉庫的視覺化圖示

Pipeline parallelism example

為了說明如何將此方法與 Accelerate 結合使用,我們建立了一個示例集合,展示了許多不同的模型和情況。在本教程中,我們將展示在兩個 GPU 上對 GPT2 使用此方法。

在繼續之前,請確保您已安裝最新的 PyTorch 版本,執行以下命令:

pip install torch

首先在 CPU 上建立模型

from transformers import GPT2ForSequenceClassification, GPT2Config

config = GPT2Config()
model = GPT2ForSequenceClassification(config)
model.eval()

接下來,你需要建立一些示例輸入來使用。這些可以幫助 torch.distributed.pipelining 追蹤模型。

你建立這個示例的方式將決定在給定時間內將被使用/傳遞給模型的相對批次大小,所以請務必記住有多少個專案!
input = torch.randint(
    low=0,
    high=config.vocab_size,
    size=(2, 1024),  # bs x seq_len
    device="cpu",
    dtype=torch.int64,
    requires_grad=False,
)

接下來,我們需要實際執行追蹤並準備好模型。為此,請使用 inference.prepare_pippy() 函式,它會自動將模型完全包裝起來以實現流水線並行。

from accelerate.inference import prepare_pippy
example_inputs = {"input_ids": input}
model = prepare_pippy(model, example_args=(input,))

您可以向 prepare_pippy 傳遞多種引數

  • split_points 允許您確定在哪些層分割模型。預設情況下,我們使用 device_map="auto" 宣告的位置,例如 fcconv1

  • num_chunks 決定了批次將如何被分割併發送到模型本身(因此,num_chunks=1 加上四個分割點/四個 GPU 將會有一個樸素的模型並行,其中單個輸入在四個層分割點之間傳遞)

從這裡開始,剩下的就是實際執行分散式推理了!

在傳遞輸入時,我們強烈建議將它們作為引數元組傳遞。雖然支援使用 kwargs,但這種方法是實驗性的。

args = some_more_arguments
with torch.no_grad():
    output = model(*args)

完成後,所有資料將只在最後一個程序上

from accelerate import PartialState
if PartialState().is_last_process:
    print(output)

如果您將 gather_output=True 傳遞給 inference.prepare_pippy(),輸出將在之後傳送到所有 GPU,而無需進行 is_last_process 檢查。此引數預設為 False,因為它會產生一次通訊呼叫。

就是這樣!要探索更多內容,請檢視 Accelerate 倉庫中的推理示例和我們的文件,我們將繼續努力改進這一整合。

< > 在 GitHub 上更新

© . This site is unofficial and not affiliated with Hugging Face, Inc.