使用 🤗 Transformers 進行機率時間序列預測

釋出於 2022 年 12 月 1 日
在 GitHub 上更新
Open In Colab

簡介

時間序列預測是一個重要的科學和商業問題,因此最近也出現了很多創新,除了經典方法外,還使用了基於深度學習的模型。像 ARIMA 這樣的經典方法和新穎的深度學習方法之間有一個重要的區別。

機率預測

通常,經典方法是針對資料集中的每個時間序列單獨進行擬合的。這些方法通常被稱為“單一”或“區域性”方法。然而,當某些應用需要處理大量時間序列時,訓練一個對所有可用時間序列都適用的“全域性”模型會更有益,這能使模型從許多不同來源中學習潛在表示。

一些經典方法是點值預測(也就是說,它們只為每個時間步輸出一個單一值),模型透過最小化相對於真實資料的 L2 或 L1 型別的損失來進行訓練。然而,由於預測通常用於某些現實世界的決策流程中,即使有人工參與,提供預測的不確定性也更有益。這也被稱為“機率預測”,與“點預測”相對。這需要對一個機率分佈進行建模,然後可以從中進行取樣。

簡而言之,我們希望訓練的是**全域性機率**模型,而不是區域性點預測模型。深度學習非常適合這個任務,因為神經網路可以從多個相關的時間序列中學習表示,並對資料的不確定性進行建模。

在機率設定中,通常會學習某個選定的引數分佈(如高斯分佈或學生 t 分佈)的未來引數,或者學習條件分位數函式,或者使用適用於時間序列設定的保形預測框架。方法的選擇不影響建模方面,因此通常可以被視為另一個超引數。人們總是可以透過取經驗均值或中位數將機率模型轉換為點預測模型。

Time Series Transformer

在建模具有序列性質的時間序列資料方面,可以想象,研究人員已經提出了使用迴圈神經網路(RNN)如 LSTM 或 GRU、卷積網路(CNN)的模型,以及最近基於 Transformer 的方法,這些方法很自然地適用於時間序列預測的設定。

在這篇部落格文章中,我們將利用原生的 Transformer 模型 (Vaswani et al., 2017)來完成**單變數**機率預測任務(即,單獨預測每個時間序列的一維分佈)。編碼器-解碼器 Transformer 是預測任務的自然選擇,因為它很好地融入了多種歸納偏置。

首先,在推理時,使用編碼器-解碼器架構非常有幫助,因為我們通常希望根據一些已記錄的資料來預測未來的幾個步驟。這可以被看作是類似於文字生成任務,在給定一些上下文的情況下,我們取樣下一個詞元並將其傳回解碼器(也稱為“自迴歸生成”)。類似地,在這裡我們也可以根據給定的分佈型別進行取樣,以提供直到我們期望的預測範圍的預測。這被稱為祖先採樣(Ancestral Sampling)。這裡有一篇關於語言模型上下文中取樣的優秀部落格文章。

其次,Transformer 幫助我們訓練可能包含數千個時間點的時間序列資料。由於注意力機制的時間和記憶體限制,一次性將時間序列的*所有*歷史輸入到模型中可能不可行。因此,可以考慮一個合適的上下文視窗,並在構建用於隨機梯度下降(SGD)的批次時,從訓練資料中取樣這個視窗以及隨後的預測長度大小的視窗。上下文大小的視窗可以傳遞給編碼器,預測視窗可以傳遞給一個帶有*因果掩碼*的解碼器。這意味著解碼器在學習下一個值時只能看到之前的時間步。這等同於訓練一個用於機器翻譯的原生 Transformer 的方式,被稱為“教師強制”(teacher forcing)。

與其它架構相比,Transformer 的另一個好處是我們可以將缺失值(在時間序列設定中很常見)作為額外的掩碼加入到編碼器或解碼器中,並且仍然可以在不依賴填充或插值的情況下進行訓練。這等同於像 BERT 和 GPT-2 這樣的模型在 Transformers 庫中的 `attention_mask`,用於在計算注意力矩陣時不包括填充詞元。

