Parquet 內容定義分塊
透過利用新的 Xet 儲存層和 Apache Arrow 的 Parquet 內容定義分塊 (CDC) 功能,減少在 Hugging Face Hub 上傳和下載 Parquet 檔案的時間,從而實現更高效、更具可擴充套件性的資料工作流。
摘要: PyArrow 和 Pandas 現已支援 Parquet 內容定義分塊 (CDC),可在 Hugging Face 的 Xet 儲存層等內容可定址儲存系統上實現 Parquet 檔案的高效去重。CDC 透過只上傳或下載已更改的資料塊,極大地降低了資料傳輸和儲存成本。透過傳遞 use_content_defined_chunking
引數來啟用 CDC。
import pandas as pd
import pyarrow.parquet as pq
df.to_parquet("hf://datasets/{user}/{repo}/path.parquet", use_content_defined_chunking=True)
pq.write_table(table, "hf://datasets/{user}/{repo}/path.parquet", use_content_defined_chunking=True)
目錄
簡介
Apache Parquet 是一種列式儲存格式,在資料工程社群中被廣泛使用。
截至今日,Hugging Face 託管了近 21 PB 的資料集,其中僅 Parquet 檔案就佔用了超過 4 PB 的儲存空間。因此,最佳化 Parquet 儲存是一項高度優先的任務。Hugging Face 引入了一個名為 Xet 的新儲存層,它利用內容定義分塊來高效地對資料塊進行去重,從而降低儲存成本並提高下載/上傳速度。
雖然 Xet 是格式無關的,但 Parquet 的佈局和基於列塊(資料頁)的壓縮方式,可能會因微小的資料變化而產生完全不同的位元組級表示,從而導致去重效能不佳。為了解決這個問題,Parquet 檔案的寫入方式應該儘量減少相似資料之間的位元組級差異,而這正是內容定義分塊 (CDC) 發揮作用的地方。
讓我們來探討一下新的 Parquet CDC 功能與 Hugging Face 的 Xet 儲存層結合使用所帶來的效能優勢。
資料準備
為了演示,我們將使用 OpenOrca 資料集的一個大小適中的子集。
import numpy as np
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq
from huggingface_hub import hf_hub_download
def shuffle_table(table, seed=40):
rng = np.random.default_rng(seed)
indices = rng.permutation(len(table))
return table.take(indices)
# download the dataset from Hugging Face Hub into local cache
path = hf_hub_download(
repo_id="Open-Orca/OpenOrca",
filename="3_5M-GPT3_5-Augmented.parquet",
repo_type="dataset"
)
# read the cached parquet file into a PyArrow table
orca = pq.read_table(path, schema=pa.schema([
pa.field("id", pa.string()),
pa.field("system_prompt", pa.string()),
pa.field("question", pa.large_string()),
pa.field("response", pa.large_string()),
]))
# augment the table with some additional columns
orca = orca.add_column(
orca.schema.get_field_index("question"),
"question_length",
pc.utf8_length(orca["question"])
)
orca = orca.add_column(
orca.schema.get_field_index("response"),
"response_length",
pc.utf8_length(orca["response"])
)
# shuffle the table to make it unique to the Xet storage
orca = shuffle_table(orca)
# limit the table to the first 100,000 rows
table = orca[:100_000]
# take a look at the first 3 rows of the table
table[:3].to_pandas()
id | system_prompt | question_length | question | response_length | response | |
---|---|---|---|---|---|---|
0 | cot.64099 | 你是一個幫助人們找到……的 AI 助手。 | 241 | 思考這個問題。幼發拉底河是…… | 1663 | 問題是問幼發拉底河…… |
1 | flan.1206442 | 你是一個 AI 助手。你會得到一個…… | 230 | 單選/多選題:是否可以…… | 751 | 無法斷定牛仔…… |
2 | t0.1170225 | 你是一個 AI 助手。使用者會給你…… | 1484 | 問:我正在參加考試,必須猜對…… | 128 | 這段話主要告訴我們哪些東西很重要…… |
將表格作為 Parquet 檔案上傳到 Hugging Face Hub
自從 pyarrow>=21.0.0 版本以來,我們可以在 pyarrow
函式中使用 Hugging Face URI,透過 hf://
URI 方案直接向 Hub 讀寫 parquet (及其他格式) 檔案。
>>> pq.write_table(table, "hf://datasets/kszucs/pq/orca.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 96.1MB / 96.1MB, 48.0kB/s
Total Bytes: 96.1M
Total Transfer: 96.1M
我們可以看到表格已全部作為新資料上傳(總位元組數 == 總傳輸量),因為 Xet 儲存層尚不知道此資料。現在將其讀回為一個 pyarrow
表格。
downloaded_table = pq.read_table("hf://datasets/kszucs/pq/orca.parquet")
assert downloaded_table.equals(table)
請注意,所有接受檔案路徑的 pyarrow
函式也都接受 Hugging Face URI,例如 pyarrow datasets、CSV 函式、增量 Parquet 寫入器或只讀取 parquet 元資料。
pq.read_metadata("hf://datasets/kszucs/pq/orca.parquet")
<pyarrow._parquet.FileMetaData object at 0x16ebfa980>
created_by: parquet-cpp-arrow version 21.0.0-SNAPSHOT
num_columns: 6
num_rows: 100000
num_row_groups: 1
format_version: 2.6
serialized_size: 4143
Parquet 去重的不同用例
為了展示內容定義分塊功能的有效性,我們將嘗試它在以下情況下的表現:
- 重新上傳表格的精確副本
- 在表格中新增/刪除列
- 在表格中更改列型別
- 追加新行和連線表格
- 在表格中插入/刪除行
- 更改表格的行組大小
- 使用不同的檔案級拆分
1. 重新上傳完全相同的表格副本
雖然這個用例聽起來微不足道,但傳統檔案系統不會對檔案進行去重,導致資料的完全重新上傳和重新下載。相比之下,一個利用內容定義分塊的系統可以識別出文件內容是相同的,從而避免不必要的資料傳輸。
>>> pq.write_table(table, "hf://datasets/kszucs/pq/orca-copy.parquet")
New Data Upload: | | 0.00B / 0.00B, 0.00B/s
Total Bytes: 96.1M
Total Transfer: 0.00
我們可以看到沒有新資料被上傳,操作是瞬時完成的。現在讓我們看看如果我們將相同的檔案再次上傳到不同的倉庫會發生什麼。
>>> pq.write_table(table, "hf://datasets/kszucs/pq-copy/orca-copy-again.parquet")
New Data Upload: | | 0.00B / 0.00B, 0.00B/s
Total Bytes: 96.1M
Total Transfer: 0.00
上傳再次瞬時完成,因為去重也適用於跨倉庫。這是 Xet 儲存層的一個關鍵特性,能夠實現高效的資料共享和協作。你可以在從分塊到資料塊:加速 Hub 上的上傳和下載這篇部落格文章中閱讀更多關於細節和擴充套件挑戰的內容。
2. 在表格中新增和刪除列
首先將原始表格和修改後的表格寫入本地 parquet 檔案,以檢視它們的大小。
table_with_new_columns = table.add_column(
table.schema.get_field_index("response"),
"response_short",
pc.utf8_slice_codeunits(table["response"], 0, 10)
)
table_with_removed_columns = table.drop(["response"])
pq.write_table(table, "/tmp/original.parquet")
pq.write_table(table_with_new_columns, "/tmp/with-new-columns.parquet")
pq.write_table(table_with_removed_columns, "/tmp/with-removed-columns.parquet")
!ls -lah /tmp/*.parquet
-rw-r--r-- 1 kszucs wheel 92M Jul 22 14:47 /tmp/original.parquet
-rw-r--r-- 1 kszucs wheel 92M Jul 22 14:47 /tmp/with-new-columns.parquet
-rw-r--r-- 1 kszucs wheel 67M Jul 22 14:47 /tmp/with-removed-columns.parquet
現在將它們上傳到 Hugging Face,看看實際傳輸了多少資料。
>>> pq.write_table(table_with_new_columns, "hf://datasets/kszucs/pq/orca-added-columns.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 575kB / 575kB, 288kB/s
Total Bytes: 96.6M
Total Transfer: 575k
我們可以看到,只有新增的列和位於檔案尾部的新 parquet 元資料被上傳,而原始資料沒有再次傳輸。這是 Xet 儲存層的一大優勢,因為它允許我們高效地新增新列而無需再次傳輸整個資料集。
同樣的情況也適用於刪除列,如下所示。
>>> pq.write_table(table_with_removed_columns, "hf://datasets/kszucs/pq/orca-removed-columns.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 37.7kB / 37.7kB, 27.0kB/s
Total Bytes: 70.6M
Total Transfer: 37.7k
為了更好地理解上傳了什麼,我們可以使用去重估算工具來視覺化兩個 parquet 檔案之間的差異。
from de import visualize
visualize(table, table_with_new_columns, title="With New Columns", prefix="orca")
新增新列
新增兩列新的資料意味著我們有未見過的資料頁需要傳輸(用紅色高亮顯示),但其餘資料保持不變(用綠色高亮顯示),因此不會再次傳輸。注意檔案尾部元資料中的小紅色區域,這在我們修改 parquet 檔案時幾乎總是會改變。去重統計顯示為 <去重後大小> / <總大小> = <去重率>
,其中比率越小意味著去重效能越高。
同樣視覺化刪除一列後的差異。
visualize(table, table_with_removed_columns, title="With Removed Columns", prefix="orca")
刪除列
由於我們正在刪除整個列,我們只能看到檔案尾部元資料的變化,所有其他列都保持不變並且已經存在於儲存層中,因此它們不會被再次傳輸。
3. 在表格中更改列型別
另一個常見的用例是更改表中的列型別,例如,為了減少儲存大小或為特定查詢最佳化資料。讓我們將 question_length
列從 int64
資料型別更改為 int32
,然後看看傳輸了多少資料。
# first make the table much smaller by removing the large string columns
# to highlight the differences better
table_without_text = table_with_new_columns.drop(["question", "response"])
# cast the question_length column to int64
table_with_casted_column = table_without_text.set_column(
table_without_text.schema.get_field_index("question_length"),
"question_length",
table_without_text["question_length"].cast("int32")
)
>>> pq.write_table(table_with_casted_column, "hf://datasets/kszucs/pq/orca-casted-column.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 181kB / 181kB, 113kB/s
Total Bytes: 1.80M
Total Transfer: 181k
同樣,我們可以看到只有新的列和更新後的 parquet 元資料被上傳。現在視覺化去重熱圖。
visualize(table_without_text, table_with_casted_column, title="With Casted Column", prefix="orca")
轉換型別後的列
第一個紅色區域表示新新增的列,而第二個紅色區域表示頁尾中更新的元資料。其餘資料保持不變,不會再次傳輸。
4. 追加新行和連線表格
我們將透過將原始資料集的另一個切片連線到表中來追加新行。
table = orca[:100_000]
next_10k_rows = orca[100_000:110_000]
table_with_appended_rows = pa.concat_tables([table, next_10k_rows])
assert len(table_with_appended_rows) == 110_000
現在檢查是否只有新行被上傳,因為原始資料已經被 Xet 儲存層所知。
>>> pq.write_table(table_with_appended_rows, "hf://datasets/kszucs/pq/orca-appended-rows.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 10.3MB / 10.3MB, 1.36MB/s
Total Bytes: 106M
Total Transfer: 10.3M
visualize(table, table_with_appended_rows, title="With Appended Rows", prefix="orca")
追加行
由於每一列都獲得了新資料,我們可以看到多個紅色區域。這是由於實際的 parquet 檔案規範,其中整個列是依次排列的(在每個行組內)。
5. 在表格中插入/刪除行
這裡是困難的部分,因為插入和刪除會移動現有的行,這會導致在 parquet 術語中稱為列塊或資料頁的不同。由於每個資料頁都是獨立壓縮的,即使是單行的插入或刪除也可能導致從編輯的行到 parquet 檔案末尾的位元組級表示完全不同。
這個 Parquet 特有的問題不能僅由 Xet 儲存層解決,parquet 檔案本身需要以一種即使有插入或刪除的行也能最小化資料頁差異的方式寫入。
讓我們嘗試使用現有的機制,看看它的表現如何。
table = orca[:100_000]
# remove 4k rows from two places
table_with_deleted_rows = pa.concat_tables([
orca[:15_000],
orca[18_000:60_000],
orca[61_000:100_000]
])
# add 1k rows at the first third of the table
table_with_inserted_rows = pa.concat_tables([
orca[:10_000],
orca[100_000:101_000],
orca[10_000:50_000],
orca[101_000:103_000],
orca[50_000:100_000],
])
assert len(table) == 100_000
assert len(table_with_deleted_rows) == 96_000
assert len(table_with_inserted_rows) == 103_000
>>> pq.write_table(table_with_inserted_rows, "hf://datasets/kszucs/pq/orca-inserted-rows.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 89.8MB / 89.8MB, 42.7kB/s
Total Bytes: 99.1M
Total Transfer: 89.8M
>>> pq.write_table(table_with_deleted_rows, "hf://datasets/kszucs/pq/orca-deleted-rows.parquet")
New Data Upload: 100%|███████████████████████████████████████████████| 78.2MB / 78.2MB, 46.5kB/s
Total Bytes: 92.2M
Total Transfer: 78.2M
同時視覺化兩種情況以檢視差異。
visualize(table, table_with_deleted_rows, title="Deleted Rows", prefix="orca")
visualize(table, table_with_inserted_rows, title="Inserted Rows", prefix="orca")
已刪除的行
已插入的行
我們可以看到去重效能顯著下降(比率更高),去重熱圖顯示壓縮後的 parquet 檔案彼此之間有很大差異。這是因為插入和刪除的行移動了現有的行,導致從編輯的行到 parquet 檔案末尾的資料頁不同。
我們可以透過寫入具有新的pyarrow 功能,稱為內容定義分塊 (CDC) 的 parquet 檔案來解決此問題。此功能確保列始終根據其內容被分塊到資料頁中,類似於 Xet 儲存層去重資料的方式,但在任何序列化或壓縮發生之前應用於列的邏輯值。
該功能可以透過將 use_content_defined_chunking=True
傳遞給 write_parquet
函式來啟用。
import pyarrow.parquet as pq
pq.write_table(table, "hf://user/repo/filename.parquet", use_content_defined_chunking=True)
Pandas 也支援這項新功能。
df.to_parquet("hf://user/repo/filename.parquet", use_content_defined_chunking=True)
讓我們視覺化使用 Parquet CDC 功能前後的去重差異。
visualize(table, table_with_deleted_rows, title="With Deleted Rows", prefix="orca", with_cdc=True)
visualize(table, table_with_inserted_rows, title="With Inserted Rows", prefix="orca", with_cdc=True)
已刪除的行
壓縮 | 原生 Parquet | CDC Parquet |
---|---|---|
無 | ![]() |
![]() |
去重統計 | 185.3 MB / 306.8 MB = 60% | 162.9 MB / 307.2 MB = 53% |
Snappy 壓縮 | ![]() |
![]() |
去重統計 | 174.4 MB / 188.3 MB = 92% | 104.3 MB / 188.8 MB = 55% |
已插入的行
壓縮 | 原生 Parquet | CDC Parquet |
---|---|---|
無 | ![]() |
![]() |
去重統計 | 190.1 MB / 318.0 MB = 59% | 164.1 MB / 318.4 MB = 51% |
Snappy 壓縮 | ![]() |
![]() |
去重統計 | 186.2 MB / 195.2 MB = 95% | 102.8 MB / 195.7 MB = 52% |
看起來好多了!俗話說,實踐是檢驗真理的唯一標準,讓我們實際使用內容定義分塊 parquet 功能上傳表格,看看傳輸了多少資料。
請注意,我們需要首先上傳啟用了內容定義分塊的原始表格。
>>> pq.write_table(table, "hf://datasets/kszucs/pq/orca-cdc.parquet", use_content_defined_chunking=True)
New Data Upload: 100%|███████████████████████████████████████████████| 94.5MB / 94.5MB, 46.5kB/s
Total Bytes: 96.4M
Total Transfer: 94.5M
>>> pq.write_table(
... table_with_inserted_rows,
... "hf://datasets/kszucs/pq/orca-inserted-rows-cdc.parquet",
... use_content_defined_chunking=True
... )
New Data Upload: 100%|███████████████████████████████████████████████| 6.00MB / 6.00MB, 1.00MB/s
Total Bytes: 99.3M
Total Transfer: 6.00M
>>> pq.write_table(
... table_with_deleted_rows,
... "hf://datasets/kszucs/pq/orca-deleted-rows-cdc.parquet",
... use_content_defined_chunking=True
... )
New Data Upload: 100%|███████████████████████████████████████████████| 7.57MB / 7.57MB, 1.35MB/s
Total Bytes: 92.4M
Total Transfer: 7.57M
上傳的資料明顯小於之前,顯示出更好的去重效能,如上面的熱圖所示。
需要注意的是,使用 huggingface_hub.hf_hub_download()
和 datasets.load_dataset()
函式下載時,同樣適用這些效能優勢。
6. 使用不同的行組大小
根據讀取器/寫入器的限制,在某些情況下,較大或較小的行組大小可能更有利。parquet 寫入器實現預設使用固定大小的行組,對於 pyarrow,預設值為 1 百萬行。資料集寫入器可能會減小行組大小以提高隨機訪問效能或減少讀取器應用程式的記憶體佔用。
更改行組大小將在行組之間移動行,從而在資料頁之間移動值,因此我們遇到了與插入或刪除行類似的問題。讓我們使用 parquet CDC 功能比較不同行組大小之間的去重效能。
from de import visualize
# pick a larger subset of the dataset to have enough rows for the row group size tests
table = orca[2_000_000:3_000_000]
visualize(table, (table, {"row_group_size": 128 * 1024}), title="Small Row Groups", with_cdc=True, prefix="orca")
visualize(table, (table, {"row_group_size": 256 * 1024}), title="Medium Row Groups", with_cdc=True, prefix="orca")
小行組
壓縮 | 原生 Parquet | CDC Parquet |
---|---|---|
無 | ![]() |
![]() |
去重統計 | 1.6 GB / 3.1 GB = 52% | 1.6 GB / 3.1 GB = 50% |
Snappy 壓縮 | ![]() |
![]() |
去重統計 | 1.1 GB / 1.9 GB = 59% | 995.0 MB / 1.9 GB = 51% |
中等行組
壓縮 | 原生 Parquet | CDC Parquet |
---|---|---|
無 | ![]() |
![]() |
去重統計 | 1.6 GB / 3.1 GB = 51% | 1.6 GB / 3.1 GB = 50% |
Snappy 壓縮 | ![]() |
![]() |
去重統計 | 1.1 GB / 1.9 GB = 57% | 976.5 MB / 1.9 GB = 50% |
7. 使用不同的檔案級拆分
資料集通常被分割成多個檔案以提高並行性和隨機訪問。Parquet CDC 結合 Xet 儲存層可以有效地對多個檔案中的資料進行去重,即使資料是在不同的邊界處分割的。
讓我們用三種不同的檔案級拆分方式寫出資料集,然後比較去重效能。
from pathlib import Path
from de import estimate
def write_dataset(table, base_dir, num_shards, **kwargs):
"""Simple utility to write a pyarrow table to multiple Parquet files."""
# ensure that directory exists
base_dir = Path(base_dir)
base_dir.mkdir(parents=True, exist_ok=True)
# split and write the table into multiple files
rows_per_file = len(table) / num_shards
for i in range(num_shards):
start = i * rows_per_file
end = min((i + 1) * rows_per_file, len(table))
shard = table.slice(start, end - start)
path = base_dir / f"part-{i}.parquet"
pq.write_table(shard, path, **kwargs)
write_dataset(orca, "orca5-cdc", num_shards=5, use_content_defined_chunking=True)
write_dataset(orca, "orca10-cdc", num_shards=10, use_content_defined_chunking=True)
write_dataset(orca, "orca20-cdc", num_shards=20, use_content_defined_chunking=True)
estimate("orca5-cdc/*.parquet", "orca10-cdc/*.parquet", "orca20-cdc/*.parquet")
Total size: 9.3 GB
Chunk size: 3.2 GB
儘管我們用三種不同的分片配置上傳了資料集,但總上傳大小將僅略大於原始資料集大小。
使用 Pandas 的 Parquet CDC 功能
到目前為止,我們一直使用 PyArrow,現在讓我們透過下載、篩選然後上傳啟用了內容定義分塊功能的資料集,來探索如何將相同的 CDC 功能與 Pandas 結合使用。
import pandas as pd
src = "hf://datasets/teknium/OpenHermes-2.5/openhermes2_5.json"
df = pd.read_json(src)
>>> dst = "hf://datasets/kszucs/pq/hermes-2.5-cdc.parquet"
>>> df.to_parquet(dst, use_content_defined_chunking=True)
New Data Upload: 100%|███████████████████████████████████████████████| 799MB / 799MB, 197kB/s
Total Bytes: 799M
Total Transfer: 799M
>>> short_df = df[[len(c) < 10 for c in df.conversations]]
>>> short_dst = "hf://datasets/kszucs/pq/hermes-2.5-cdc-short.parquet"
>>> short_df.to_parquet(short_dst, use_content_defined_chunking=True)
New Data Upload: 100%|███████████████████████████████████████████████| 21.9MB / 21.9MB, 45.4kB/s
Total Bytes: 801M
Total Transfer: 21.9M
import pyarrow as pa
from de import visualize
visualize(
pa.Table.from_pandas(df),
pa.Table.from_pandas(short_df),
title="Hermes 2.5 Short Conversations",
with_cdc=True,
prefix="hermes"
)
Hermes 2.5 短對話
壓縮 | 原生 Parquet | CDC Parquet |
---|---|---|
無 | ![]() |
![]() |
去重統計 | 1.9 GB / 3.2 GB = 58% | 1.6 GB / 3.2 GB = 51% |
Snappy 壓縮 | ![]() |
![]() |
去重統計 | 1.5 GB / 1.6 GB = 94% | 821.1 MB / 1.6 GB = 51% |
由於 Parquet CDC 應用於 parquet 資料頁級別(列塊級別),去重效能取決於篩選器的選擇性,或者更確切地說,是變化在整個資料集中的分佈。如果大部分資料頁受到影響,那麼去重率將顯著下降。
參考文獻
關於此功能的更多詳情可以在以下連結找到:
結論
我們探討了新的 Parquet 內容定義分塊功能與 Hugging Face 的 Xet 儲存層結合使用的效能優勢。我們展示了它如何在各種場景中高效地進行資料去重,使 parquet 操作更快、更節省儲存空間。與傳統的雲端儲存解決方案相比,Xet 儲存層與 Parquet CDC 可以顯著減少資料傳輸時間和成本。
在這裡將您的 Hugging Face 倉庫從 Git LFS 遷移到 Xet 以享受此優勢:https://huggingface.co/join/xet