Hub 文件
Spark
並獲得增強的文件體驗
開始使用
Spark
Spark 支援在分散式環境中進行即時、大規模的資料處理。
你可以使用 pyspark_huggingface
透過 “huggingface” 資料來源在 PySpark 中訪問 Hugging Face 的資料集倉庫。
請在 Hugging Face Spaces 上試用 Spark Notebooks,以獲取預裝了 PySpark 和 pyspark_huggingface
的 Notebooks。

設定
安裝
要能夠讀寫 Hugging Face 資料集,你需要安裝 pyspark_huggingface
庫。
pip install pyspark_huggingface
這也會安裝所需的依賴項,如用於身份驗證的 huggingface_hub
,以及用於讀寫資料集的 pyarrow
。
身份驗證
你需要透過 Hugging Face 的身份驗證才能讀取私有/門控的資料集倉庫或向你自己的資料集倉庫寫入資料。
你可以使用命令列介面(CLI)進行驗證:
hf auth login
也可以透過 HF_TOKEN
環境變數提供你的 Hugging Face 令牌,或者將 token
選項傳遞給讀取器。有關身份驗證的更多詳細資訊,請檢視此指南。
啟用 “huggingface” 資料來源
PySpark 4 引入了新的資料來源 API,允許使用來自自定義源的資料集。如果安裝了 pyspark_huggingface
,PySpark 會自動匯入它並啟用 “huggingface” 資料來源。
該庫還為 PySpark 3.5、3.4 和 3.3 向後移植了 “huggingface” 資料來源的 API。但在這種情況下,需要顯式匯入 pyspark_huggingface
來啟用向後移植的功能並啟用 “huggingface” 資料來源。
>>> import pyspark_huggingface
huggingface datasource enabled for pyspark 3.x.x (backport from pyspark 4)
讀取
“huggingface” 資料來源允許從 Hugging Face 讀取資料集,其底層使用 pyarrow
來流式傳輸 Arrow 資料。這與 Hugging Face 上所有支援格式的資料集相容,例如 Parquet 資料集。
例如,以下是如何載入 stanfordnlp/imdb 資料集:
>>> import pyspark_huggingface
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("demo").getOrCreate()
>>> df = spark.read.format("huggingface").load("stanfordnlp/imdb")
這是另一個使用 BAAI/Infinity-Instruct 資料集的例子。這是一個門控倉庫,使用者在訪問之前必須接受使用條款。它還有多個子集,即“3M”和“7M”。因此,我們需要指定載入哪一個。