Transformer 架構的一個缺點是上下文和預測視窗大小受限於原生 Transformer 的二次計算和記憶體需求,參見 Tay et al., 2020。此外,由於 Transformer 是一個強大的架構,它可能比其他方法更容易過擬合或學習到虛假的關聯。

🤗 Transformers 庫帶有一個原生的機率時間序列 Transformer 模型,簡稱為 Time Series Transformer。在下面的章節中,我們將展示如何在一個自定義資料集上訓練這樣一個模型。

設定環境

首先,我們來安裝必要的庫:🤗 Transformers、🤗 Datasets、🤗 Evaluate、🤗 Accelerate 和 GluonTS

正如我們將要展示的,GluonTS 將用於轉換資料以建立特徵,以及建立適當的訓練、驗證和測試批次。

!pip install -q transformers

!pip install -q datasets

!pip install -q evaluate

!pip install -q accelerate

!pip install -q gluonts ujson

載入資料集

在這篇部落格文章中,我們將使用 `tourism_monthly` 資料集,該資料集可在 Hugging Face Hub 上找到。這個資料集包含了澳大利亞 366 個地區每月的旅遊量。

該資料集是 Monash 時間序列預測 儲存庫的一部分,該儲存庫收集了來自多個領域的時間序列資料集。它可以被視為時間序列預測的 GLUE 基準。

from datasets import load_dataset

dataset = load_dataset("monash_tsf", "tourism_monthly")

可以看出,資料集包含 3 個劃分:訓練集、驗證集和測試集。

dataset

>>> DatasetDict({
        train: Dataset({
            features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
            num_rows: 366
        })
        test: Dataset({
            features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
            num_rows: 366
        })
        validation: Dataset({
            features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
            num_rows: 366
        })
    })

每個樣本都包含幾個鍵,其中 `start` 和 `target` 是最重要的。讓我們看看資料集中的第一個時間序列。

train_example = dataset['train'][0]
train_example.keys()

>>> dict_keys(['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'])

`start` 僅表示時間序列的開始時間(以日期時間格式),而 `target` 包含時間序列的實際值。

`start` 將有助於為時間序列值新增與時間相關的特徵,作為模型的額外輸入(例如“一年中的月份”)。由於我們知道資料的頻率是 `monthly`(每月),我們知道例如第二個值的時間戳是 `1979-02-01`,等等。

print(train_example['start'])
print(train_example['target'])

>>> 1979-01-01 00:00:00
    [1149.8699951171875, 1053.8001708984375, ..., 5772.876953125]

驗證集包含與訓練集相同的資料,只是時間上延長了一個 `prediction_length` 的長度。這使我們能夠將模型的預測與真實值進行驗證。

測試集的資料比驗證集又長一個 `prediction_length`(或者比訓練集長若干個 `prediction_length`,用於在多個滾動視窗上進行測試)。

validation_example = dataset['validation'][0]
validation_example.keys()

>>> dict_keys(['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'])

初始值與相應的訓練樣本完全相同。

print(validation_example['start'])
print(validation_example['target'])

>>> 1979-01-01 00:00:00
    [1149.8699951171875, 1053.8001708984375, ..., 5985.830078125]

然而,這個樣本比訓練樣本多了 `prediction_length=24` 個額外的值。我們來驗證一下。

freq = "1M"
prediction_length = 24

assert len(train_example["target"]) + prediction_length == len(
    validation_example["target"]
)

我們來視覺化一下。

import matplotlib.pyplot as plt

figure, axes = plt.subplots()
axes.plot(train_example["target"], color="blue")
axes.plot(validation_example["target"], color="red", alpha=0.5)

plt.show()

png

讓我們拆分資料

train_dataset = dataset["train"]
test_dataset = dataset["test"]

更新 `start` 為 `pd.Period`

我們要做的第一件事是使用資料的 `freq` 將每個時間序列的 `start` 特徵轉換為 pandas 的 `Period` 索引。

from functools import lru_cache

import pandas as pd
import numpy as np

@lru_cache(10_000)
def convert_to_pandas_period(date, freq):
    return pd.Period(date, freq)

def transform_start_field(batch, freq):
    batch["start"] = [convert_to_pandas_period(date, freq) for date in batch["start"]]
    return batch

