使用 Hugging Face + Dask 擴充套件基於 AI 的資料處理

釋出於 2024 年 10 月 9 日
在 GitHub 上更新

Hugging Face 平臺擁有眾多資料集和預訓練模型,使得使用和訓練最先進的機器學習模型變得越來越容易。然而,擴充套件 AI 任務可能很困難,因為 AI 資料集通常很大(從幾百 GB 到幾 TB 不等),並且使用 Hugging Face Transformers 進行模型推理有時計算成本很高。

Dask 是一個用於分散式計算的 Python 庫,它透過將資料集分解成可管理的塊來處理核外計算(即處理無法裝入記憶體的資料)。這使得完成以下任務變得容易:

  • 透過一個模仿 pandas 且易於使用的 API,高效載入和預處理 TB 級資料集
  • 並行模型推理(可選擇多節點 GPU 推理)

在這篇文章中,我們將展示一個來自 FineWeb 資料集的資料處理示例,使用 FineWeb-Edu 分類器來識別具有高教育價值的網頁。我們將展示:

  • 如何使用 pandas 在本地處理 100 行資料
  • 如何使用 Dask 在雲端多個 GPU 上擴充套件到 2.11 億行資料

使用 Pandas 處理 100 行資料

FineWeb 資料集 包含來自 Common Crawl 的 15 萬億個英文網頁資料 token。Common Crawl 是一個非營利組織,託管著每月更新的公共網路爬取資料集。該資料集常用於各種任務,如大型語言模型訓練、分類、內容過濾以及跨多個行業的資訊檢索。

在筆記型電腦上使用 pandas 下載並讀入單個檔案可能需要超過 1 分鐘。

import pandas as pd

df = pd.read_parquet(
    "hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/000_00000.parquet"
)

接下來,我們將使用 HF 的 FineWeb-Edu 分類器來評估我們資料集中網頁的教育價值。網頁的評分範圍從 0 到 5,其中 0 表示沒有教育價值,5 表示具有很高的教育價值。我們可以使用 pandas 對一個較小的、100 行的資料子集進行此操作,在帶有 GPU 的 M1 Mac 上大約需要 10 秒。

from transformers import pipeline

def compute_scores(texts):
    import torch

    # Select which hardware to use
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
    else:
        device = torch.device("cpu")

    pipe = pipeline(
        "text-classification",
        model="HuggingFaceFW/fineweb-edu-classifier",
        device=device
    )
    results = pipe(
        texts.to_list(),
        batch_size=25,                    # Choose batch size based on data size and hardware
        padding="longest",
        truncation=True,
        function_to_apply="none"
    )
    
    return pd.Series([r["score"] for r in results])

df = df[:100]
min_edu_score = 3
df["edu-classifier-score"] = compute_scores(df.text)
df = df[df["edu-classifier-score"] >= min_edu_score]

請注意,我們還在 compute_scores 函式中增加了一個檢查可用硬體的步驟,因為在下一步使用 Dask 進行擴充套件時,該函式將被分發執行。這使得從在單臺機器上(無論是 CPU 還是帶 Apple silicon GPU 的 MacBook)進行本地測試,到擴充套件到多臺機器(如 NVIDIA GPU)上的分散式執行變得容易。

使用 Dask 擴充套件至 2.11 億行資料

整個 2024 年 2 月/3 月的爬取資料在磁碟上大小為 432 GB,在記憶體中約為 715 GB,分佈在 250 個 Parquet 檔案中。即使在一臺有足夠記憶體容納整個資料集的機器上,序列處理也會非常緩慢。

為了進行擴充套件,我們可以使用 Dask DataFrame,它透過並行化 pandas 來幫助您處理大型表格資料。它的 API 與 pandas 非常相似,使得從在單個數據集上測試到擴充套件到完整資料集變得容易。Dask 與 Hugging Face 資料集的預設格式 Parquet 配合得很好,可以實現豐富的資料型別、高效的列式過濾和壓縮。

import dask.dataframe as dd

df = dd.read_parquet(
    # Load the full dataset lazily with Dask
    "hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/*.parquet" 
)

我們將使用 map_partitions 在 Dask DataFrame 上並行應用用於文字分類的 compute_scores 函式。map_partitions 會在更大的 Dask DataFrame 中的每個 pandas DataFrame 上並行應用我們的函式。meta 引數是 Dask 特有的,用於指定輸出的資料結構(列名和資料型別)。

from transformers import pipeline

def compute_scores(texts):
    import torch

    # Select which hardware to use
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
    else:
        device = torch.device("cpu")

    pipe = pipeline(
        "text-classification",
        model="HuggingFaceFW/fineweb-edu-classifier",
        device=device,
    )
    results = pipe(
        texts.to_list(),
        batch_size=768,
        padding="longest",
        truncation=True,
        function_to_apply="none",
    )

    return pd.Series([r["score"] for r in results])

