🏎️ 使用 Hugging Face Kernel Hub 在 5 分鐘內提升模型效能

釋出於 2025 年 6 月 12 日
在 GitHub 上更新

使用預最佳化核心提升模型效能,可輕鬆從 Hub 載入。

今天,我們將探索 Hugging Face 的一項激動人心的新進展:**Kernel Hub**!作為機器學習從業者,我們知道要最大化效能,通常需要深入研究最佳化程式碼、自定義 CUDA 核心或複雜的構建系統。Kernel Hub 極大地簡化了這一過程!

下面是如何在程式碼中使用核心的簡短示例。

import torch

from kernels import get_kernel

# Download optimized kernels from the Hugging Face hub
activation = get_kernel("kernels-community/activation")

# Random tensor
x = torch.randn((10, 10), dtype=torch.float16, device="cuda")

# Run the kernel
y = torch.empty_like(x)
activation.gelu_fast(y, x)

print(y)

在接下來的章節中,我們將涵蓋以下主題:

  1. **Kernel Hub 是什麼?**——理解核心概念。
  2. **如何使用 Kernel Hub**——一個快速程式碼示例。
  3. **為簡單模型新增 Kernel**——使用 RMSNorm 的實際整合。
  4. **審查效能影響**——對 RMSNorm 差異進行基準測試。
  5. **真實世界用例**——核心庫在其他專案中的使用示例。

我們將快速介紹這些概念——核心思想可以在大約 5 分鐘內掌握(儘管實驗和基準測試可能需要更長時間!)。

1. Kernel Hub 是什麼?

Kernel Hub(👈 檢視它!)允許 Python 庫和應用程式**直接從 Hugging Face Hub 載入最佳化的計算核心**。可以把它想象成模型中心,但是是用於加速特定操作(通常在 GPU 上)的低階、高效能程式碼片段(核心)。

示例包括高階注意力機制(如 FlashAttention,可顯著提高速度並節省記憶體)。自定義量化核心(實現使用 INT8 或 INT4 等低精度資料型別的有效計算)。複雜架構(如 Mixture of Experts (MoE) 層)所需的專用核心,這些層涉及複雜的路由和計算模式。以及 啟用函式歸一化層(如 LayerNorm 或 RMSNorm)

您無需手動管理複雜的依賴項,無需處理編譯標誌,也無需從原始碼構建 Triton 或 CUTLASS 等庫,您可以使用 `kernels` 庫立即獲取並執行預編譯的最佳化核心。

例如,要啟用 **FlashAttention**,您只需一行程式碼——無需構建,無需標誌

from kernels import get_kernel

flash_attention = get_kernel("kernels-community/flash-attn")

`kernels` 會檢測您精確的 Python、PyTorch 和 CUDA 版本,然後下載匹配的預編譯二進位制檔案——通常只需幾秒鐘(在連線較慢的情況下可能需要一兩分鐘)。

相比之下,自行編譯 FlashAttention 需要:

  • 克隆倉庫並安裝所有依賴項。
  • 配置構建標誌和環境變數。
  • 預留 **~96 GB 記憶體** 和大量 CPU 核心。
  • 等待 **10 分鐘到數小時**,具體取決於您的硬體。(有關詳細資訊,請參閱該專案自己的安裝指南。)

Kernel Hub 消除了所有這些麻煩:一個函式呼叫,即刻加速。

Kernel Hub 的優勢:

  • **即時訪問最佳化核心**:載入並運行針對各種硬體(從 NVIDIA 和 AMD GPU 開始)最佳化的核心,無需本地編譯的麻煩。
  • **共享和重用**:在不同專案和社群中發現、共享和重用核心。
  • **輕鬆更新**:只需從 Hub 拉取最新版本,即可隨時獲取最新的核心改進。
  • **加速開發**:專注於模型架構和邏輯,而不是核心編譯和部署的複雜性。
  • **提高效能**:利用專家最佳化的核心,有可能加速訓練和推理。
  • **簡化部署**:透過按需獲取核心,降低部署環境的複雜性。
  • **開發和分享您自己的核心**:如果您建立了最佳化的核心,可以輕鬆將其分享到 Hub 上供他人使用。這鼓勵了社群內的協作和知識共享。