我們現在使用 `datasets` 的 `set_transform` 功能來即時地、原地完成這個操作。

from functools import partial

train_dataset.set_transform(partial(transform_start_field, freq=freq))
test_dataset.set_transform(partial(transform_start_field, freq=freq))

定義模型

接下來,讓我們例項化一個模型。這個模型將從頭開始訓練,因此我們不會使用 `from_pretrained` 方法,而是從一個 `config` 檔案中隨機初始化模型。

我們為模型指定了幾個額外的引數。

  • `prediction_length`(在我們的例子中是 `24` 個月):這是 Transformer 的解碼器將要學習預測的時間範圍;
  • `context_length`:如果沒有指定 `context_length`,模型會將 `context_length`(編碼器的輸入)設定為等於 `prediction_length`;
  • 給定頻率的 `lags`:這些指定了我們要“回看”多遠,作為額外的特徵。例如,對於 `Daily`(每日)頻率,我們可能會考慮回看 `[1, 2, 7, 30, ...]` 天,換句話說,就是回看 1 天、2 天……;而對於 `Minute`(分鐘)資料,我們可能會考慮 `[1, 30, 60, 60*24, ...]` 等等;
  • 時間特徵的數量:在我們的例子中,這將是 `2`,因為我們將新增 `MonthOfYear`(年中的月份)和 `Age`(年齡)特徵;
  • 靜態類別特徵的數量:在我們的例子中,這將只是 `1`,因為我們將新增一個單一的“時間序列ID”特徵;
  • 基數:每個靜態類別特徵的值的數量,以列表形式表示。在我們的例子中,這將是 `[366]`,因為我們有 366 個不同的時間序列。
  • 嵌入維度:每個靜態類別特徵的嵌入維度,以列表形式表示。例如,`[3]` 意味著模型將為 `366` 個時間序列(地區)中的每一個學習一個大小為 `3` 的嵌入向量。

讓我們使用 GluonTS 為給定頻率(“monthly”)提供的預設滯後值。

from gluonts.time_feature import get_lags_for_frequency

lags_sequence = get_lags_for_frequency(freq)
print(lags_sequence)

>>> [1, 2, 3, 4, 5, 6, 7, 11, 12, 13, 23, 24, 25, 35, 36, 37]

這意味著我們將在每個時間步回看最多 37 個月,作為額外的特徵。

我們再看看 GluonTS 提供給我們的預設時間特徵。

from gluonts.time_feature import time_features_from_frequency_str

time_features = time_features_from_frequency_str(freq)
print(time_features)

>>> [<function month_of_year at 0x7fa496d0ca70>]

在這種情況下,只有一個特徵,即“一年中的月份”。這意味著對於每個時間步,我們會將月份作為一個標量值新增進去(例如,如果時間戳是“一月”,值為 `1`;如果是“二月”,值為 `2`,以此類推)。

現在我們擁有定義模型所需的一切。

from transformers import TimeSeriesTransformerConfig, TimeSeriesTransformerForPrediction

config = TimeSeriesTransformerConfig(
    prediction_length=prediction_length,
    # context length:
    context_length=prediction_length * 2,
    # lags coming from helper given the freq:
    lags_sequence=lags_sequence,
    # we'll add 2 time features ("month of year" and "age", see further):
    num_time_features=len(time_features) + 1,
    # we have a single static categorical feature, namely time series ID:
    num_static_categorical_features=1,
    # it has 366 possible values:
    cardinality=[len(train_dataset)],
    # the model will learn an embedding of size 2 for each of the 366 possible values:
    embedding_dimension=[2],
    
    # transformer params:
    encoder_layers=4,
    decoder_layers=4,
    d_model=32,
)

model = TimeSeriesTransformerForPrediction(config)

請注意,與 🤗 Transformers 庫中的其他模型類似,`TimeSeriesTransformerModel` 對應的是沒有頂部任何頭的編碼器-解碼器 Transformer,而 `TimeSeriesTransformerForPrediction` 對應的是在 `TimeSeriesTransformerModel` 之上加了一個**分佈頭**。預設情況下,模型使用學生 t 分佈(但這是可配置的)。

model.config.distribution_output

>>> student_t

