Diffusers 文件

訓練擴散模型

Hugging Face's logo
加入 Hugging Face 社群

並獲得增強的文件體驗

開始使用

訓練擴散模型

無條件影像生成是擴散模型的一個流行應用,它能生成與訓練資料集中的影像相似的影像。通常,最好的結果是透過在特定資料集上微調預訓練模型來獲得的。你可以在 Hub 上找到許多這樣的檢查點,但如果你找不到你喜歡的,你隨時可以訓練自己的模型!

本教程將教你如何從頭開始在 史密森尼蝴蝶 資料集的一個子集上訓練一個 UNet2DModel,以生成你自己的 🦋 蝴蝶 🦋。

💡 這篇訓練教程基於 使用 🧨 Diffusers 進行訓練 的 notebook。有關擴散模型的更多細節和背景知識,比如它們如何工作,請查閱該 notebook!

在開始之前,請確保你已經安裝了 🤗 Datasets 來載入和預處理影像資料集,以及 🤗 Accelerate,以簡化在任意數量的 GPU 上的訓練。以下命令還將安裝 TensorBoard 來視覺化訓練指標(你也可以使用 Weights & Biases 來跟蹤你的訓練)。

# uncomment to install the necessary libraries in Colab
#!pip install diffusers[training]

我們鼓勵你與社群分享你的模型,為了做到這一點,你需要登入你的 Hugging Face 賬戶(如果你還沒有,可以在這裡建立一個!)。你可以在 notebook 中登入,並在提示時輸入你的令牌。請確保你的令牌具有寫入許可權。

>>> from huggingface_hub import notebook_login

>>> notebook_login()

或者從終端登入

huggingface-cli login

由於模型檢查點相當大,請安裝 Git-LFS 來對這些大檔案進行版本控制

!sudo apt -qq install git-lfs
!git config --global credential.helper store

訓練配置

為方便起見,建立一個包含訓練超引數的 TrainingConfig 類(可以隨時調整它們)

>>> from dataclasses import dataclass

>>> @dataclass
... class TrainingConfig:
...     image_size = 128  # the generated image resolution
...     train_batch_size = 16
...     eval_batch_size = 16  # how many images to sample during evaluation
...     num_epochs = 50
...     gradient_accumulation_steps = 1
...     learning_rate = 1e-4
...     lr_warmup_steps = 500
...     save_image_epochs = 10
...     save_model_epochs = 30
...     mixed_precision = "fp16"  # `no` for float32, `fp16` for automatic mixed precision
...     output_dir = "ddpm-butterflies-128"  # the model name locally and on the HF Hub

...     push_to_hub = True  # whether to upload the saved model to the HF Hub
...     hub_model_id = "<your-username>/<my-awesome-model>"  # the name of the repository to create on the HF Hub
...     hub_private_repo = None
...     overwrite_output_dir = True  # overwrite the old model when re-running the notebook
...     seed = 0


>>> config = TrainingConfig()

載入資料集

你可以使用 🤗 Datasets 庫輕鬆載入 史密森尼蝴蝶 資料集

>>> from datasets import load_dataset

>>> config.dataset_name = "huggan/smithsonian_butterflies_subset"
>>> dataset = load_dataset(config.dataset_name, split="train")

💡 你可以從 HugGan 社群活動 中找到更多資料集,或者透過建立一個本地 ImageFolder 來使用你自己的資料集。如果資料集來自 HugGan 社群活動,請將 config.dataset_name 設定為資料集的倉庫 ID,如果使用你自己的影像,則設定為 imagefolder

🤗 Datasets 使用 Image 特性自動解碼影像資料並將其載入為 PIL.Image,我們可以對其進行視覺化

>>> import matplotlib.pyplot as plt

>>> fig, axs = plt.subplots(1, 4, figsize=(16, 4))
>>> for i, image in enumerate(dataset[:4]["image"]):
...     axs[i].imshow(image)
...     axs[i].set_axis_off()
>>> fig.show()