正如許多機器學習開發者所知,管理依賴項和從原始碼構建低階程式碼可能是一個耗時且容易出錯的過程。Kernel Hub 旨在透過提供一個集中的最佳化計算核心倉庫來簡化這一點,這些核心可以輕鬆載入和執行。

將更多時間花在構建出色的模型上,減少與構建系統鬥爭的時間!

2. 如何使用 Kernel Hub(基本示例)

使用 Kernel Hub 的設計旨在直接了當。`kernels` 庫提供了主要的介面。這是一個載入最佳化 GELU 啟用函式核心的快速示例。(稍後,我們將看到另一個關於如何將核心整合到模型中的示例)。

檔案:`activation_validation_example.py`

# /// script
# dependencies = [
#  "numpy",
#  "torch",
#  "kernels",
# ]
# ///

import torch
import torch.nn.functional as F
from kernels import get_kernel

DEVICE = "cuda"

# Make reproducible
torch.manual_seed(42)

# Download optimized activation kernels from the Hub
activation_kernels = get_kernel("kernels-community/activation")

# Create a random tensor on the GPU
x = torch.randn((4, 4), dtype=torch.float16, device=DEVICE)

# Prepare an output tensor
y = torch.empty_like(x)

# Run the fast GELU kernel
activation_kernels.gelu_fast(y, x)

# Get expected output using PyTorch's built-in GELU
expected = F.gelu(x)

# Compare the kernel output with PyTorch's result
torch.testing.assert_close(y, expected, rtol=1e-2, atol=1e-2)

print("✅ Kernel output matches PyTorch GELU!")

# Optional: print both tensors for inspection
print("\nInput tensor:")
print(x)
print("\nFast GELU kernel output:")
print(y)
print("\nPyTorch GELU output:")
print(expected)

# List available functions in the loaded kernel module
print("\nAvailable functions in 'kernels-community/activation':")
print(dir(activation_kernels))

**(注意:** 如果您安裝了 `uv`,您可以將此指令碼儲存為 `script.py` 並執行 `uv run script.py`,它將自動處理依賴項。)

這裡發生了什麼?

  1. **匯入 `get_kernel`**:此函式是透過 `kernels` 庫訪問 Kernel Hub 的入口點。
  2. **`get_kernel("kernels-community/activation")`**:這行程式碼在 `kernels-community` 組織下尋找 `activation` 核心倉庫。它會下載、快取並載入相應的預編譯核心二進位制檔案。
  3. **準備張量**:我們在 GPU 上建立輸入 (`x`) 和輸出 (`y`) 張量。
  4. **`activation_kernels.gelu_fast(y, x)`**:我們呼叫載入的核心模組提供的特定最佳化函式 (`gelu_fast`)。
  5. **驗證**:我們檢查輸出。

這個簡單的示例展示了您如何輕鬆地獲取和執行高度最佳化的程式碼。現在,讓我們看看使用 RMS 歸一化的更實際的整合。

3. 為簡單模型新增 Kernel

讓我們將一個最佳化的 **RMS 歸一化**核心整合到一個基本模型中。我們將使用 `kernels-community/triton-layer-norm` 倉庫中提供的 `LlamaRMSNorm` 實現(注意:此倉庫包含各種歸一化核心),並將其與基線 PyTorch RMSNorm 實現進行比較。

首先,在 PyTorch 中定義一個簡單的 RMSNorm 模組和使用它的基線模型

檔案:`rmsnorm_baseline.py`

# /// script
# dependencies = [
#  "numpy",
#  "torch",
#  "kernels",
# ]
# ///
import torch
import torch.nn as nn

DEVICE = "cuda"

DTYPE = torch.float16  # Use float16 for better kernel performance potential