這與用於自然語言處理(NLP)的 Transformer 有一個重要區別,後者的頭通常由一個實現為 `nn.Linear` 層的固定類別分佈組成。

定義轉換

接下來,我們定義資料的轉換,特別是用於建立時間特徵(基於資料集或通用特徵)的轉換。

同樣,我們將使用 GluonTS 庫來完成此操作。我們定義一個轉換 `Chain`(這有點類似於影像處理中的 `torchvision.transforms.Compose`)。它允許我們將多個轉換組合成一個單一的流水線。

from gluonts.time_feature import (
    time_features_from_frequency_str,
    TimeFeature,
    get_lags_for_frequency,
)
from gluonts.dataset.field_names import FieldName
from gluonts.transform import (
    AddAgeFeature,
    AddObservedValuesIndicator,
    AddTimeFeatures,
    AsNumpyArray,
    Chain,
    ExpectedNumInstanceSampler,
    InstanceSplitter,
    RemoveFields,
    SelectFields,
    SetField,
    TestSplitSampler,
    Transformation,
    ValidationSplitSampler,
    VstackFeatures,
    RenameFields,
)

下面的轉換都附有註釋,以解釋它們的作用。總的來說,我們將遍歷資料集中的各個時間序列,並新增/刪除欄位或特徵。

from transformers import PretrainedConfig

def create_transformation(freq: str, config: PretrainedConfig) -> Transformation:
    remove_field_names = []
    if config.num_static_real_features == 0:
        remove_field_names.append(FieldName.FEAT_STATIC_REAL)
    if config.num_dynamic_real_features == 0:
        remove_field_names.append(FieldName.FEAT_DYNAMIC_REAL)
    if config.num_static_categorical_features == 0:
        remove_field_names.append(FieldName.FEAT_STATIC_CAT)

    # a bit like torchvision.transforms.Compose
    return Chain(
        # step 1: remove static/dynamic fields if not specified
        [RemoveFields(field_names=remove_field_names)]
        # step 2: convert the data to NumPy (potentially not needed)
        + (
            [
                AsNumpyArray(
                    field=FieldName.FEAT_STATIC_CAT,
                    expected_ndim=1,
                    dtype=int,
                )
            ]
            if config.num_static_categorical_features > 0
            else []
        )
        + (
            [
                AsNumpyArray(
                    field=FieldName.FEAT_STATIC_REAL,
                    expected_ndim=1,
                )
            ]
            if config.num_static_real_features > 0
            else []
        )
        + [
            AsNumpyArray(
                field=FieldName.TARGET,
                # we expect an extra dim for the multivariate case:
                expected_ndim=1 if config.input_size == 1 else 2,
            ),
            # step 3: handle the NaN's by filling in the target with zero
            # and return the mask (which is in the observed values)
            # true for observed values, false for nan's
            # the decoder uses this mask (no loss is incurred for unobserved values)
            # see loss_weights inside the xxxForPrediction model
            AddObservedValuesIndicator(
                target_field=FieldName.TARGET,
                output_field=FieldName.OBSERVED_VALUES,
            ),
            # step 4: add temporal features based on freq of the dataset
            # month of year in the case when freq="M"
            # these serve as positional encodings
            AddTimeFeatures(
                start_field=FieldName.START,
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_TIME,
                time_features=time_features_from_frequency_str(freq),
                pred_length=config.prediction_length,
            ),
            # step 5: add another temporal feature (just a single number)
            # tells the model where in its life the value of the time series is,
            # sort of a running counter
            AddAgeFeature(
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_AGE,
                pred_length=config.prediction_length,
                log_scale=True,
            ),
            # step 6: vertically stack all the temporal features into the key FEAT_TIME
            VstackFeatures(
                output_field=FieldName.FEAT_TIME,
                input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE]
                + (
                    [FieldName.FEAT_DYNAMIC_REAL]
                    if config.num_dynamic_real_features > 0
                    else []
                ),
            ),
            # step 7: rename to match HuggingFace names
            RenameFields(
                mapping={
                    FieldName.FEAT_STATIC_CAT: "static_categorical_features",
                    FieldName.FEAT_STATIC_REAL: "static_real_features",
                    FieldName.FEAT_TIME: "time_features",
                    FieldName.TARGET: "values",
                    FieldName.OBSERVED_VALUES: "observed_mask",
                }
            ),
        ]
    )