我們使用 .format()
函式來指定 “huggingface” 資料來源,並使用 .load()
來載入資料集(更確切地說,是名為 “7M” 的配置或子集,包含 700 萬個樣本)。然後,我們計算每種語言的對話數量並對資料集進行過濾。
登入以訪問門控倉庫後,我們可以執行:
>>> import pyspark_huggingface
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("demo").getOrCreate()
>>> df = spark.read.format("huggingface").option("config", "7M").load("BAAI/Infinity-Instruct")
>>> df.show()
+---+----------------------------+-----+----------+--------------------+
| id| conversations|label|langdetect| source|
+---+----------------------------+-----+----------+--------------------+
| 0| [{human, def exti...| | en| code_exercises|
| 1| [{human, See the ...| | en| flan|
| 2| [{human, This is ...| | en| flan|
| 3| [{human, If you d...| | en| flan|
| 4| [{human, In a Uni...| | en| flan|
| 5| [{human, Read the...| | en| flan|
| 6| [{human, You are ...| | en| code_bagel|
| 7| [{human, I want y...| | en| Subjective|
| 8| [{human, Given th...| | en| flan|
| 9|[{human, 因果聯絡原則是法...| | zh-cn| Subjective|
| 10| [{human, Provide ...| | en|self-oss-instruct...|
| 11| [{human, The univ...| | en| flan|
| 12| [{human, Q: I am ...| | en| flan|
| 13| [{human, What is ...| | en| OpenHermes-2.5|
| 14| [{human, In react...| | en| flan|
| 15| [{human, Write Py...| | en| code_exercises|
| 16| [{human, Find the...| | en| MetaMath|
| 17| [{human, Three of...| | en| MetaMath|
| 18| [{human, Chandra ...| | en| MetaMath|
| 19|[{human, 用經濟學知識分析...| | zh-cn| Subjective|
+---+----------------------------+-----+----------+--------------------+
這將以流式方式載入資料集,並且輸出的 DataFrame 中每個資料檔案對應一個分割槽,以實現高效的分散式處理。
為了計算每種語言的對話數量,我們執行這段使用 columns
選項和 groupBy()
操作的程式碼。columns
選項很有用,因為它只加載我們需要的資料,因為 PySpark 的資料來源 API 不支援謂詞下推。還有一個 filters
選項,可以只加載值在特定範圍內的資料。
>>> df_langdetect_only = (
... spark.read.format("huggingface")
... .option("config", "7M")
... .option("columns", '["langdetect"]')
... .load("BAAI/Infinity-Instruct")
... )
>>> df_langdetect_only.groupBy("langdetect").count().show()
+----------+-------+
|langdetect| count|
+----------+-------+
| en|6697793|
| zh-cn| 751313|
+----------+-------+
要過濾資料集,只保留中文對話:
>>> df_chinese_only = (
... spark.read.format("huggingface")
... .option("config", "7M")
... .option("filters", '[("langdetect", "=", "zh-cn")]')
... .load("BAAI/Infinity-Instruct")
... )
>>> df_chinese_only.show()
+---+----------------------------+-----+----------+----------+
| id| conversations|label|langdetect| source|
+---+----------------------------+-----+----------+----------+
| 9|[{human, 因果聯絡原則是法...| | zh-cn|Subjective|
| 19|[{human, 用經濟學知識分析...| | zh-cn|Subjective|
| 38| [{human, 某個考試共有A、...| | zh-cn|Subjective|
| 39|[{human, 撰寫一篇關於斐波...| | zh-cn|Subjective|
| 57|[{human, 總結世界歷史上的...| | zh-cn|Subjective|
| 61|[{human, 生成一則廣告詞。...| | zh-cn|Subjective|
| 66|[{human, 描述一個有效的團...| | zh-cn|Subjective|
| 94|[{human, 如果比利和蒂芙尼...| | zh-cn|Subjective|
|102|[{human, 生成一句英文名言...| | zh-cn|Subjective|
|106|[{human, 寫一封感謝信,感...| | zh-cn|Subjective|
|118| [{human, 生成一個故事。}...| | zh-cn|Subjective|
|174|[{human, 高膽固醇水平的後...| | zh-cn|Subjective|
|180|[{human, 基於以下角色資訊...| | zh-cn|Subjective|
|192|[{human, 請寫一篇文章,概...| | zh-cn|Subjective|
|221|[{human, 以詩歌形式表達對...| | zh-cn|Subjective|
|228|[{human, 根據給定的指令,...| | zh-cn|Subjective|
|236|[{human, 開啟一個新的生成...| | zh-cn|Subjective|
|260|[{human, 生成一個有關未來...| | zh-cn|Subjective|
|268|[{human, 如果有一定數量的...| | zh-cn|Subjective|
|273| [{human, 題目:小明有5個...| | zh-cn|Subjective|
+---+----------------------------+-----+----------+----------+
也可以在載入的 DataFrame 上應用過濾器或刪除列,但在載入時進行這些操作效率更高,尤其是在 Parquet 資料集上。這是因為 Parquet 在檔案和行組級別包含了元資料,這使得可以跳過不包含滿足條件樣本的整個資料集部分。Parquet 中的列也可以獨立載入,這允許跳過排除的列並避免載入不必要的資料。
選項
以下是可傳遞給 read..option()
的選項列表:
config
(string):選擇一個數據集子集/配置split
(string):選擇一個數據集劃分(預設為 “train”)token
(string):你的 Hugging Face 令牌
對於 Parquet 資料集
columns
(string):選擇要載入的列的子集,例如'["id"]'
filters
(string):用於跳過不符合條件的檔案和行組,例如'["source", "=", "code_exercises"]'
。過濾器會傳遞給 pyarrow.parquet.ParquetDataset。
任何其他選項都會作為引數傳遞給 [datasets.load_dataset] (https://huggingface.co/docs/datasets/en/package_reference/loading_methods#datasets.load_dataset)
執行 SQL 查詢
一旦你的 PySpark DataFrame 準備就緒,你就可以使用 spark.sql
執行 SQL 查詢。
>>> import pyspark_huggingface
>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("demo").getOrCreate()
>>> df = (
... spark.read.format("huggingface")
... .option("config", "7M")
... .option("columns", '["source"]')
... .load("BAAI/Infinity-Instruct")
... )
>>> spark.sql("SELECT source, count(*) AS total FROM {df} GROUP BY source ORDER BY total DESC", df=df).show()
+--------------------+-------+
| source| total|
+--------------------+-------+
| flan|2435840|
| Subjective|1342427|
| OpenHermes-2.5| 855478|
| MetaMath| 690138|
| code_exercises| 590958|
|Orca-math-word-pr...| 398168|
| code_bagel| 386649|
| MathInstruct| 329254|
|python-code-datas...| 88632|
|instructional_cod...| 82920|
| CodeFeedback| 79513|
|self-oss-instruct...| 50467|
|Evol-Instruct-Cod...| 43354|
|CodeExercise-Pyth...| 27159|
|code_instructions...| 23130|
| Code-Instruct-700k| 10860|
|Glaive-code-assis...| 9281|
|python_code_instr...| 2581|
|Python-Code-23k-S...| 2297|
+--------------------+-------+
再次強調,指定 columns
選項不是必須的,但它有助於避免載入不必要的資料,從而使查詢更快。
寫入
你可以使用 “huggingface” 資料來源將 PySpark DataFrame 寫入 Hugging Face。它以分散式方式並行上傳 Parquet 檔案,並且只有在所有檔案都上傳完畢後才提交。工作方式如下:
>>> import pyspark_huggingface
>>> df.write.format("huggingface").save("username/dataset_name")
下面是我們如何使用這個函式將過濾後的 BAAI/Infinity-Instruct 資料集寫回 Hugging Face 的例子。
首先你需要建立一個數據集倉庫,例如 username/Infinity-Instruct-Chinese-Only
(如果你願意,可以將其設為私有)。然後,確保你已經透過身份驗證並且可以使用 “huggingface” 資料來源,將 mode
設定為 “overwrite”(或 “append” 如果你想擴充套件現有資料集),然後使用 .save()
推送到 Hugging Face。
>>> df_chinese_only.write.format("huggingface").mode("overwrite").save("username/Infinity-Instruct-Chinese-Only")


模式
將資料集推送到 Hugging Face 時有兩種模式可用:
- “overwrite”:如果資料集已存在,則覆蓋它
- “append”:將資料集追加到現有資料集中
選項
以下是可傳遞給 write.option()
的選項列表:
token
(string):你的 Hugging Face 令牌
歡迎貢獻程式碼以在此處新增更多選項,特別是 subset
和 split
。