# Simple PyTorch implementation of RMSNorm for baseline comparison
class RMSNorm(nn.Module):
    def __init__(self, hidden_size, variance_epsilon=1e-5):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(hidden_size))
        self.eps = variance_epsilon
        self.hidden_size = hidden_size

    def forward(self, x):
        # Assumes x is (batch_size, ..., hidden_size)
        input_dtype = x.dtype
        # Calculate variance in float32 for stability
        variance = x.to(torch.float32).pow(2).mean(-1, keepdim=True)
        x = x * torch.rsqrt(variance + self.eps)

        # Apply weight and convert back to original dtype
        return (self.weight * x).to(input_dtype)


class BaselineModel(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, eps=1e-5):
        super().__init__()
        self.linear1 = nn.Linear(input_size, hidden_size)
        self.norm = RMSNorm(hidden_size, variance_epsilon=eps)
        self.activation = nn.GELU()
        self.linear2 = nn.Linear(hidden_size, output_size)

        # ensure all linear layers weights are 1 for testing
        with torch.no_grad():
            self.linear1.weight.fill_(1)
            self.linear1.bias.fill_(0)
            self.linear2.weight.fill_(1)
            self.linear2.bias.fill_(0)
            self.norm.weight.fill_(1)

    def forward(self, x):
        x = self.linear1(x)
        x = self.norm(x)  # Apply RMSNorm
        x = self.activation(x)
        x = self.linear2(x)
        return x


# Example usage
input_size = 128
hidden_size = 256
output_size = 10
eps_val = 1e-5

baseline_model = (
    BaselineModel(input_size, hidden_size, output_size, eps=eps_val)
    .to(DEVICE)
    .to(DTYPE)
)
dummy_input = torch.randn(32, input_size, device=DEVICE, dtype=DTYPE)  # Batch of 32
output = baseline_model(dummy_input)
print("Baseline RMSNorm model output shape:", output.shape)

現在,讓我們建立一個使用透過 `kernels` 載入的 `LlamaRMSNorm` 核心的版本。

檔案:`rmsnorm_kernel.py`

# /// script
# dependencies = [
#  "numpy",
#  "torch",
#  "kernels",
# ]
# ///
import torch
import torch.nn as nn
from kernels import get_kernel, use_kernel_forward_from_hub

# reuse the model from the previous snippet or copy the class
# definition here to run this script independently
from rmsnorm_baseline import BaselineModel

DEVICE = "cuda"
DTYPE = torch.float16  # Use float16 for better kernel performance potential


layer_norm_kernel_module = get_kernel("kernels-community/triton-layer-norm")

# Simply add the decorator to the LlamaRMSNorm class to automatically replace the forward function
# with the optimized kernel version
# 
# Note: not all kernels ship with layers already mapped, and would require calling the function directly
# However in this case, the LlamaRMSNorm class is already mapped to the kernel function. Otherwise we'd need to
# call the function directly like this:
# ```python
# layer_norm_kernel_module.rms_norm_fn(
#     hidden_states,
#     self.weight,
#     bias=None,
#     residual=None,
#     eps=self.variance_epsilon,
#     dropout_p=0.0,
#     prenorm=False,
#     residual_in_fp32=False,
# )
# ```
@use_kernel_forward_from_hub("LlamaRMSNorm")
class OriginalRMSNorm(nn.Module):
    def __init__(self, hidden_size, variance_epsilon=1e-5):
        super().__init__()
        self.weight = nn.Parameter(torch.ones(hidden_size))
        self.eps = variance_epsilon
        self.hidden_size = hidden_size

    def forward(self, x):
        # Assumes x is (batch_size, ..., hidden_size)
        input_dtype = x.dtype
        # Calculate variance in float32 for stability
        variance = x.to(torch.float32).pow(2).mean(-1, keepdim=True)
        x = x * torch.rsqrt(variance + self.eps)

        # Apply weight and convert back to original dtype
        return (self.weight * x).to(input_dtype)