然而,這些影像的尺寸各不相同,所以你需要先對它們進行預處理

  • Resize 將影像大小更改為在 config.image_size 中定義的大小。
  • RandomHorizontalFlip 透過隨機映象影像來增強資料集。
  • Normalize 很重要,它將畫素值重新縮放到 [-1, 1] 的範圍內,這是模型所期望的。
>>> from torchvision import transforms

>>> preprocess = transforms.Compose(
...     [
...         transforms.Resize((config.image_size, config.image_size)),
...         transforms.RandomHorizontalFlip(),
...         transforms.ToTensor(),
...         transforms.Normalize([0.5], [0.5]),
...     ]
... )

使用 🤗 Datasets 的 set_transform 方法在訓練期間動態應用 preprocess 函式

>>> def transform(examples):
...     images = [preprocess(image.convert("RGB")) for image in examples["image"]]
...     return {"images": images}


>>> dataset.set_transform(transform)

可以再次視覺化影像,以確認它們已被調整大小。現在你準備好將資料集包裝在 DataLoader 中進行訓練了!

>>> import torch

>>> train_dataloader = torch.utils.data.DataLoader(dataset, batch_size=config.train_batch_size, shuffle=True)

建立 UNet2DModel

🧨 Diffusers 中的預訓練模型可以輕鬆地從其模型類和你想要的引數建立。例如,要建立一個 UNet2DModel

>>> from diffusers import UNet2DModel

>>> model = UNet2DModel(
...     sample_size=config.image_size,  # the target image resolution
...     in_channels=3,  # the number of input channels, 3 for RGB images
...     out_channels=3,  # the number of output channels
...     layers_per_block=2,  # how many ResNet layers to use per UNet block
...     block_out_channels=(128, 128, 256, 256, 512, 512),  # the number of output channels for each UNet block
...     down_block_types=(
...         "DownBlock2D",  # a regular ResNet downsampling block
...         "DownBlock2D",
...         "DownBlock2D",
...         "DownBlock2D",
...         "AttnDownBlock2D",  # a ResNet downsampling block with spatial self-attention
...         "DownBlock2D",
...     ),
...     up_block_types=(
...         "UpBlock2D",  # a regular ResNet upsampling block
...         "AttnUpBlock2D",  # a ResNet upsampling block with spatial self-attention
...         "UpBlock2D",
...         "UpBlock2D",
...         "UpBlock2D",
...         "UpBlock2D",
...     ),
... )

快速檢查樣本影像形狀是否與模型輸出形狀匹配通常是一個好主意

>>> sample_image = dataset[0]["images"].unsqueeze(0)
>>> print("Input shape:", sample_image.shape)
Input shape: torch.Size([1, 3, 128, 128])

>>> print("Output shape:", model(sample_image, timestep=0).sample.shape)
Output shape: torch.Size([1, 3, 128, 128])

太棒了!接下來,你需要一個排程器來為影像新增一些噪聲。

建立排程器

排程器的行為取決於你是在訓練還是在推理時使用模型。在推理期間,排程器從噪聲生成影像。在訓練期間,排程器從擴散過程中的特定點獲取模型輸出(或樣本),並根據*噪聲排程*和*更新規則*向影像應用噪聲。

讓我們看一下 DDPMScheduler,並使用 add_noise 方法為之前的 sample_image 新增一些隨機噪聲

>>> import torch
>>> from PIL import Image
>>> from diffusers import DDPMScheduler

>>> noise_scheduler = DDPMScheduler(num_train_timesteps=1000)
>>> noise = torch.randn(sample_image.shape)
>>> timesteps = torch.LongTensor([50])
>>> noisy_image = noise_scheduler.add_noise(sample_image, noise, timesteps)

>>> Image.fromarray(((noisy_image.permute(0, 2, 3, 1) + 1.0) * 127.5).type(torch.uint8).numpy()[0])

