Hub 文件

Spark

Hugging Face's logo
加入 Hugging Face 社群

並獲得增強的文件體驗

開始使用

Spark

Spark 支援在分散式環境中進行即時、大規模的資料處理。

你可以使用 pyspark_huggingface 透過 “huggingface” 資料來源在 PySpark 中訪問 Hugging Face 的資料集倉庫。

請在 Hugging Face Spaces 上試用 Spark Notebooks,以獲取預裝了 PySpark 和 pyspark_huggingface 的 Notebooks。

設定

安裝

要能夠讀寫 Hugging Face 資料集,你需要安裝 pyspark_huggingface 庫。

pip install pyspark_huggingface

這也會安裝所需的依賴項,如用於身份驗證的 huggingface_hub,以及用於讀寫資料集的 pyarrow

身份驗證

你需要透過 Hugging Face 的身份驗證才能讀取私有/門控的資料集倉庫或向你自己的資料集倉庫寫入資料。

你可以使用命令列介面(CLI)進行驗證:

hf auth login

也可以透過 HF_TOKEN 環境變數提供你的 Hugging Face 令牌,或者將 token 選項傳遞給讀取器。有關身份驗證的更多詳細資訊,請檢視此指南

啟用 “huggingface” 資料來源

PySpark 4 引入了新的資料來源 API,允許使用來自自定義源的資料集。如果安裝了 pyspark_huggingface,PySpark 會自動匯入它並啟用 “huggingface” 資料來源。

該庫還為 PySpark 3.5、3.4 和 3.3 向後移植了 “huggingface” 資料來源的 API。但在這種情況下,需要顯式匯入 pyspark_huggingface 來啟用向後移植的功能並啟用 “huggingface” 資料來源。

>>> import pyspark_huggingface
huggingface datasource enabled for pyspark 3.x.x (backport from pyspark 4)

讀取

“huggingface” 資料來源允許從 Hugging Face 讀取資料集,其底層使用 pyarrow 來流式傳輸 Arrow 資料。這與 Hugging Face 上所有支援格式的資料集相容,例如 Parquet 資料集。

例如,以下是如何載入 stanfordnlp/imdb 資料集:

>>> import pyspark_huggingface
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("demo").getOrCreate()
>>> df = spark.read.format("huggingface").load("stanfordnlp/imdb")

這是另一個使用 BAAI/Infinity-Instruct 資料集的例子。這是一個門控倉庫,使用者在訪問之前必須接受使用條款。它還有多個子集,即“3M”和“7M”。因此,我們需要指定載入哪一個。

我們使用 .format() 函式來指定 “huggingface” 資料來源,並使用 .load() 來載入資料集(更確切地說,是名為 “7M” 的配置或子集,包含 700 萬個樣本)。然後,我們計算每種語言的對話數量並對資料集進行過濾。

登入以訪問門控倉庫後,我們可以執行:

>>> import pyspark_huggingface
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("demo").getOrCreate()
>>> df = spark.read.format("huggingface").option("config", "7M").load("BAAI/Infinity-Instruct")
>>> df.show()
+---+----------------------------+-----+----------+--------------------+        
| id|               conversations|label|langdetect|              source|
+---+----------------------------+-----+----------+--------------------+
|  0|        [{human, def exti...|     |        en|      code_exercises|
|  1|        [{human, See the ...|     |        en|                flan|
|  2|        [{human, This is ...|     |        en|                flan|
|  3|        [{human, If you d...|     |        en|                flan|
|  4|        [{human, In a Uni...|     |        en|                flan|
|  5|        [{human, Read the...|     |        en|                flan|
|  6|        [{human, You are ...|     |        en|          code_bagel|
|  7|        [{human, I want y...|     |        en|          Subjective|
|  8|        [{human, Given th...|     |        en|                flan|
|  9|[{human, 因果聯絡原則是法...|     |     zh-cn|          Subjective|
| 10|        [{human, Provide ...|     |        en|self-oss-instruct...|
| 11|        [{human, The univ...|     |        en|                flan|
| 12|        [{human, Q: I am ...|     |        en|                flan|
| 13|        [{human, What is ...|     |        en|      OpenHermes-2.5|
| 14|        [{human, In react...|     |        en|                flan|
| 15|        [{human, Write Py...|     |        en|      code_exercises|
| 16|        [{human, Find the...|     |        en|            MetaMath|
| 17|        [{human, Three of...|     |        en|            MetaMath|
| 18|        [{human, Chandra ...|     |        en|            MetaMath|
| 19|[{human, 用經濟學知識分析...|     |     zh-cn|          Subjective|
+---+----------------------------+-----+----------+--------------------+

這將以流式方式載入資料集,並且輸出的 DataFrame 中每個資料檔案對應一個分割槽,以實現高效的分散式處理。

為了計算每種語言的對話數量,我們執行這段使用 columns 選項和 groupBy() 操作的程式碼。columns 選項很有用,因為它只加載我們需要的資料,因為 PySpark 的資料來源 API 不支援謂詞下推。還有一個 filters 選項,可以只加載值在特定範圍內的資料。

>>> df_langdetect_only = (
...     spark.read.format("huggingface")
...     .option("config", "7M")
...     .option("columns", '["langdetect"]')
...     .load("BAAI/Infinity-Instruct")
... )
>>> df_langdetect_only.groupBy("langdetect").count().show()
+----------+-------+                                                            
|langdetect|  count|
+----------+-------+
|        en|6697793|
|     zh-cn| 751313|
+----------+-------+

要過濾資料集,只保留中文對話:

>>> df_chinese_only = (
...     spark.read.format("huggingface")
...     .option("config", "7M")
...     .option("filters", '[("langdetect", "=", "zh-cn")]')
...     .load("BAAI/Infinity-Instruct")
... )
>>> df_chinese_only.show()
+---+----------------------------+-----+----------+----------+                  
| id|               conversations|label|langdetect|    source|
+---+----------------------------+-----+----------+----------+
|  9|[{human, 因果聯絡原則是法...|     |     zh-cn|Subjective|
| 19|[{human, 用經濟學知識分析...|     |     zh-cn|Subjective|
| 38| [{human, 某個考試共有A、...|     |     zh-cn|Subjective|
| 39|[{human, 撰寫一篇關於斐波...|     |     zh-cn|Subjective|
| 57|[{human, 總結世界歷史上的...|     |     zh-cn|Subjective|
| 61|[{human, 生成一則廣告詞。...|     |     zh-cn|Subjective|
| 66|[{human, 描述一個有效的團...|     |     zh-cn|Subjective|
| 94|[{human, 如果比利和蒂芙尼...|     |     zh-cn|Subjective|
|102|[{human, 生成一句英文名言...|     |     zh-cn|Subjective|
|106|[{human, 寫一封感謝信,感...|     |     zh-cn|Subjective|
|118| [{human, 生成一個故事。}...|     |     zh-cn|Subjective|
|174|[{human, 高膽固醇水平的後...|     |     zh-cn|Subjective|
|180|[{human, 基於以下角色資訊...|     |     zh-cn|Subjective|
|192|[{human, 請寫一篇文章,概...|     |     zh-cn|Subjective|
|221|[{human, 以詩歌形式表達對...|     |     zh-cn|Subjective|
|228|[{human, 根據給定的指令,...|     |     zh-cn|Subjective|
|236|[{human, 開啟一個新的生成...|     |     zh-cn|Subjective|
|260|[{human, 生成一個有關未來...|     |     zh-cn|Subjective|
|268|[{human, 如果有一定數量的...|     |     zh-cn|Subjective|
|273| [{human, 題目:小明有5個...|     |     zh-cn|Subjective|
+---+----------------------------+-----+----------+----------+

也可以在載入的 DataFrame 上應用過濾器或刪除列,但在載入時進行這些操作效率更高,尤其是在 Parquet 資料集上。這是因為 Parquet 在檔案和行組級別包含了元資料,這使得可以跳過不包含滿足條件樣本的整個資料集部分。Parquet 中的列也可以獨立載入,這允許跳過排除的列並避免載入不必要的資料。

選項

以下是可傳遞給 read..option() 的選項列表:

  • config (string):選擇一個數據集子集/配置
  • split (string):選擇一個數據集劃分(預設為 “train”)
  • token (string):你的 Hugging Face 令牌

對於 Parquet 資料集

  • columns (string):選擇要載入的列的子集,例如 '["id"]'
  • filters (string):用於跳過不符合條件的檔案和行組,例如 '["source", "=", "code_exercises"]'。過濾器會傳遞給 pyarrow.parquet.ParquetDataset

任何其他選項都會作為引數傳遞給 [datasets.load_dataset] (https://huggingface.co/docs/datasets/en/package_reference/loading_methods#datasets.load_dataset)

執行 SQL 查詢

一旦你的 PySpark DataFrame 準備就緒,你就可以使用 spark.sql 執行 SQL 查詢。

>>> import pyspark_huggingface
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("demo").getOrCreate()
>>> df = (
...     spark.read.format("huggingface")
...     .option("config", "7M")
...     .option("columns", '["source"]')
...     .load("BAAI/Infinity-Instruct")
... )
>>> spark.sql("SELECT source, count(*) AS total FROM {df} GROUP BY source ORDER BY total DESC", df=df).show()
+--------------------+-------+
|              source|  total|
+--------------------+-------+
|                flan|2435840|
|          Subjective|1342427|
|      OpenHermes-2.5| 855478|
|            MetaMath| 690138|
|      code_exercises| 590958|
|Orca-math-word-pr...| 398168|
|          code_bagel| 386649|
|        MathInstruct| 329254|
|python-code-datas...|  88632|
|instructional_cod...|  82920|
|        CodeFeedback|  79513|
|self-oss-instruct...|  50467|
|Evol-Instruct-Cod...|  43354|
|CodeExercise-Pyth...|  27159|
|code_instructions...|  23130|
|  Code-Instruct-700k|  10860|
|Glaive-code-assis...|   9281|
|python_code_instr...|   2581|
|Python-Code-23k-S...|   2297|
+--------------------+-------+

再次強調,指定 columns 選項不是必須的,但它有助於避免載入不必要的資料,從而使查詢更快。

寫入

你可以使用 “huggingface” 資料來源將 PySpark DataFrame 寫入 Hugging Face。它以分散式方式並行上傳 Parquet 檔案,並且只有在所有檔案都上傳完畢後才提交。工作方式如下:

>>> import pyspark_huggingface
>>> df.write.format("huggingface").save("username/dataset_name")

下面是我們如何使用這個函式將過濾後的 BAAI/Infinity-Instruct 資料集寫回 Hugging Face 的例子。

首先你需要建立一個數據集倉庫,例如 username/Infinity-Instruct-Chinese-Only(如果你願意,可以將其設為私有)。然後,確保你已經透過身份驗證並且可以使用 “huggingface” 資料來源,將 mode 設定為 “overwrite”(或 “append” 如果你想擴充套件現有資料集),然後使用 .save() 推送到 Hugging Face。

>>> df_chinese_only.write.format("huggingface").mode("overwrite").save("username/Infinity-Instruct-Chinese-Only")

模式

將資料集推送到 Hugging Face 時有兩種模式可用:

  • “overwrite”:如果資料集已存在,則覆蓋它
  • “append”:將資料集追加到現有資料集中

選項

以下是可傳遞給 write.option() 的選項列表:

  • token (string):你的 Hugging Face 令牌

歡迎貢獻程式碼以在此處新增更多選項,特別是 subsetsplit

< > 在 GitHub 上更新

© . This site is unofficial and not affiliated with Hugging Face, Inc.