使用 Hugging Face + Dask 擴充套件基於 AI 的資料處理
Hugging Face 平臺擁有眾多資料集和預訓練模型,使得使用和訓練最先進的機器學習模型變得越來越容易。然而,擴充套件 AI 任務可能很困難,因為 AI 資料集通常很大(從幾百 GB 到幾 TB 不等),並且使用 Hugging Face Transformers 進行模型推理有時計算成本很高。
Dask 是一個用於分散式計算的 Python 庫,它透過將資料集分解成可管理的塊來處理核外計算(即處理無法裝入記憶體的資料)。這使得完成以下任務變得容易:
- 透過一個模仿 pandas 且易於使用的 API,高效載入和預處理 TB 級資料集
- 並行模型推理(可選擇多節點 GPU 推理)
在這篇文章中,我們將展示一個來自 FineWeb 資料集的資料處理示例,使用 FineWeb-Edu 分類器來識別具有高教育價值的網頁。我們將展示:
- 如何使用 pandas 在本地處理 100 行資料
- 如何使用 Dask 在雲端多個 GPU 上擴充套件到 2.11 億行資料
使用 Pandas 處理 100 行資料
FineWeb 資料集 包含來自 Common Crawl 的 15 萬億個英文網頁資料 token。Common Crawl 是一個非營利組織,託管著每月更新的公共網路爬取資料集。該資料集常用於各種任務,如大型語言模型訓練、分類、內容過濾以及跨多個行業的資訊檢索。
在筆記型電腦上使用 pandas 下載並讀入單個檔案可能需要超過 1 分鐘。
import pandas as pd
df = pd.read_parquet(
"hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/000_00000.parquet"
)
接下來,我們將使用 HF 的 FineWeb-Edu 分類器來評估我們資料集中網頁的教育價值。網頁的評分範圍從 0 到 5,其中 0 表示沒有教育價值,5 表示具有很高的教育價值。我們可以使用 pandas 對一個較小的、100 行的資料子集進行此操作,在帶有 GPU 的 M1 Mac 上大約需要 10 秒。
from transformers import pipeline
def compute_scores(texts):
import torch
# Select which hardware to use
if torch.cuda.is_available():
device = torch.device("cuda")
elif torch.backends.mps.is_available():
device = torch.device("mps")
else:
device = torch.device("cpu")
pipe = pipeline(
"text-classification",
model="HuggingFaceFW/fineweb-edu-classifier",
device=device
)
results = pipe(
texts.to_list(),
batch_size=25, # Choose batch size based on data size and hardware
padding="longest",
truncation=True,
function_to_apply="none"
)
return pd.Series([r["score"] for r in results])
df = df[:100]
min_edu_score = 3
df["edu-classifier-score"] = compute_scores(df.text)
df = df[df["edu-classifier-score"] >= min_edu_score]
請注意,我們還在 compute_scores
函式中增加了一個檢查可用硬體的步驟,因為在下一步使用 Dask 進行擴充套件時,該函式將被分發執行。這使得從在單臺機器上(無論是 CPU 還是帶 Apple silicon GPU 的 MacBook)進行本地測試,到擴充套件到多臺機器(如 NVIDIA GPU)上的分散式執行變得容易。
使用 Dask 擴充套件至 2.11 億行資料
整個 2024 年 2 月/3 月的爬取資料在磁碟上大小為 432 GB,在記憶體中約為 715 GB,分佈在 250 個 Parquet 檔案中。即使在一臺有足夠記憶體容納整個資料集的機器上,序列處理也會非常緩慢。
為了進行擴充套件,我們可以使用 Dask DataFrame,它透過並行化 pandas 來幫助您處理大型表格資料。它的 API 與 pandas 非常相似,使得從在單個數據集上測試到擴充套件到完整資料集變得容易。Dask 與 Hugging Face 資料集的預設格式 Parquet 配合得很好,可以實現豐富的資料型別、高效的列式過濾和壓縮。
import dask.dataframe as dd
df = dd.read_parquet(
# Load the full dataset lazily with Dask
"hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/*.parquet"
)
我們將使用 map_partitions
在 Dask DataFrame 上並行應用用於文字分類的 compute_scores
函式。map_partitions
會在更大的 Dask DataFrame 中的每個 pandas DataFrame 上並行應用我們的函式。meta
引數是 Dask 特有的,用於指定輸出的資料結構(列名和資料型別)。
from transformers import pipeline
def compute_scores(texts):
import torch
# Select which hardware to use
if torch.cuda.is_available():
device = torch.device("cuda")
elif torch.backends.mps.is_available():
device = torch.device("mps")
else:
device = torch.device("cpu")
pipe = pipeline(
"text-classification",
model="HuggingFaceFW/fineweb-edu-classifier",
device=device,
)
results = pipe(
texts.to_list(),
batch_size=768,
padding="longest",
truncation=True,
function_to_apply="none",
)
return pd.Series([r["score"] for r in results])
min_edu_score = 3
df["edu-classifier-score"] = df.text.map_partitions(compute_scores, meta=pd.Series([0]))
df = df[df["edu-classifier-score"] >= min_edu_score]
請注意,我們為這個示例選擇了一個效果不錯的 batch_size
,但您可能需要根據自己工作流中的硬體、資料和模型來自定義這個值(請參閱 HF 關於 pipeline 批處理的文件)。
現在我們已經確定了我們感興趣的資料集行,我們可以儲存結果以供其他下游分析使用。Dask DataFrame 自動支援分散式寫入 Parquet。Hugging Face 使用提交(commit)來跟蹤資料集的更改,並允許並行寫入 Dask DataFrame。
repo_id = "<your-hf-user>/<your-dataset-name>" # Update with your dataset location
df.to_parquet(f"hf://datasets/{repo_id}")
由於這會為每個檔案建立一個提交,建議在上傳後壓縮歷史記錄。
from huggingface_hub import HfApi
HfApi().super_squash_history(repo_id=repo_id, repo_type="dataset")
或者,您可以使用這個自定義函式,它可以在每次提交中上傳多個檔案。
多 GPU 並行模型推理
有多種方法可以在各種硬體上部署 Dask。在這裡,我們將使用 Coiled 在雲上部署 Dask,這樣我們就可以按需啟動虛擬機器,並在完成後清理它們。
cluster = coiled.Cluster(
region="us-east-1", # Same region as data
n_workers=100,
spot_policy="spot_with_fallback", # Use spot instances, if available
worker_vm_types="g5.xlarge", # NVIDIA A10 Tensor Core GPU
worker_options={"nthreads": 1},
)
client = cluster.get_client()
在幕後,Coiled 會處理:
- 配置帶有 GPU 硬體的雲虛擬機器。在本例中,是 AWS 上的 g5.xlarge 例項。
- 設定相應的 NVIDIA 驅動、CUDA 執行時等。
- 透過包同步功能,在雲虛擬機器上自動安裝與您本地相同的包。這包括您工作目錄中的 Python 檔案。
該工作流耗時約 5 小時完成,並且我們獲得了良好的 GPU 硬體利用率。