定義 `InstanceSplitter`

為了進行訓練/驗證/測試,我們接下來建立一個 `InstanceSplitter`,它用於從資料集中取樣視窗(因為,請記住,由於時間和記憶體限制,我們不能將整個歷史值傳遞給 Transformer)。

例項分割器從資料中隨機取樣 `context_length` 大小和隨後的 `prediction_length` 大小的視窗,併為相應視窗中 `time_series_fields` 內的任何時間鍵新增 `past_` 或 `future_` 字首。例項分割器可以配置為三種不同的模式。

  1. mode="train":在這種模式下,我們從給定資料集(訓練資料集)中隨機取樣上下文和預測長度視窗。
  2. mode="validation":在這種模式下,我們從給定資料集(用於回溯測試或驗證似然計算)中取樣最後一個上下文長度視窗和預測視窗。
  3. mode="test":在這種模式下,我們僅取樣最後一個上下文長度視窗(用於預測用例)。
from gluonts.transform.sampler import InstanceSampler
from typing import Optional

def create_instance_splitter(
    config: PretrainedConfig,
    mode: str,
    train_sampler: Optional[InstanceSampler] = None,
    validation_sampler: Optional[InstanceSampler] = None,
) -> Transformation:
    assert mode in ["train", "validation", "test"]

    instance_sampler = {
        "train": train_sampler
        or ExpectedNumInstanceSampler(
            num_instances=1.0, min_future=config.prediction_length
        ),
        "validation": validation_sampler
        or ValidationSplitSampler(min_future=config.prediction_length),
        "test": TestSplitSampler(),
    }[mode]

    return InstanceSplitter(
        target_field="values",
        is_pad_field=FieldName.IS_PAD,
        start_field=FieldName.START,
        forecast_start_field=FieldName.FORECAST_START,
        instance_sampler=instance_sampler,
        past_length=config.context_length + max(config.lags_sequence),
        future_length=config.prediction_length,
        time_series_fields=["time_features", "observed_mask"],
    )

建立資料載入器

接下來,是時候建立 DataLoaders 了,它讓我們能夠獲得成批的(輸入,輸出)對——或者換句話說,(`past_values`,`future_values`)。

from typing import Iterable

import torch
from gluonts.itertools import Cached, Cyclic
from gluonts.dataset.loader import as_stacked_batches


def create_train_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    num_batches_per_epoch: int,
    shuffle_buffer_length: Optional[int] = None,
    cache_data: bool = True,
    **kwargs,
) -> Iterable:
    PREDICTION_INPUT_NAMES = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        PREDICTION_INPUT_NAMES.append("static_categorical_features")

    if config.num_static_real_features > 0:
        PREDICTION_INPUT_NAMES.append("static_real_features")

    TRAINING_INPUT_NAMES = PREDICTION_INPUT_NAMES + [
        "future_values",
        "future_observed_mask",
    ]

    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data, is_train=True)
    if cache_data:
        transformed_data = Cached(transformed_data)

    # we initialize a Training instance
    instance_splitter = create_instance_splitter(config, "train")

    # the instance splitter will sample a window of
    # context length + lags + prediction length (from the 366 possible transformed time series)
    # randomly from within the target time series and return an iterator.
    stream = Cyclic(transformed_data).stream()
    training_instances = instance_splitter.apply(stream)
    
    return as_stacked_batches(
        training_instances,
        batch_size=batch_size,
        shuffle_buffer_length=shuffle_buffer_length,
        field_names=TRAINING_INPUT_NAMES,
        output_type=torch.tensor,
        num_batches_per_epoch=num_batches_per_epoch,
    )
