Hub 文件

Dask

Hugging Face's logo
加入 Hugging Face 社群

並獲得增強的文件體驗

開始使用

Dask

Dask 是一個並行和分散式計算庫,用於擴充套件現有的 Python 和 PyData 生態系統。

特別是,我們可以使用 Dask DataFrame 來擴充套件 pandas 的工作流程。Dask DataFrame 透過並行化 pandas 來處理大型表格資料。它與 pandas 的 API 非常相似,使得從在單個數據集上進行測試過渡到處理完整資料集變得簡單。Dask 在處理 Parquet 格式時尤其高效,這是 Hugging Face Datasets 上的預設格式,因為它支援豐富的資料型別、高效的列過濾和壓縮。

Dask 的一個很好的實際用例是以分散式方式在資料集上執行資料處理或模型推理。例如,請參閱 Coiled 發表的關於使用 Hugging Face + Dask 擴充套件基於 AI 的資料處理的優秀部落格文章。

讀和寫

由於 Dask 使用 fsspec 來讀寫遠端資料,您可以使用 Hugging Face 路徑(hf://)來在 Hub 上讀寫資料。

首先,您需要使用您的 Hugging Face 賬戶登入,例如使用

hf auth login

然後您可以建立一個數據集倉庫,例如使用

from huggingface_hub import HfApi

HfApi().create_repo(repo_id="username/my_dataset", repo_type="dataset")

最後,您可以在 Dask 中使用Hugging Face 路徑。Dask DataFrame 支援在 Hugging Face 上以分散式方式寫入 Parquet,它使用提交來跟蹤資料集的更改。

import dask.dataframe as dd

df.to_parquet("hf://datasets/username/my_dataset")

# or write in separate directories if the dataset has train/validation/test splits
df_train.to_parquet("hf://datasets/username/my_dataset/train")
df_valid.to_parquet("hf://datasets/username/my_dataset/validation")
df_test .to_parquet("hf://datasets/username/my_dataset/test")

由於這會為每個檔案建立一個提交,建議在上傳後壓縮歷史記錄。

from huggingface_hub import HfApi

HfApi().super_squash_history(repo_id=repo_id, repo_type="dataset")

這會建立一個名為 `username/my_dataset` 的資料集倉庫,其中包含您的 Dask 資料集(Parquet 格式)。您稍後可以重新載入它。

import dask.dataframe as dd

df = dd.read_parquet("hf://datasets/username/my_dataset")

# or read from separate directories if the dataset has train/validation/test splits
df_train = dd.read_parquet("hf://datasets/username/my_dataset/train")
df_valid = dd.read_parquet("hf://datasets/username/my_dataset/validation")
df_test  = dd.read_parquet("hf://datasets/username/my_dataset/test")

有關 Hugging Face 路徑及其實現方式的更多資訊,請參閱客戶端庫中關於 HfFileSystem 的文件

處理資料

要使用 Dask 並行處理資料集,您可以首先為 pandas DataFrame 或 Series 定義您的資料處理函式,然後使用 Dask 的 `map_partitions` 函式將此函式並行地應用於資料集的所有分割槽。

def dummy_count_words(texts):
    return pd.Series([len(text.split(" ")) for text in texts])

或使用 pandas 字串方法的類似函式(速度更快)

def dummy_count_words(texts):
    return texts.str.count(" ")

在 pandas 中,您可以在文字列上使用此函式

# pandas API
df["num_words"] = dummy_count_words(df.text)

在 Dask 中,您可以在每個分割槽上執行此函式

# Dask API: run the function on every partition
df["num_words"] = df.text.map_partitions(dummy_count_words, meta=int)

請注意,您還需要提供 `meta`,即函式輸出中 pandas Series 或 DataFrame 的型別。這是必需的,因為 Dask DataFrame 使用惰性 API。由於 Dask 只有在呼叫 `.compute()` 時才會執行資料處理,因此它需要 `meta` 引數來在此期間瞭解新列的型別。

謂詞和投影下推

從 Hugging Face 讀取 Parquet 資料時,Dask 會自動利用 Parquet 檔案中的元資料來跳過不需要的整個檔案或行組。例如,如果您對 Parquet 格式的 Hugging Face 資料集應用篩選(謂詞)或選擇列的子集(投影),Dask 將讀取 Parquet 檔案的元資料,以丟棄不需要的部分,而無需下載它們。

這得益於對Dask DataFrame API 的重新實現,以支援查詢最佳化,這使得 Dask 更快、更穩健。

例如,FineWeb-Edu 的這個子集包含許多 Parquet 檔案。如果您可以篩選資料集以保留最近的 CC 轉儲中的文字,Dask 將跳過大部分檔案,只下載與篩選條件匹配的資料。

import dask.dataframe as dd

df = dd.read_parquet("hf://datasets/HuggingFaceFW/fineweb-edu/sample/10BT/*.parquet")

# Dask will skip the files or row groups that don't
# match the query without downloading them.
df = df[df.dump >= "CC-MAIN-2023"]

Dask 也只會讀取計算所需的列,並跳過其餘的列。例如,如果您在程式碼後期刪除了某一列,如果不需要該列,它就不會在管道的早期載入它。這在您想要操作列的子集或進行分析時非常有用。

# Dask will download the 'dump' and 'token_count' needed
# for the filtering and computation and skip the other columns.
df.token_count.mean().compute()

客戶端

`dask` 中的大多數功能都針對叢集或本地 `Client` 進行了最佳化,以啟動平行計算。

import dask.dataframe as dd
from distributed import Client

if __name__ == "__main__":  # needed for creating new processes
    client = Client()
    df = dd.read_parquet(...)
    ...

對於本地使用,`Client` 預設使用帶有 `LocalCluster` 的多程序。您可以手動配置 `LocalCluster` 的多程序:

from dask.distributed import Client, LocalCluster
cluster = LocalCluster(n_workers=8, threads_per_worker=8)
client = Client(cluster)

請注意,如果您在本地不使用 `Client` 的情況下使用預設的執行緒排程器,DataFrame 在某些操作後可能會變慢(更多詳情請見此處)。

有關設定本地或雲集群的更多資訊,請參閱部署 Dask 文件

< > 在 GitHub 上更新

© . This site is unofficial and not affiliated with Hugging Face, Inc.