模型的訓練目標是預測新增到影像中的噪聲。此步驟的損失可以透過以下方式計算

>>> import torch.nn.functional as F

>>> noise_pred = model(noisy_image, timesteps).sample
>>> loss = F.mse_loss(noise_pred, noise)

訓練模型

到目前為止,你已經擁有了開始訓練模型所需的大部分元件,剩下的就是把所有東西組合起來。

首先,你需要一個最佳化器和一個學習率排程器

>>> from diffusers.optimization import get_cosine_schedule_with_warmup

>>> optimizer = torch.optim.AdamW(model.parameters(), lr=config.learning_rate)
>>> lr_scheduler = get_cosine_schedule_with_warmup(
...     optimizer=optimizer,
...     num_warmup_steps=config.lr_warmup_steps,
...     num_training_steps=(len(train_dataloader) * config.num_epochs),
... )

然後,你需要一種評估模型的方法。為了評估,你可以使用 DDPMPipeline 生成一批樣本影像,並將其儲存為網格

>>> from diffusers import DDPMPipeline
>>> from diffusers.utils import make_image_grid
>>> import os

>>> def evaluate(config, epoch, pipeline):
...     # Sample some images from random noise (this is the backward diffusion process).
...     # The default pipeline output type is `List[PIL.Image]`
...     images = pipeline(
...         batch_size=config.eval_batch_size,
...         generator=torch.Generator(device='cpu').manual_seed(config.seed), # Use a separate torch generator to avoid rewinding the random state of the main training loop
...     ).images

...     # Make a grid out of the images
...     image_grid = make_image_grid(images, rows=4, cols=4)

...     # Save the images
...     test_dir = os.path.join(config.output_dir, "samples")
...     os.makedirs(test_dir, exist_ok=True)
...     image_grid.save(f"{test_dir}/{epoch:04d}.png")

現在,你可以將所有這些元件包裝在一個訓練迴圈中,並使用 🤗 Accelerate 進行簡單的 TensorBoard 日誌記錄、梯度累積和混合精度訓練。要將模型上傳到 Hub,編寫一個函式來獲取你的倉庫名稱和資訊,然後將其推送到 Hub。

💡 下面的訓練迴圈可能看起來令人生畏且冗長,但稍後當你只需一行程式碼就能啟動訓練時,你會覺得這一切都是值得的!如果你等不及想開始生成影像,可以隨時複製並執行下面的程式碼。你可以隨時回來更仔細地研究訓練迴圈,比如在你等待模型完成訓練時。🤗

>>> from accelerate import Accelerator
>>> from huggingface_hub import create_repo, upload_folder
>>> from tqdm.auto import tqdm
>>> from pathlib import Path
>>> import os

>>> def train_loop(config, model, noise_scheduler, optimizer, train_dataloader, lr_scheduler):
...     # Initialize accelerator and tensorboard logging
...     accelerator = Accelerator(
...         mixed_precision=config.mixed_precision,
...         gradient_accumulation_steps=config.gradient_accumulation_steps,
...         log_with="tensorboard",
...         project_dir=os.path.join(config.output_dir, "logs"),
...     )
...     if accelerator.is_main_process:
...         if config.output_dir is not None:
...             os.makedirs(config.output_dir, exist_ok=True)
...         if config.push_to_hub:
...             repo_id = create_repo(
...                 repo_id=config.hub_model_id or Path(config.output_dir).name, exist_ok=True
...             ).repo_id
...         accelerator.init_trackers("train_example")

...     # Prepare everything
...     # There is no specific order to remember, you just need to unpack the
...     # objects in the same order you gave them to the prepare method.
...     model, optimizer, train_dataloader, lr_scheduler = accelerator.prepare(
...         model, optimizer, train_dataloader, lr_scheduler
...     )

...     global_step = 0

