Diffusers 文件

分散式推理

Hugging Face's logo
加入 Hugging Face 社群

並獲得增強的文件體驗

開始使用

分散式推理

在分散式設定中,您可以使用 🤗 AcceleratePyTorch Distributed 在多個GPU上執行推理,這對於並行生成多個提示非常有用。

本指南將向您展示如何使用 🤗 Accelerate 和 PyTorch Distributed 進行分散式推理。

🤗 Accelerate

🤗 Accelerate 是一個旨在簡化跨分散式設定訓練或執行推理的庫。它簡化了分散式環境的設定過程,讓您可以專注於PyTorch程式碼。

首先,建立一個Python檔案並初始化一個accelerate.PartialState 來建立分散式環境;您的設定會自動檢測,因此您無需顯式定義 rankworld_size。將 DiffusionPipeline 移動到 distributed_state.device 以將GPU分配給每個程序。

現在使用 split_between_processes 工具作為上下文管理器,自動在程序之間分配提示。

import torch
from accelerate import PartialState
from diffusers import DiffusionPipeline

pipeline = DiffusionPipeline.from_pretrained(
    "stable-diffusion-v1-5/stable-diffusion-v1-5", torch_dtype=torch.float16, use_safetensors=True
)
distributed_state = PartialState()
pipeline.to(distributed_state.device)

with distributed_state.split_between_processes(["a dog", "a cat"]) as prompt:
    result = pipeline(prompt).images[0]
    result.save(f"result_{distributed_state.process_index}.png")

使用 --num_processes 引數指定要使用的GPU數量,並呼叫 accelerate launch 執行指令碼

accelerate launch run_distributed.py --num_processes=2

有關在多個GPU上執行推理的最小示例 指令碼,請參閱此內容。要了解更多資訊,請檢視 使用 🤗 Accelerate 進行分散式推理 指南。

PyTorch Distributed

PyTorch支援DistributedDataParallel,該功能支援資料並行。

首先,建立一個Python檔案並匯入torch.distributedtorch.multiprocessing來設定分散式程序組,併為每個GPU上的推理生成程序。您還應該初始化一個DiffusionPipeline

import torch
import torch.distributed as dist
import torch.multiprocessing as mp

from diffusers import DiffusionPipeline

sd = DiffusionPipeline.from_pretrained(
    "stable-diffusion-v1-5/stable-diffusion-v1-5", torch_dtype=torch.float16, use_safetensors=True
)

您需要建立一個函式來執行推理;init_process_group負責建立一個分散式環境,包含要使用的後端型別、當前程序的rank以及world_size(參與程序的數量)。如果您在2個GPU上並行執行推理,那麼world_size為2。

DiffusionPipeline 移動到 rank,並使用 get_rank 將一個 GPU 分配給每個程序,其中每個程序處理不同的提示

def run_inference(rank, world_size):
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

    sd.to(rank)

    if torch.distributed.get_rank() == 0:
        prompt = "a dog"
    elif torch.distributed.get_rank() == 1:
        prompt = "a cat"

    image = sd(prompt).images[0]
    image.save(f"./{'_'.join(prompt)}.png")

要執行分散式推理,請呼叫 mp.spawn,以便在 world_size 中定義的 GPU 數量上執行 run_inference 函式

def main():
    world_size = 2
    mp.spawn(run_inference, args=(world_size,), nprocs=world_size, join=True)


if __name__ == "__main__":
    main()

完成推理指令碼後,使用 --nproc_per_node 引數指定要使用的 GPU 數量,並呼叫 torchrun 執行指令碼

torchrun run_distributed.py --nproc_per_node=2

您可以在 DiffusionPipeline 中使用 device_map 來在多個裝置上分散式其模型級元件。請參閱 裝置放置 指南以瞭解更多資訊。

模型分片

現代擴散系統,例如Flux,非常龐大且包含多個模型。例如,Flux.1-Dev由兩個文字編碼器——T5-XXLCLIP-L——一個擴散Transformer和一個VAE組成。對於如此大的模型,在消費級GPU上執行推理可能具有挑戰性。

模型分片是一種技術,當模型無法適應單個 GPU 時,它將模型分佈在多個 GPU 上。以下示例假設有兩個 16GB 的 GPU 可用於推理。