總而言之,以下是完整的工作流程
import dask.dataframe as dd
from transformers import pipeline
from huggingface_hub import HfApi
import os
import coiled
cluster = coiled.Cluster(
region="us-east-1",
n_workers=100,
spot_policy="spot_with_fallback",
worker_vm_types="g5.xlarge",
worker_options={"nthreads": 1},
)
client = cluster.get_client()
cluster.send_private_envs(
{"HF_TOKEN": "<your-hf-token>"} # Send credentials over encrypted connection
)
df = dd.read_parquet(
"hf://datasets/HuggingFaceFW/fineweb/data/CC-MAIN-2024-10/*.parquet"
)
def compute_scores(texts):
import torch
# Select which hardware to use
if torch.cuda.is_available():
device = torch.device("cuda")
elif torch.backends.mps.is_available():
device = torch.device("mps")
else:
device = torch.device("cpu")
pipe = pipeline(
"text-classification",
model="HuggingFaceFW/fineweb-edu-classifier",
device=device
)
results = pipe(
texts.to_list(),
batch_size=768,
padding="longest",
truncation=True,
function_to_apply="none"
)
return pd.Series([r["score"] for r in results])
min_edu_score = 3
df["edu-classifier-score"] = df.text.map_partitions(compute_scores, meta=pd.Series([0]))
df = df[df["edu-classifier-score"] >= min_edu_score]
repo_id = "<your-hf-user>/<your-dataset-name>" # Replace with your dataset location
df.to_parquet(f"hf://datasets/{repo_id}")
HfApi().super_squash_history(repo_id=repo_id, repo_type="dataset") # optional: squash commit history
結論
Hugging Face + Dask 是一個強大的組合。在本例中,我們透過使用 Dask + Coiled 在雲端多個 GPU 上並行執行工作流,將分類任務從 100 行擴充套件到了 2.11 億行。
同樣型別的工作流也可以用於其他用例,例如:
- 過濾基因組資料以選擇感興趣的基因
- 從非結構化文字中提取資訊並將其轉化為結構化資料集
- 清理從網際網路或 Common Crawl 抓取的文字資料
- 執行多模態模型推理以分析大型音訊、影像或影片資料集