...     # Now you train the model
...     for epoch in range(config.num_epochs):
...         progress_bar = tqdm(total=len(train_dataloader), disable=not accelerator.is_local_main_process)
...         progress_bar.set_description(f"Epoch {epoch}")

...         for step, batch in enumerate(train_dataloader):
...             clean_images = batch["images"]
...             # Sample noise to add to the images
...             noise = torch.randn(clean_images.shape, device=clean_images.device)
...             bs = clean_images.shape[0]

...             # Sample a random timestep for each image
...             timesteps = torch.randint(
...                 0, noise_scheduler.config.num_train_timesteps, (bs,), device=clean_images.device,
...                 dtype=torch.int64
...             )

...             # Add noise to the clean images according to the noise magnitude at each timestep
...             # (this is the forward diffusion process)
...             noisy_images = noise_scheduler.add_noise(clean_images, noise, timesteps)

...             with accelerator.accumulate(model):
...                 # Predict the noise residual
...                 noise_pred = model(noisy_images, timesteps, return_dict=False)[0]
...                 loss = F.mse_loss(noise_pred, noise)
...                 accelerator.backward(loss)

...                 if accelerator.sync_gradients:
...                     accelerator.clip_grad_norm_(model.parameters(), 1.0)
...                 optimizer.step()
...                 lr_scheduler.step()
...                 optimizer.zero_grad()

...             progress_bar.update(1)
...             logs = {"loss": loss.detach().item(), "lr": lr_scheduler.get_last_lr()[0], "step": global_step}
...             progress_bar.set_postfix(**logs)
...             accelerator.log(logs, step=global_step)
...             global_step += 1

...         # After each epoch you optionally sample some demo images with evaluate() and save the model
...         if accelerator.is_main_process:
...             pipeline = DDPMPipeline(unet=accelerator.unwrap_model(model), scheduler=noise_scheduler)

...             if (epoch + 1) % config.save_image_epochs == 0 or epoch == config.num_epochs - 1:
...                 evaluate(config, epoch, pipeline)

...             if (epoch + 1) % config.save_model_epochs == 0 or epoch == config.num_epochs - 1:
...                 if config.push_to_hub:
...                     upload_folder(
...                         repo_id=repo_id,
...                         folder_path=config.output_dir,
...                         commit_message=f"Epoch {epoch}",
...                         ignore_patterns=["step_*", "epoch_*"],
...                     )
...                 else:
...                     pipeline.save_pretrained(config.output_dir)

呼,程式碼量還真不小!但你終於準備好使用 🤗 Accelerate 的 notebook_launcher 函式來啟動訓練了。將訓練迴圈、所有訓練引數以及用於訓練的程序數(你可以將此值更改為你可用的 GPU 數量)傳遞給該函式

>>> from accelerate import notebook_launcher

>>> args = (config, model, noise_scheduler, optimizer, train_dataloader, lr_scheduler)

>>> notebook_launcher(train_loop, args, num_processes=1)

訓練完成後,來看看你的擴散模型生成的最終 🦋 影像 🦋 吧!

>>> import glob

>>> sample_images = sorted(glob.glob(f"{config.output_dir}/samples/*.png"))
>>> Image.open(sample_images[-1])

下一步

無條件影像生成是可以訓練的任務的一個例子。你可以透過訪問 🧨 Diffusers 訓練示例 頁面來探索其他任務和訓練技術。以下是一些你可以學習的例子

  • 文字反演(Textual Inversion),一種教模型特定視覺概念並將其整合到生成影像中的演算法。
  • DreamBooth,一種給定主體幾張輸入影像,生成該主體個性化影像的技術。
  • 指南,用於在你自己的資料集上微調 Stable Diffusion 模型。
  • 指南,用於使用 LoRA,一種記憶體高效的技術,用於更快地微調非常大的模型。
< > 在 GitHub 上更新

© . This site is unofficial and not affiliated with Hugging Face, Inc.