def create_backtest_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    **kwargs,
):
    PREDICTION_INPUT_NAMES = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        PREDICTION_INPUT_NAMES.append("static_categorical_features")

    if config.num_static_real_features > 0:
        PREDICTION_INPUT_NAMES.append("static_real_features")

    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data)

    # we create a Validation Instance splitter which will sample the very last
    # context window seen during training only for the encoder.
    instance_sampler = create_instance_splitter(config, "validation")

    # we apply the transformations in train mode
    testing_instances = instance_sampler.apply(transformed_data, is_train=True)
    
    return as_stacked_batches(
        testing_instances,
        batch_size=batch_size,
        output_type=torch.tensor,
        field_names=PREDICTION_INPUT_NAMES,
    )

為了完整性,我們提供了一個測試資料載入器的輔助函式,儘管我們在這裡不會使用它。這在生產環境中很有用,當我們想要從給定時間序列的末尾開始預測時。因此,測試資料載入器將從提供的資料集中取樣最後一個上下文視窗,並將其傳遞給模型。

def create_test_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    **kwargs,
):
    PREDICTION_INPUT_NAMES = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        PREDICTION_INPUT_NAMES.append("static_categorical_features")

    if config.num_static_real_features > 0:
        PREDICTION_INPUT_NAMES.append("static_real_features")

    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data, is_train=False)

    # We create a test Instance splitter to sample the very last
    # context window from the dataset provided.
    instance_sampler = create_instance_splitter(config, "test")

    # We apply the transformations in test mode
    testing_instances = instance_sampler.apply(transformed_data, is_train=False)
    
    return as_stacked_batches(
        testing_instances,
        batch_size=batch_size,
        output_type=torch.tensor,
        field_names=PREDICTION_INPUT_NAMES,
    )
train_dataloader = create_train_dataloader(
    config=config,
    freq=freq,
    data=train_dataset,
    batch_size=256,
    num_batches_per_epoch=100,
)

test_dataloader = create_backtest_dataloader(
    config=config,
    freq=freq,
    data=test_dataset,
    batch_size=64,
)

我們來看看第一個批次的資料。

batch = next(iter(train_dataloader))
for k, v in batch.items():
    print(k, v.shape, v.type())

>>> past_time_features torch.Size([256, 85, 2]) torch.FloatTensor
    past_values torch.Size([256, 85]) torch.FloatTensor
    past_observed_mask torch.Size([256, 85]) torch.FloatTensor
    future_time_features torch.Size([256, 24, 2]) torch.FloatTensor
    static_categorical_features torch.Size([256, 1]) torch.LongTensor
    future_values torch.Size([256, 24]) torch.FloatTensor
    future_observed_mask torch.Size([256, 24]) torch.FloatTensor

可以看到,我們沒有像 NLP 模型那樣向編碼器提供 `input_ids` 和 `attention_mask`,而是提供了 `past_values`,以及 `past_observed_mask`、`past_time_features` 和 `static_categorical_features`。

解碼器的輸入包括 `future_values`、`future_observed_mask` 和 `future_time_features`。`future_values` 可以看作是 NLP 中 `decoder_input_ids` 的等價物。

有關每個引數的詳細解釋,請參閱文件

前向傳播

讓我們用剛剛建立的批次進行一次前向傳播。

# perform forward pass
outputs = model(
    past_values=batch["past_values"],
    past_time_features=batch["past_time_features"],
    past_observed_mask=batch["past_observed_mask"],
    static_categorical_features=batch["static_categorical_features"]
    if config.num_static_categorical_features > 0
    else None,
    static_real_features=batch["static_real_features"]
    if config.num_static_real_features > 0
    else None,
    future_values=batch["future_values"],
    future_time_features=batch["future_time_features"],
    future_observed_mask=batch["future_observed_mask"],
    output_hidden_states=True,
)
print("Loss:", outputs.loss.item())

>>> Loss: 9.069628715515137

請注意,模型正在返回一個損失值。這是可能的,因為解碼器會自動將 `future_values` 向右移動一個位置,以獲得標籤。這使得可以計算預測值和標籤之間的損失。

另外,請注意解碼器使用因果掩碼來防止看到未來,因為它需要預測的值在 `future_values` 張量中。

訓練模型

是時候訓練模型了!我們將使用標準的 PyTorch 訓練迴圈。

我們將在這裡使用 🤗 Accelerate 庫,它會自動將模型、最佳化器和資料載入器放置在適當的 `device` 上。