class KernelModel(nn.Module):
    def __init__(
        self,
        input_size,
        hidden_size,
        output_size,
        device="cuda",
        dtype=torch.float16,
        eps=1e-5,
    ):
        super().__init__()
        self.linear1 = nn.Linear(input_size, hidden_size)
        # OriginalRMSNorm will be replaced with the optimized kernel layer
        # when the model is loaded
        self.norm = OriginalRMSNorm(hidden_size, variance_epsilon=eps)
        self.activation = nn.GELU()
        self.linear2 = nn.Linear(hidden_size, output_size)

        # ensure all linear layers weights are 1 for testing
        with torch.no_grad():
            self.linear1.weight.fill_(1)
            self.linear1.bias.fill_(0)
            self.linear2.weight.fill_(1)
            self.linear2.bias.fill_(0)
            self.norm.weight.fill_(1)

    def forward(self, x):
        x = self.linear1(x)
        x = self.norm(x)
        x = self.activation(x)
        x = self.linear2(x)
        return x


# Example usage
input_size = 128
hidden_size = 256
output_size = 10
eps_val = 1e-5

kernel_model = (
    KernelModel(
        input_size, hidden_size, output_size, device=DEVICE, dtype=DTYPE, eps=eps_val
    )
    .to(DEVICE)
    .to(DTYPE)
)

baseline_model = (
    BaselineModel(input_size, hidden_size, output_size, eps=eps_val)
    .to(DEVICE)
    .to(DTYPE)
)

dummy_input = torch.randn(32, input_size, device=DEVICE, dtype=DTYPE)  # Batch of 32

output = baseline_model(dummy_input)
output_kernel = kernel_model(dummy_input)
print("Kernel RMSNorm model output shape:", output_kernel.shape)

# Verify outputs are close (RMSNorm implementations should be numerically close)
try:
    torch.testing.assert_close(output, output_kernel, rtol=1e-2, atol=1e-2)
    print("\nBaseline and Kernel RMSNorm model outputs match!")
except AssertionError as e:
    print("\nBaseline and Kernel RMSNorm model outputs differ slightly:")
    print(e)
except NameError:
    print("\nSkipping output comparison as kernel model output was not generated.")

關於 `KernelModel` 的重要注意事項

  • **核心繼承:** `KernelRMSNorm` 類繼承自核心中的 RMSNorm 實現 `layer_norm_kernel_module.layers.LlamaRMSNorm`。這允許我們直接使用最佳化過的核心。
  • **訪問函式:** 訪問 RMSNorm 函式的確切方式(`layer_norm_kernel_module.layers.LlamaRMSNorm.forward`、`layer_norm_kernel_module.rms_norm_forward` 或其他)**完全取決於核心建立者在 Hub 上組織倉庫的方式。** 您可能需要檢查載入的 `layer_norm_kernel_module` 物件(例如,使用 `dir()`)或檢視 Hub 上核心的文件以找到正確的函式/方法及其簽名。我已使用 `rms_norm_forward` 作為合理的佔位符並添加了錯誤處理。
  • **引數:** 現在我們只定義 `rms_norm_weight`(無偏置),這與 RMSNorm 一致。

4. 基準測試效能影響

與標準 PyTorch 版本相比,最佳化的 Triton RMSNorm 核心速度快了多少?讓我們對前向傳播進行基準測試,找出答案。

檔案:`rmsnorm_benchmark.py`

# /// script
# dependencies = [
#  "numpy",
#  "torch",
#  "kernels",
# ]
# ///
import torch

# reuse the models from the previous snippets or copy the class
# definitions here to run this script independently
from rmsnorm_baseline import BaselineModel
from rmsnorm_kernel import KernelModel

DEVICE = "cuda"
DTYPE = torch.float16  # Use float16 for better kernel performance potential


# Use torch.cuda.Event for accurate GPU timing (ensure function is defined)
def benchmark_model(model, input_tensor, num_runs=100, warmup_runs=10):
    model.eval()  # Set model to evaluation mode
    dtype = input_tensor.dtype
    model = model.to(input_tensor.device).to(dtype)

    # Warmup runs
    for _ in range(warmup_runs):
        _ = model(input_tensor)
    torch.cuda.synchronize()

    # Timed runs
    start_event = torch.cuda.Event(enable_timing=True)
    end_event = torch.cuda.Event(enable_timing=True)
    start_event.record()
    for _ in range(num_runs):
        _ = model(input_tensor)
    end_event.record()
    torch.cuda.synchronize()
    elapsed_time_ms = start_event.elapsed_time(end_event)
    avg_time_ms = elapsed_time_ms / num_runs
    return avg_time_ms