首先使用文字編碼器計算文字嵌入。透過設定 device_map="balanced" 將文字編碼器保持在兩個 GPU 上。balanced 策略將模型均勻分佈在所有可用的 GPU 上。使用 max_memory 引數為每個 GPU 上的每個文字編碼器分配最大記憶體量。

在此步驟中載入文字編碼器!擴散 Transformer 和 VAE 將在稍後的步驟中載入,以保留記憶體。

from diffusers import FluxPipeline
import torch

prompt = "a photo of a dog with cat-like look"

pipeline = FluxPipeline.from_pretrained(
    "black-forest-labs/FLUX.1-dev",
    transformer=None,
    vae=None,
    device_map="balanced",
    max_memory={0: "16GB", 1: "16GB"},
    torch_dtype=torch.bfloat16
)
with torch.no_grad():
    print("Encoding prompts.")
    prompt_embeds, pooled_prompt_embeds, text_ids = pipeline.encode_prompt(
        prompt=prompt, prompt_2=None, max_sequence_length=512
    )

計算文字嵌入後,將其從 GPU 中移除,為擴散 Transformer 騰出空間。

import gc 

def flush():
    gc.collect()
    torch.cuda.empty_cache()
    torch.cuda.reset_max_memory_allocated()
    torch.cuda.reset_peak_memory_stats()

del pipeline.text_encoder
del pipeline.text_encoder_2
del pipeline.tokenizer
del pipeline.tokenizer_2
del pipeline

flush()

接下來載入具有12.5B引數的擴散Transformer。這次,設定device_map="auto"以自動將模型分發到兩個16GB GPU上。auto策略由Accelerate支援,並作為大模型推理功能的一部分提供。它首先將模型分發到最快的裝置(GPU),然後在需要時移動到較慢的裝置,如CPU和硬碟。將模型引數儲存在較慢的裝置上的缺點是推理延遲較慢。

from diffusers import AutoModel
import torch 

transformer = AutoModel.from_pretrained(
    "black-forest-labs/FLUX.1-dev", 
    subfolder="transformer",
    device_map="auto",
    torch_dtype=torch.bfloat16
)

在任何時候,您都可以嘗試 print(pipeline.hf_device_map) 來檢視各個模型是如何分佈在裝置上的。這對於跟蹤模型的裝置放置很有用。您還可以嘗試 print(transformer.hf_device_map) 來檢視 transformer 模型是如何在裝置上分片的。

將 Transformer 模型新增到管道中進行去噪,但將其他模型級元件(如文字編碼器和 VAE)設定為 None,因為暫時不需要它們。

pipeline = FluxPipeline.from_pretrained(
    "black-forest-labs/FLUX.1-dev",
    text_encoder=None,
    text_encoder_2=None,
    tokenizer=None,
    tokenizer_2=None,
    vae=None,
    transformer=transformer,
    torch_dtype=torch.bfloat16
)

print("Running denoising.")
height, width = 768, 1360
latents = pipeline(
    prompt_embeds=prompt_embeds,
    pooled_prompt_embeds=pooled_prompt_embeds,
    num_inference_steps=50,
    guidance_scale=3.5,
    height=height,
    width=width,
    output_type="latent",
).images

將管道和 transformer 從記憶體中移除,因為它們不再需要。

del pipeline.transformer
del pipeline

flush()

最後,用 VAE 將潛在變數解碼為影像。VAE 通常足夠小,可以載入到單個 GPU 上。

from diffusers import AutoencoderKL
from diffusers.image_processor import VaeImageProcessor
import torch 

vae = AutoencoderKL.from_pretrained(ckpt_id, subfolder="vae", torch_dtype=torch.bfloat16).to("cuda")
vae_scale_factor = 2 ** (len(vae.config.block_out_channels))
image_processor = VaeImageProcessor(vae_scale_factor=vae_scale_factor)

with torch.no_grad():
    print("Running decoding.")
    latents = FluxPipeline._unpack_latents(latents, height, width, vae_scale_factor)
    latents = (latents / vae.config.scaling_factor) + vae.config.shift_factor

    image = vae.decode(latents, return_dict=False)[0]
    image = image_processor.postprocess(image, output_type="pil")
    image[0].save("split_transformer.png")

透過選擇性地載入和解除安裝給定階段所需的模型,並將最大模型分片到多個 GPU 上,可以在消費級 GPU 上執行大型模型推理。

< > 在 GitHub 上更新

© . This site is unofficial and not affiliated with Hugging Face, Inc.