from accelerate import Accelerator
from torch.optim import AdamW

accelerator = Accelerator()
device = accelerator.device

model.to(device)
optimizer = AdamW(model.parameters(), lr=6e-4, betas=(0.9, 0.95), weight_decay=1e-1)

model, optimizer, train_dataloader = accelerator.prepare(
    model,
    optimizer,
    train_dataloader,
)

model.train()
for epoch in range(40):
    for idx, batch in enumerate(train_dataloader):
        optimizer.zero_grad()
        outputs = model(
            static_categorical_features=batch["static_categorical_features"].to(device)
            if config.num_static_categorical_features > 0
            else None,
            static_real_features=batch["static_real_features"].to(device)
            if config.num_static_real_features > 0
            else None,
            past_time_features=batch["past_time_features"].to(device),
            past_values=batch["past_values"].to(device),
            future_time_features=batch["future_time_features"].to(device),
            future_values=batch["future_values"].to(device),
            past_observed_mask=batch["past_observed_mask"].to(device),
            future_observed_mask=batch["future_observed_mask"].to(device),
        )
        loss = outputs.loss

        # Backpropagation
        accelerator.backward(loss)
        optimizer.step()

        if idx % 100 == 0:
            print(loss.item())

推理

在推理時,建議使用 `generate()` 方法進行自迴歸生成,類似於 NLP 模型。

預測過程包括從測試例項取樣器獲取資料,該取樣器將從資料集中每個時間序列的最後一個 `context_length` 大小的視窗中取樣值,並將其傳遞給模型。請注意,我們將 `future_time_features`(這些是預先知道的)傳遞給解碼器。

模型將從預測的分佈中自迴歸地取樣一定數量的值,並將它們傳回解碼器以返回預測輸出。

model.eval()

forecasts = []

for batch in test_dataloader:
    outputs = model.generate(
        static_categorical_features=batch["static_categorical_features"].to(device)
        if config.num_static_categorical_features > 0
        else None,
        static_real_features=batch["static_real_features"].to(device)
        if config.num_static_real_features > 0
        else None,
        past_time_features=batch["past_time_features"].to(device),
        past_values=batch["past_values"].to(device),
        future_time_features=batch["future_time_features"].to(device),
        past_observed_mask=batch["past_observed_mask"].to(device),
    )
    forecasts.append(outputs.sequences.cpu().numpy())

模型輸出一個形狀為(`batch_size`,`number of samples`,`prediction length`)的張量。

在這種情況下,我們為接下來的 `24` 個月獲得了 `100` 個可能的值(對於批次中大小為 `64` 的每個樣本)。

forecasts[0].shape

>>> (64, 100, 24)

我們將它們垂直堆疊,以獲得測試資料集中所有時間序列的預測。

forecasts = np.vstack(forecasts)
print(forecasts.shape)

>>> (366, 100, 24)

我們可以根據測試集中存在的樣本外真實值來評估最終的預測結果。我們將使用 MASEsMAPE 指標,併為資料集中的每個時間序列計算這些指標。

from evaluate import load
from gluonts.time_feature import get_seasonality

mase_metric = load("evaluate-metric/mase")
smape_metric = load("evaluate-metric/smape")

forecast_median = np.median(forecasts, 1)

mase_metrics = []
smape_metrics = []
for item_id, ts in enumerate(test_dataset):
    training_data = ts["target"][:-prediction_length]
    ground_truth = ts["target"][-prediction_length:]
    mase = mase_metric.compute(
        predictions=forecast_median[item_id], 
        references=np.array(ground_truth), 
        training=np.array(training_data), 
        periodicity=get_seasonality(freq))
    mase_metrics.append(mase["mase"])
    
    smape = smape_metric.compute(
        predictions=forecast_median[item_id], 
        references=np.array(ground_truth), 
    )
    smape_metrics.append(smape["smape"])
print(f"MASE: {np.mean(mase_metrics)}")

>>> MASE: 1.2564196892177717

print(f"sMAPE: {np.mean(smape_metrics)}")

>>> sMAPE: 0.1609541520852549

我們還可以繪製資料集中每個時間序列的單個指標,並觀察到少數幾個時間序列對最終測試指標的貢獻很大。