input_size_bench = 4096
hidden_size_bench = 4096  # RMSNorm performance is sensitive to this dimension
output_size_bench = 10
eps_val_bench = 1e-5

# Create larger models and input for benchmark
# Ensure both models are fully converted to the target DEVICE and DTYPE
baseline_model_bench = (
    BaselineModel(
        input_size_bench, hidden_size_bench, output_size_bench, eps=eps_val_bench
    )
    .to(DEVICE)
    .to(DTYPE)
)
kernel_model_bench = (
    KernelModel(
        input_size_bench,
        hidden_size_bench,
        output_size_bench,
        device=DEVICE,
        dtype=DTYPE,
        eps=eps_val_bench,
    )
    .to(DEVICE)
    .to(DTYPE)
)

# call both with larger batch sizes to warm up the GPU
# and ensure the models are loaded
warmup_input = torch.randn(4096, input_size_bench, device=DEVICE, dtype=DTYPE)
_ = kernel_model_bench(warmup_input)
_ = baseline_model_bench(warmup_input)

batch_sizes = [
    256,
    512,
    1024,
    2048,
    4096,
    8192,
    16384,
    32768,
]

print(
    f"{'Batch Size':<12} | {'Baseline Time (ms)':<18} | {'Kernel Time (ms)':<18} | {'Speedup'}"
)
print("-" * 74)

for batch_size in batch_sizes:
    # Call cuda synchronize to ensure all previous GPU operations are complete
    torch.cuda.synchronize()

    # Create random input tensor
    # Ensure the input tensor is on the correct device and dtype
    bench_input = torch.randn(batch_size, input_size_bench, device=DEVICE, dtype=DTYPE)

    # Run benchmarks only if kernel was loaded successfully
    baseline_time = benchmark_model(baseline_model_bench, bench_input)

    kernel_time = -1  # Sentinel value

    kernel_time = benchmark_model(kernel_model_bench, bench_input)

    baseline_time = round(baseline_time, 4)
    kernel_time = round(kernel_time, 4)
    speedup = round(baseline_time / kernel_time, 2) if kernel_time > 0 else "N/A"
    if kernel_time < baseline_time:
        speedup = f"{speedup:.2f}x"
    elif kernel_time == baseline_time:
        speedup = "1.00x (identical)"
    else:
        speedup = f"{kernel_time / baseline_time:.2f}x slower"
    print(f"{batch_size:<12} | {baseline_time:<18} | {kernel_time:<18} | {speedup}")

**預期結果:** 與 LayerNorm 一樣,使用 Triton 的良好調優 RMSNorm 實現可以比 PyTorch 的預設版本帶來顯著的加速——特別是對於相容硬體(例如,NVIDIA Ampere 或 Hopper GPU)上的記憶體密集型工作負載和低精度型別(如 `float16` 或 `bfloat16`)。

請記住

  • 結果可能因您的 GPU、輸入大小和資料型別而異。
  • 微基準測試可能無法真實反映實際效能。
  • 效能取決於核心實現的質量。
  • 由於開銷,最佳化核心可能對小批次大小無益。

實際結果將取決於您的硬體和具體的核心實現。以下是您可能看到的一個示例(在 L4 GPU 上)

批次大小 基線時間 (ms) 核心時間 (ms) 加速比
256 0.2122 0.2911 0.72x
512 0.4748 0.3312 1.43x
1024 0.8946 0.6864 1.30x
2048 2.0289 1.3889 1.46x
4096 4.4318 2.2467 1.97x
8192 9.2438 4.8497 1.91x
16384 18.6992 9.8805 1.89x
32768 37.079 19.9461 1.86x
65536 73.588 39.593 1.86x

5. 真實世界用例

`kernels` 庫仍在發展中,但已在各種實際專案中得到應用,包括:

  • Text Generation Inference:TGI 專案使用 `kernels` 庫載入用於文字生成任務的最佳化核心,從而提高效能和效率。
  • Transformers:Transformers 庫已集成了 `kernels` 庫,無需對模型程式碼進行任何更改即可使用最佳化層。這允許使用者在標準實現和最佳化實現之間輕鬆切換。