min_edu_score = 3
df["edu-classifier-score"] = df.text.map_partitions(compute_scores, meta=pd.Series([0]))
df = df[df["edu-classifier-score"] >= min_edu_score]

請注意,我們為這個示例選擇了一個效果不錯的 batch_size,但您可能需要根據自己工作流中的硬體、資料和模型來自定義這個值(請參閱 HF 關於 pipeline 批處理的文件)。

現在我們已經確定了我們感興趣的資料集行,我們可以儲存結果以供其他下游分析使用。Dask DataFrame 自動支援分散式寫入 Parquet。Hugging Face 使用提交(commit)來跟蹤資料集的更改,並允許並行寫入 Dask DataFrame。

repo_id = "<your-hf-user>/<your-dataset-name>"  # Update with your dataset location
df.to_parquet(f"hf://datasets/{repo_id}")

由於這會為每個檔案建立一個提交,建議在上傳後壓縮歷史記錄。

from huggingface_hub import HfApi

HfApi().super_squash_history(repo_id=repo_id, repo_type="dataset")

或者,您可以使用這個自定義函式,它可以在每次提交中上傳多個檔案。

多 GPU 並行模型推理

有多種方法可以在各種硬體上部署 Dask。在這裡,我們將使用 Coiled 在雲上部署 Dask,這樣我們就可以按需啟動虛擬機器,並在完成後清理它們。

cluster = coiled.Cluster(
    region="us-east-1",                 # Same region as data
    n_workers=100,                      
    spot_policy="spot_with_fallback",   # Use spot instances, if available
    worker_vm_types="g5.xlarge",        # NVIDIA A10 Tensor Core GPU
    worker_options={"nthreads": 1},
)
client = cluster.get_client()

在幕後,Coiled 會處理:

  • 配置帶有 GPU 硬體的雲虛擬機器。在本例中,是 AWS 上的 g5.xlarge 例項。
  • 設定相應的 NVIDIA 驅動、CUDA 執行時等。
  • 透過包同步功能,在雲虛擬機器上自動安裝與您本地相同的包。這包括您工作目錄中的 Python 檔案。

該工作流耗時約 5 小時完成,並且我們獲得了良好的 GPU 硬體利用率。

Median GPU utilization is 100% and median memory usage is 21.5 GB, just under the 24 GB available on the GPU.
GPU 利用率和記憶體使用率都接近其最大容量,這意味著我們正在很好地利用可用硬體。

總而言之,以下是完整的工作流程

import dask.dataframe as dd
from transformers import pipeline
from huggingface_hub import HfApi
import os
import coiled

cluster = coiled.Cluster(
    region="us-east-1",
    n_workers=100,
    spot_policy="spot_with_fallback",
    worker_vm_types="g5.xlarge",
    worker_options={"nthreads": 1},
)
client = cluster.get_client()
cluster.send_private_envs(
    {"HF_TOKEN": "<your-hf-token>"}             #  Send credentials over encrypted connection
)

df = dd.read_parquet(
    "hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/*.parquet"
)

def compute_scores(texts):
    import torch

    # Select which hardware to use
    if torch.cuda.is_available():
        device = torch.device("cuda")
    elif torch.backends.mps.is_available():
        device = torch.device("mps")
    else:
        device = torch.device("cpu")

    pipe = pipeline(
        "text-classification",
        model="HuggingFaceFW/fineweb-edu-classifier",
        device=device
    )
    results = pipe(
        texts.to_list(),
        batch_size=768,
        padding="longest",
        truncation=True,
        function_to_apply="none"
    )

    return pd.Series([r["score"] for r in results])

min_edu_score = 3
df["edu-classifier-score"] = df.text.map_partitions(compute_scores, meta=pd.Series([0]))
df = df[df["edu-classifier-score"] >= min_edu_score]

repo_id = "<your-hf-user>/<your-dataset-name>"  # Replace with your dataset location
df.to_parquet(f"hf://datasets/{repo_id}")

HfApi().super_squash_history(repo_id=repo_id, repo_type="dataset")  # optional: squash commit history

結論

Hugging Face + Dask 是一個強大的組合。在本例中,我們透過使用 Dask + Coiled 在雲端多個 GPU 上並行執行工作流,將分類任務從 100 行擴充套件到了 2.11 億行。

同樣型別的工作流也可以用於其他用例,例如:

  • 過濾基因組資料以選擇感興趣的基因
  • 從非結構化文字中提取資訊並將其轉化為結構化資料集
  • 清理從網際網路或 Common Crawl 抓取的文字資料
  • 執行多模態模型推理以分析大型音訊、影像或影片資料集

社群

註冊登入 以發表評論

© . This site is unofficial and not affiliated with Hugging Face, Inc.