plt.scatter(mase_metrics, smape_metrics, alpha=0.3)
plt.xlabel("MASE")
plt.ylabel("sMAPE")
plt.show()

png

為了繪製任何時間序列相對於真實測試資料的預測圖,我們定義了以下輔助函式。

import matplotlib.dates as mdates

def plot(ts_index):
    fig, ax = plt.subplots()

    index = pd.period_range(
        start=test_dataset[ts_index][FieldName.START],
        periods=len(test_dataset[ts_index][FieldName.TARGET]),
        freq=freq,
    ).to_timestamp()

    # Major ticks every half year, minor ticks every month,
    ax.xaxis.set_major_locator(mdates.MonthLocator(bymonth=(1, 7)))
    ax.xaxis.set_minor_locator(mdates.MonthLocator())

    ax.plot(
        index[-2*prediction_length:], 
        test_dataset[ts_index]["target"][-2*prediction_length:],
        label="actual",
    )

    plt.plot(
        index[-prediction_length:], 
        np.median(forecasts[ts_index], axis=0),
        label="median",
    )
    
    plt.fill_between(
        index[-prediction_length:],
        forecasts[ts_index].mean(0) - forecasts[ts_index].std(axis=0), 
        forecasts[ts_index].mean(0) + forecasts[ts_index].std(axis=0), 
        alpha=0.3, 
        interpolate=True,
        label="+/- 1-std",
    )
    plt.legend()
    plt.show()

例如:

plot(334)

png

我們如何與其他模型進行比較?Monash 時間序列儲存庫有一個測試集 MASE 指標的比較表,我們可以補充我們的結果。

資料集 SES Theta TBATS ETS (DHR-)ARIMA PR CatBoost FFNN DeepAR N-BEATS WaveNet **Transformer** (我們的)
Tourism Monthly 3.306 1.649 1.751 1.526 1.589 1.678 1.699 1.582 1.409 1.574 1.482 1.256

請注意,使用我們的模型,我們擊敗了所有已報告的其他模型(另見相應論文中的表 2),而且我們沒有進行任何超引數調整。我們只是將 Transformer 訓練了 40 個週期。

當然,我們必須小心,不能僅僅宣稱用神經網路在時間序列上取得了最先進的結果,因為似乎“通常你只需要 XGBoost”。我們只是非常好奇,想看看神經網路能帶我們走多遠,以及 Transformer 在這個領域是否會有用。這個特定的資料集似乎表明,這絕對值得探索。

後續步驟

我們鼓勵讀者使用 Hub 上的其他時間序列資料集來嘗試這個 notebook,並替換適當的頻率和預測長度引數。對於您自己的資料集,您需要將其轉換為 GluonTS 使用的約定,其文件這裡有很好的解釋。我們還準備了一個示例 notebook,向您展示如何將您的資料集轉換為 🤗 datasets 格式,點此檢視

時間序列研究者們會知道,將基於 Transformer 的模型應用於時間序列問題已經引起了廣泛的興趣。原生 Transformer 只是眾多基於注意力的模型之一,因此有必要向庫中新增更多的模型。

目前,沒有什麼能阻止我們對多變數時間序列進行建模,但是為此需要用一個多變數分佈頭來例項化模型。目前支援對角獨立的分佈,其他多變數分佈將會被新增。敬請期待未來包含教程的部落格文章。

路線圖上的另一件事是時間序列分類。這需要在庫中新增一個帶有分類頭的時間序列模型,例如用於異常檢測任務。

當前模型假設時間序列值伴隨著日期時間資訊,但這對於現實世界中的每個時間序列可能並非如此。例如,可以看看像 WOODS 的神經科學資料集。因此,需要泛化當前模型,使整個流水線中的某些輸入成為可選的。

最後,NLP/視覺領域從大型預訓練模型中獲益匪淺,而據我們所知,時間序列領域並非如此。基於 Transformer 的模型似乎是追求這一研究方向的明顯選擇,我們迫不及待地想看看研究人員和實踐者會拿出什麼樣的成果!

社群

註冊登入 發表評論

© . This site is unofficial and not affiliated with Hugging Face, Inc.