開始和下一步!

您已經瞭解了使用 Hugging Face Kernel Hub 獲取和使用最佳化核心是多麼容易。準備好自己嘗試了嗎?

  1. 安裝庫

    pip install kernels torch numpy
    

    確保您已安裝相容的 PyTorch 版本和 GPU 驅動程式。

  2. **瀏覽 Hub:** 在 Hugging Face Hub 上,在 `kernels` 標籤下或在 `kernels-community` 等組織中探索可用的核心。尋找與您的操作相關的核心(啟用函式、注意力機制、LayerNorm/RMSNorm 等歸一化)。

  3. **實驗:** 嘗試替換您自己模型中的元件。使用 `get_kernel("user-or-org/kernel-name")`。 **至關重要的是,檢查載入的核心物件**(例如,使用 `print(dir(loaded_kernel))`)或檢視其 Hub 倉庫文件,以瞭解如何正確呼叫其函式/方法以及它期望哪些引數(權重、偏置、輸入、epsilon)。

  4. **基準測試:** 衡量其對您特定硬體和工作負載的效能影響。不要忘記檢查數值正確性 (`torch.testing.assert_close`)。

  5. **(高階)貢獻:** 如果您開發了最佳化的核心,請考慮在 Hub 上分享它們!

結論

Hugging Face Kernel Hub 提供了一種強大而簡單的方式來訪問和利用最佳化的計算核心。透過將標準 PyTorch 元件替換為針對 RMS 歸一化等操作最佳化的版本,您可以在不增加傳統自定義構建複雜性的情況下,潛在地實現顯著的效能提升。請記住檢視 Hub 上每個核心的詳細資訊,以確保正確使用。快來嘗試一下,看看它如何加速您的工作流程!

社群

這是一篇好文章 #

我的實現不完整,輸出結果只是噪音,但即使在 Zero GPU 空間中,SDXL 管線仍然可以工作到輸出階段。替換似乎相對穩定。
此外,kernels 模組的主體是一個 C++ 編譯的二進位制檔案,因此無需 CUDA Toolkit 即可安裝!這在 Zero GPU 環境中是一個很有用的功能。
如果 Hugging Face 的主要庫能夠逐步在內部採用 kernels 庫(透過可選開關),我們可以期待智慧的效能提升。
引數和返回值與原始庫不匹配,因此可能需要一個包裝器。

import torch
from kernels import get_kernel

if torch.cuda.is_available():
    flash_attn = get_kernel("kernels-community/flash-attn")
    try:
        sdpa = torch.nn.functional.scaled_dot_product_attention

        def sdpa_hijack(query, key, value, attn_mask=None, dropout_p=0.0, is_causal=False, scale=None):
            if query.shape[3] <= 128 and attn_mask is None and query.dtype != torch.float32:
                result = flash_attn.mha_fwd(q=query.transpose(1, 2), k=key.transpose(1, 2), v=value.transpose(1, 2),
                    p_dropout=dropout_p, is_causal=is_causal, softmax_scale=1.0 if scale is None else scale)[0]
                hidden_states = result.transpose(1, 2) if result is not None else None
            else:
                hidden_states = sdpa(query=query, key=key, value=value,
                    attn_mask=attn_mask, dropout_p=dropout_p, is_causal=is_causal, scale=scale)
            return hidden_states

        torch.nn.functional.scaled_dot_product_attention = sdpa_hijack
        print("# # #\nHijacked SDPA with kernels Flash Attention\n# # #")
    except ImportError as e:
        print(f"# # #\nCould not load Flash Attention for hijack:\n{e}\n# # #")
else:
    print(f"# # #\nCould not detect GPU\n# # #")

# https://github.com/huggingface/diffusers/discussions/7172
# https://huggingface.co/kernels-community/flash-attn
# https://huggingface.co/blog/hello-hf-kernels

註冊登入 以評論

© . This site is unofficial and not affiliated with Hugging Face, Inc.