Datasets 文件
流式傳輸
並獲得增強的文件體驗
開始使用
流式傳輸
資料集流式傳輸允許您在不下載資料集的情況下處理資料集。資料在您迭代資料集時進行流式傳輸。這在以下情況下特別有用:
- 您不想等待超大型資料集下載完成。
- 資料集大小超出計算機上的可用磁碟空間。
- 您只想快速瀏覽資料集中的少量樣本。


例如,HuggingFaceFW/fineweb 資料集的英文拆分是 45 TB,但您可以透過流式傳輸立即使用它。在 `load_dataset()` 中設定 `streaming=True` 來流式傳輸資料集,如下所示
>>> from datasets import load_dataset
>>> dataset = load_dataset('HuggingFaceFW/fineweb', split='train', streaming=True)
>>> print(next(iter(dataset)))
{'text': "How AP reported in all formats from tornado-stricken regionsMarch 8, 2012\nWhen the first serious bout of tornadoes of 2012 blew through middle America in the middle of the night, they touched down in places hours from any AP bureau...
資料集流式傳輸還允許您處理由本地檔案組成的資料集,而無需進行任何轉換。在這種情況下,資料在您迭代資料集時從本地檔案流式傳輸。這在以下情況下特別有用:
- 您不想等待超大型本地資料集轉換為 Arrow。
- 轉換後的檔案大小會超出計算機上的可用磁碟空間。
- 您只想快速瀏覽資料集中的少量樣本。
例如,您可以流式傳輸數百個壓縮 JSONL 檔案的本地資料集,例如 oscar-corpus/OSCAR-2201,以便立即使用它
>>> from datasets import load_dataset
>>> data_files = {'train': 'path/to/OSCAR-2201/compressed/en_meta/*.jsonl.gz'}
>>> dataset = load_dataset('json', data_files=data_files, split='train', streaming=True)
>>> print(next(iter(dataset)))
{'id': 0, 'text': 'Founded in 2015, Golden Bees is a leading programmatic recruitment platform dedicated to employers, HR agencies and job boards. The company has developed unique HR-custom technologies and predictive algorithms to identify and attract the best candidates for a job opportunity.', ...
以流式模式載入資料集會建立一個新的資料集型別例項(而不是經典的 `Dataset` 物件),稱為 `IterableDataset`。這種特殊型別的資料集有自己的一套處理方法,如下所示。
`IterableDataset` 對於像訓練模型這樣的迭代作業很有用。您不應該將 `IterableDataset` 用於需要隨機訪問示例的作業,因為您必須使用 for 迴圈遍歷所有示例。獲取可迭代資料集中的最後一個示例需要您遍歷所有先前的示例。您可以在Dataset 與 IterableDataset 指南中找到更多詳細資訊。
列索引
有時方便地迭代特定列的值。幸運的是,`IterableDataset` 支援列索引
>>> from datasets import load_dataset
>>> dataset = load_dataset("allenai/c4", "en", streaming=True, split="train")
>>> print(next(iter(dataset["text"])))
Beginners BBQ Class Taking Place in Missoula!...
從資料集轉換
如果您有一個現有的 `Dataset` 物件,您可以使用 `to_iterable_dataset()` 函式將其轉換為 `IterableDataset`。這實際上比在 `load_dataset()` 中設定 `streaming=True` 引數更快,因為資料是從本地檔案流式傳輸的。
>>> from datasets import load_dataset
# faster 🐇
>>> dataset = load_dataset("ethz/food101")
>>> iterable_dataset = dataset.to_iterable_dataset()
# slower 🐢
>>> iterable_dataset = load_dataset("ethz/food101", streaming=True)
當 `IterableDataset` 例項化時,`to_iterable_dataset()` 函式支援分片。當處理大型資料集時,這非常有用,您可能希望打亂資料集或使用 PyTorch DataLoader 啟用快速並行載入。
>>> import torch
>>> from datasets import load_dataset
>>> dataset = load_dataset("ethz/food101")
>>> iterable_dataset = dataset.to_iterable_dataset(num_shards=64) # shard the dataset
>>> iterable_dataset = iterable_dataset.shuffle(buffer_size=10_000) # shuffles the shards order and use a shuffle buffer when you start iterating
dataloader = torch.utils.data.DataLoader(iterable_dataset, num_workers=4) # assigns 64 / 4 = 16 shards from the shuffled list of shards to each worker when you start iterating
打亂
與常規的 `Dataset` 物件一樣,您也可以使用 `IterableDataset.shuffle()` 來打亂 `IterableDataset`。
`buffer_size` 引數控制用於隨機取樣示例的緩衝區大小。假設您的資料集有一百萬個示例,並且您將 `buffer_size` 設定為一萬。 `IterableDataset.shuffle()` 將從緩衝區中的前一萬個示例中隨機選擇示例。緩衝區中選定的示例將被新示例替換。預設情況下,緩衝區大小為 1,000。
>>> from datasets import load_dataset
>>> dataset = load_dataset('HuggingFaceFW/fineweb', split='train', streaming=True)
>>> shuffled_dataset = dataset.shuffle(seed=42, buffer_size=10_000)
如果資料集分片為多個檔案,`IterableDataset.shuffle()` 還會打亂分片的順序。
重新洗牌
有時您可能希望在每個 epoch 之後重新打亂資料集。這將要求您為每個 epoch 設定不同的種子。在 epoch 之間使用 `IterableDataset.set_epoch()` 來告訴資料集您當前在哪個 epoch。
您的種子實際上變為:`初始種子 + 當前 epoch`。
>>> for epoch in range(epochs):
... shuffled_dataset.set_epoch(epoch)
... for example in shuffled_dataset:
... ...
拆分資料集
您可以透過兩種方式拆分資料集
- `IterableDataset.take()` 返回資料集中的前 `n` 個示例
>>> dataset = load_dataset('HuggingFaceFW/fineweb', split='train', streaming=True)
>>> dataset_head = dataset.take(2)
>>> list(dataset_head)
[{'text': "How AP reported in all formats from tor...},
{'text': 'Did you know you have two little yellow...}]
- `IterableDataset.skip()` 省略資料集中的前 `n` 個示例並返回剩餘的示例
>>> train_dataset = shuffled_dataset.skip(1000)
`take` 和 `skip` 會阻止將來對 `shuffle` 的呼叫,因為它們會鎖定分片的順序。您應該在拆分資料集之前對其進行 `shuffle`。
分片
🤗 Datasets 支援分片,將非常大的資料集劃分為預定義數量的塊。在 `shard()` 中指定 `num_shards` 引數以確定將資料集拆分為多少個分片。您還需要使用 `index` 引數提供您要返回的分片。
例如,amazon_polarity 資料集有 4 個分片(在這種情況下它們是 4 個 Parquet 檔案)
>>> from datasets import load_dataset
>>> dataset = load_dataset("amazon_polarity", split="train", streaming=True)
>>> print(dataset)
IterableDataset({
features: ['label', 'title', 'content'],
num_shards: 4
})
將資料集分片成兩個塊後,第一個塊將只有 2 個分片
>>> dataset.shard(num_shards=2, index=0)
IterableDataset({
features: ['label', 'title', 'content'],
num_shards: 2
})
如果您的資料集有 `dataset.num_shards==1`,您應該使用 `IterableDataset.skip()` 和 `IterableDataset.take()` 來分塊。
交錯
`interleave_datasets()` 可以將 `IterableDataset` 與其他資料集結合。組合後的資料集會從每個原始資料集中返回交替的示例。
>>> from datasets import interleave_datasets
>>> es_dataset = load_dataset('allenai/c4', 'es', split='train', streaming=True)
>>> fr_dataset = load_dataset('allenai/c4', 'fr', split='train', streaming=True)
>>> multilingual_dataset = interleave_datasets([es_dataset, fr_dataset])
>>> list(multilingual_dataset.take(2))
[{'text': 'Comprar Zapatillas para niña en chancla con goma por...'},
{'text': 'Le sacre de philippe ier, 23 mai 1059 - Compte Rendu...'}]
定義每個原始資料集的取樣機率,以更好地控制它們的取樣和組合方式。使用 `probabilities` 引數設定您所需的取樣機率
>>> multilingual_dataset_with_oversampling = interleave_datasets([es_dataset, fr_dataset], probabilities=[0.8, 0.2], seed=42)
>>> list(multilingual_dataset_with_oversampling.take(2))
[{'text': 'Comprar Zapatillas para niña en chancla con goma por...'},
{'text': 'Chevrolet Cavalier Usados en Bogota - Carros en Vent...'}]
最終資料集約 80% 由 `es_dataset` 組成,20% 由 `fr_dataset` 組成。
您還可以指定 `stopping_strategy`。預設策略 `first_exhausted` 是一種子取樣策略,即一旦其中一個數據集的樣本用盡,資料集構建就會停止。您可以指定 `stopping_strategy=all_exhausted` 來執行過取樣策略。在這種情況下,一旦每個資料集中的所有樣本都至少添加了一次,資料集構建就會停止。實際上,這意味著如果一個數據集用盡,它將返回到該資料集的開頭,直到達到停止條件。請注意,如果未指定取樣機率,則新資料集將包含 `max_length_datasets*nb_dataset` 個樣本。
重新命名、刪除和轉換
以下方法允許您修改資料集的列。這些方法對於重新命名或刪除列以及將列更改為一組新的特徵很有用。
重新命名
當您需要重新命名資料集中的列時,請使用 `IterableDataset.rename_column()`。與原始列關聯的特徵實際上會移動到新的列名下,而不是僅僅替換原始列。
向 `IterableDataset.rename_column()` 提供原始列名和新列名
>>> from datasets import load_dataset
>>> dataset = load_dataset('allenai/c4', 'en', streaming=True, split='train')
>>> dataset = dataset.rename_column("text", "content")
刪除
當您需要刪除一個或多個列時,向 `IterableDataset.remove_columns()` 提供要刪除的列名。透過提供列名列表來刪除多個列
>>> from datasets import load_dataset
>>> dataset = load_dataset('allenai/c4', 'en', streaming=True, split='train')
>>> dataset = dataset.remove_columns('timestamp')
型別轉換
`IterableDataset.cast()` 更改一個或多個列的特徵型別。此方法將您的新 `Features` 作為其引數。以下示例程式碼顯示瞭如何更改 `ClassLabel` 和 `Value` 的特徵型別
>>> from datasets import load_dataset
>>> dataset = load_dataset('nyu-mll/glue', 'mrpc', split='train', streaming=True)
>>> dataset.features
{'sentence1': Value('string'),
'sentence2': Value('string'),
'label': ClassLabel(names=['not_equivalent', 'equivalent']),
'idx': Value('int32')}
>>> from datasets import ClassLabel, Value
>>> new_features = dataset.features.copy()
>>> new_features["label"] = ClassLabel(names=['negative', 'positive'])
>>> new_features["idx"] = Value('int64')
>>> dataset = dataset.cast(new_features)
>>> dataset.features
{'sentence1': Value('string'),
'sentence2': Value('string'),
'label': ClassLabel(names=['negative', 'positive']),
'idx': Value('int64')}
型別轉換僅在原始特徵型別和新特徵型別相容時才有效。例如,如果原始列只包含 1 和 0,您可以將特徵型別為 `Value('int32')` 的列轉換為 `Value('bool')`。
使用 `IterableDataset.cast_column()` 更改單個列的特徵型別。將列名及其新特徵型別作為引數傳遞
>>> dataset.features
{'audio': Audio(sampling_rate=44100, mono=True)}
>>> dataset = dataset.cast_column("audio", Audio(sampling_rate=16000))
>>> dataset.features
{'audio': Audio(sampling_rate=16000, mono=True)}
對映
類似於常規 `Dataset` 的 `Dataset.map()` 函式,🤗 Datasets 提供了 `IterableDataset.map()` 來處理 `IterableDataset`。`IterableDataset.map()` 在示例流式傳輸時即時應用處理。
它允許您將處理函式應用於資料集中的每個示例,無論是獨立應用還是批次應用。此函式甚至可以建立新的行和列。
以下示例演示瞭如何對 `IterableDataset` 進行分詞。該函式需要接受並輸出一個 `dict`
>>> def add_prefix(example):
... example['text'] = 'My text: ' + example['text']
... return example
接下來,使用 `IterableDataset.map()` 將此函式應用於資料集
>>> from datasets import load_dataset
>>> dataset = load_dataset('allenai/c4', 'en', streaming=True, split='train')
>>> updated_dataset = dataset.map(add_prefix)
>>> list(updated_dataset.take(3))
[{'text': 'My text: Beginners BBQ Class Taking Place in Missoula!\nDo you want to get better at making...',
'timestamp': '2019-04-25 12:57:54',
'url': 'https://klyq.com/beginners-bbq-class-taking-place-in-missoula/'},
{'text': 'My text: Discussion in \'Mac OS X Lion (10.7)\' started by axboi87, Jan 20, 2012.\nI\'ve go...',
'timestamp': '2019-04-21 10:07:13',
'url': 'https://forums.macrumors.com/threads/restore-from-larger-disk-to-smaller-disk.1311329/'},
{'text': 'My text: Foil plaid lycra and spandex shortall with metallic slinky insets. Attached metall...',
'timestamp': '2019-04-25 10:40:23',
'url': 'https://awishcometrue.com/Catalogs/Clearance/Tweens/V1960-Find-A-Way'}]
讓我們看看另一個例子,這次您將使用 `IterableDataset.map()` 刪除列。當您刪除列時,它只在示例提供給對映函式後才刪除。這允許對映函式在使用列內容後才刪除它們。
在 `IterableDataset.map()` 中使用 `remove_columns` 引數指定要刪除的列
>>> updated_dataset = dataset.map(add_prefix, remove_columns=["timestamp", "url"])
>>> list(updated_dataset.take(3))
[{'text': 'My text: Beginners BBQ Class Taking Place in Missoula!\nDo you want to get better at making...'},
{'text': 'My text: Discussion in \'Mac OS X Lion (10.7)\' started by axboi87, Jan 20, 2012.\nI\'ve go...'},
{'text': 'My text: Foil plaid lycra and spandex shortall with metallic slinky insets. Attached metall...'}]
批次處理
`IterableDataset.map()` 也支援處理批次示例。透過設定 `batched=True` 來批次操作。預設批次大小為 1000,但您可以透過 `batch_size` 引數進行調整。這為許多有趣的應用程式打開了大門,例如分詞、將長句拆分為較短的塊以及資料增強。
分詞
>>> from datasets import load_dataset
>>> from transformers import AutoTokenizer
>>> dataset = load_dataset("allenai/c4", "en", streaming=True, split="train")
>>> tokenizer = AutoTokenizer.from_pretrained('distilbert-base-uncased')
>>> def encode(examples):
... return tokenizer(examples['text'], truncation=True, padding='max_length')
>>> dataset = dataset.map(encode, batched=True, remove_columns=["text", "timestamp", "url"])
>>> next(iter(dataset))
{'input_ids': [101, 4088, 16912, 22861, 4160, 2465, 2635, 2173, 1999, 3335, ..., 0, 0, 0],
'attention_mask': [1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, ..., 0, 0]}
請參閱 批次對映處理 文件中的其他批次處理示例。它們對可迭代資料集的工作方式相同。
篩選
您可以使用 `Dataset.filter()` 根據謂詞函式過濾資料集中的行。它返回符合指定條件的行
>>> from datasets import load_dataset
>>> dataset = load_dataset('HuggingFaceFW/fineweb', streaming=True, split='train')
>>> start_with_ar = dataset.filter(lambda example: example['text'].startswith('San Francisco'))
>>> next(iter(start_with_ar))
{'text': 'San Francisco 49ers cornerback Shawntae Spencer will miss the rest of the sea...}
如果您設定 `with_indices=True`,`Dataset.filter()` 也可以透過索引進行過濾
>>> even_dataset = dataset.filter(lambda example, idx: idx % 2 == 0, with_indices=True)
>>> list(even_dataset.take(3))
[{'text': 'How AP reported in all formats from tornado-stricken regionsMarch 8, 2012 Whe...},
{'text': 'Car Wash For Clara! Now is your chance to help! 2 year old Clara Woodward has...},
{'text': 'Log In Please enter your ECode to log in. Forgotten your eCode? If you create...}]
批次
`batch` 方法將您的 `IterableDataset` 轉換為一個批次迭代器。這在您希望在訓練迴圈中使用批次或在使用需要批次輸入的框架時特別有用。
在使用 `map` 函式對資料批次應用函式時,也有一個“批次處理”選項,這在上面的對映部分中討論過。此處描述的 `batch` 方法不同,它提供了從資料集建立批次的更直接方式。
您可以這樣使用 `batch` 方法
from datasets import load_dataset
# Load a dataset in streaming mode
dataset = load_dataset("some_dataset", split="train", streaming=True)
# Create batches of 32 samples
batched_dataset = dataset.batch(batch_size=32)
# Iterate over the batched dataset
for batch in batched_dataset:
print(batch)
break
在此示例中,`batched_dataset` 仍然是一個 `IterableDataset`,但現在生成的每個專案都是 32 個樣本的批次,而不是單個樣本。這種批處理在您遍歷資料集時即時完成,保留了 `IterableDataset` 的記憶體高效特性。
`batch` 方法還提供了一個 `drop_last_batch` 引數。當設定為 `True` 時,如果最後一個批次小於指定的 `batch_size`,它將丟棄最後一個批次。這在您的下游處理要求所有批次大小相同的情況下很有用
batched_dataset = dataset.batch(batch_size=32, drop_last_batch=True)
在訓練迴圈中流式傳輸
`IterableDataset` 可以整合到訓練迴圈中。首先,打亂資料集
>>> seed, buffer_size = 42, 10_000
>>> dataset = dataset.shuffle(seed, buffer_size=buffer_size)
最後,建立一個簡單的訓練迴圈並開始訓練
>>> import torch
>>> from torch.utils.data import DataLoader
>>> from transformers import AutoModelForMaskedLM, DataCollatorForLanguageModeling
>>> from tqdm import tqdm
>>> dataset = dataset.with_format("torch")
>>> dataloader = DataLoader(dataset, collate_fn=DataCollatorForLanguageModeling(tokenizer))
>>> device = 'cuda' if torch.cuda.is_available() else 'cpu'
>>> model = AutoModelForMaskedLM.from_pretrained("distilbert-base-uncased")
>>> model.train().to(device)
>>> optimizer = torch.optim.AdamW(params=model.parameters(), lr=1e-5)
>>> for epoch in range(3):
... dataset.set_epoch(epoch)
... for i, batch in enumerate(tqdm(dataloader, total=5)):
... if i == 5:
... break
... batch = {k: v.to(device) for k, v in batch.items()}
... outputs = model(**batch)
... loss = outputs[0]
... loss.backward()
... optimizer.step()
... optimizer.zero_grad()
... if i % 10 == 0:
... print(f"loss: {loss}")
儲存資料集檢查點並恢復迭代
如果您的訓練迴圈停止,您可能希望從停止的地方重新開始訓練。為此,您可以儲存模型和最佳化器的檢查點,以及資料載入器。
可迭代資料集不提供對特定示例索引的隨機訪問以從中恢復,但您可以使用 `IterableDataset.state_dict()` 和 `IterableDataset.load_state_dict()` 從檢查點恢復,類似於您對模型和最佳化器所做的操作
>>> iterable_dataset = Dataset.from_dict({"a": range(6)}).to_iterable_dataset(num_shards=3)
>>> for idx, example in enumerate(iterable_dataset):
... print(example)
... if idx == 2:
... state_dict = iterable_dataset.state_dict()
... print("checkpoint")
... break
>>> iterable_dataset.load_state_dict(state_dict)
>>> print(f"restart from checkpoint")
>>> for example in iterable_dataset:
... print(example)
返回
{'a': 0}
{'a': 1}
{'a': 2}
checkpoint
restart from checkpoint
{'a': 3}
{'a': 4}
{'a': 5}
在底層,可迭代資料集跟蹤當前正在讀取的分片以及當前分片中的示例索引,並將其資訊儲存在 `state_dict` 中。
要從檢查點恢復,資料集會跳過所有先前讀取的分片,從當前分片重新開始。然後它讀取分片並跳過示例,直到它到達檢查點中的確切示例。
因此,重新啟動資料集非常快,因為它不會重新讀取已迭代的分片。儘管如此,恢復資料集通常不是即時的,因為它必須從當前分片的開頭重新讀取並跳過示例,直到到達檢查點位置。
這可以與 `torchdata` 中的 `StatefulDataLoader` 一起使用
>>> from torchdata.stateful_dataloader import StatefulDataLoader
>>> iterable_dataset = load_dataset("deepmind/code_contests", streaming=True, split="train")
>>> dataloader = StatefulDataLoader(iterable_dataset, batch_size=32, num_workers=4)
>>> # checkpoint
>>> state_dict = dataloader.state_dict() # uses iterable_dataset.state_dict() under the hood
>>> # resume from checkpoint
>>> dataloader.load_state_dict(state_dict) # uses iterable_dataset.load_state_dict() under the hood
恢復會準確返回檢查點儲存的位置,除非使用了 `shuffle()`:恢復時會丟失 shuffle 緩衝區中的示例,並且緩衝區會重新填充新資料。
儲存
一旦您的可迭代資料集準備就緒,您可以將其儲存為 Parquet 格式的 Hugging Face 資料集,並稍後透過 `load_dataset()` 重用。
透過提供您希望將其儲存到的 Hugging Face 資料集倉庫的名稱給 `push_to_hub()` 來儲存您的資料集。這會迭代資料集並逐步將資料上傳到 Hugging Face
dataset.push_to_hub("username/my_dataset")
如果資料集包含多個分片(`dataset.num_shards > 1`),您可以使用多個程序並行上傳。如果您應用了 `map()` 或 `filter()` 步驟,這將特別有用,因為它們將更快地並行執行
dataset.push_to_hub("username/my_dataset", num_proc=8)
使用 `load_dataset()` 函式重新載入資料集
from datasets import load_dataset
reloaded_dataset = load_dataset("username/my_dataset")
匯出
🤗 Datasets 也支援匯出,因此您可以在其他應用程式中使用資料集。下表顯示了當前支援匯出的檔案格式
檔案型別 | 匯出方法 |
---|---|
CSV | IterableDataset.to_csv() |
JSON | IterableDataset.to_json() |
Parquet | IterableDataset.to_parquet() |
SQL | IterableDataset.to_sql() |
記憶體中的 Python 物件 | `IterableDataset.to_pandas()`、`IterableDataset.to_polars()` 或 `IterableDataset.to_dict()` |
例如,將資料集匯出到 CSV 檔案,如下所示
>>> dataset.to_csv("path/of/my/dataset.csv")
如果您的資料集很大,您可以按分片儲存一個檔案,例如
>>> num_shards = dataset.num_shards
>>> for index in range(num_shards):
... shard = dataset.shard(index, num_shards)
... shard.to_parquet(f"path/of/my/dataset/data-{index:05d}.parquet")