使用 Informer 模型進行多元機率時間序列預測

釋出於 2023 年 3 月 10 日
在 GitHub 上更新
Open In Colab

引言

幾個月前,我們介紹了 Time Series Transformer,它是應用於預測任務的原始 Transformer 模型(Vaswani et al., 2017),並展示了一個 單變數 機率預測任務的示例(即單獨預測每個時間序列的一維分佈)。在本文中,我們將介紹 Informer 模型(Zhou, Haoyi, et al., 2021),這篇 AAAI 2021 的最佳論文模型現已 加入 🤗 Transformers。我們將演示如何使用 Informer 模型進行 多元 機率預測任務,即預測未來時間序列目標值 向量 的分佈。請注意,本文介紹的方法也適用於原始的 Time Series Transformer 模型。

多元機率時間序列預測

就機率預測的建模方面而言,Transformer/Informer 在處理多元時間序列時無需改變模型結構。在單變數和多變數設定中,模型都接收一個向量序列,因此唯一的變化在於輸出或發射(emission)端。

對高維資料的完整聯合條件分佈進行建模,計算成本可能會非常高。因此,各種方法會採用一些近似分佈,最簡單的是將資料建模為來自同一分佈族的獨立分佈,或對完整協方差矩陣進行某種低秩近似等。在這裡,我們將採用獨立(或對角)發射,我們所實現的分佈族(見 這裡)均支援這種方法。

Informer 模型深入解析

Informer 基於原始的 Transformer(Vaswani et al., 2017),並引入了兩項主要改進。為了理解這些改進,我們先回顧一下原始 Transformer 的缺點:

  1. 標準自注意力機制的平方級計算複雜度: 原始 Transformer 的計算複雜度為 O(T2D)O(T^2 D),其中 TT 是時間序列的長度,DD 是隱藏狀態的維度。對於長序列時間序列預測(也稱為 LSTF 問題),這種計算成本可能非常高。為了解決這個問題,Informer 採用了一種新的自注意力機制,稱為 ProbSparse 注意力,其時間和空間複雜度為 O(TlogT)O(T \log T)
  2. 堆疊層時的記憶體瓶頸: 當堆疊 NN 個編碼器/解碼器層時,原始 Transformer 的記憶體使用量為 O(NT2)O(N T^2),這限制了模型處理長序列的能力。Informer 使用了一種 蒸餾 (Distilling) 操作,將層與層之間的輸入大小縮減為其一半。透過這種方式,它將總記憶體使用量降低到 O(NTlogT)O(N\cdot T \log T)

正如你所見,Informer 模型的動機與 Longformer (Beltagy et el., 2020)、Sparse Transformer (Child et al., 2019) 以及其他 NLP 論文類似,都是為了在 輸入序列很長時 降低自注意力機制的平方級複雜度。現在,讓我們透過程式碼示例深入瞭解 ProbSparse 注意力和 蒸餾 操作。

ProbSparse 注意力機制

ProbSparse 的核心思想是,標準自注意力的得分呈長尾分佈,其中“活躍”的查詢 (query) 位於得分的“頭部”,而“懶惰”的查詢則位於“尾部”。所謂“活躍”查詢,指的是查詢 qiq_i,其點積 qi,ki\langle q_i,k_i \rangle 貢獻了 主要的注意力權重,而“懶惰”查詢的點積則生成 微不足道 的注意力權重。這裡,qiq_ikik_i 分別是注意力矩陣 QQKK 的第 ii 行。

informer_full_vs_sparse_attention
標準自注意力 vs ProbSparse 注意力,圖片來自 Autoformer (Wu, Haixu, et al., 2021)

基於“活躍”和“懶惰”查詢的理念,ProbSparse 注意力機制會選出“活躍”的查詢,並建立一個簡化的查詢矩陣 QreducedQ_{reduced},用於以 O(TlogT)O(T \log T) 的複雜度計算注意力權重。讓我們透過一個程式碼示例來更詳細地瞭解這一點。

回顧一下標準自注意力的公式:

Attention(Q,K,V)=softmax(QKTdk)V \textrm{Attention}(Q, K, V) = \textrm{softmax}(\frac{QK^T}{\sqrt{d_k}} )V

其中 QRLQ×dQ\in \mathbb{R}^{L_Q \times d}, KRLK×dK\in \mathbb{R}^{L_K \times d}, VRLV×dV\in \mathbb{R}^{L_V \times d}。注意,在實踐中,自注意力計算中的查詢和鍵的輸入長度通常是相等的,即 LQ=LK=TL_Q = L_K = T,其中 TT 是時間序列的長度。因此,QKTQK^T 乘法的計算複雜度為 O(T2d)O(T^2 \cdot d)。在 ProbSparse 注意力中,我們的目標是建立一個新的 QreduceQ_{reduce} 矩陣並定義:

ProbSparseAttention(Q,K,V)=softmax(QreduceKTdk)V \textrm{ProbSparseAttention}(Q, K, V) = \textrm{softmax}(\frac{Q_{reduce}K^T}{\sqrt{d_k}} )V

其中 QreduceQ_{reduce} 矩陣僅選擇前 uu 個“活躍”查詢。這裡,u=clogLQu = c \cdot \log L_Qcc 是 ProbSparse 注意力的超引數,稱為 取樣因子 (sampling factor)。由於 QreduceQ_{reduce} 僅選擇前 uu 個查詢,其大小為 clogLQ×dc\cdot \log L_Q \times d,因此乘法 QreduceKTQ_{reduce}K^T 僅需 O(LKlogLQ)=O(TlogT)O(L_K \log L_Q) = O(T \log T) 的計算量。

這很好!但是我們如何選擇這 uu 個“活躍”查詢來建立 QreduceQ_{reduce} 呢?讓我們來定義 查詢稀疏性度量 (Query Sparsity Measurement)

查詢稀疏性度量

查詢稀疏性度量 M(qi,K)M(q_i, K) 用於從 QQ 中選出 uu 個“活躍”查詢 qiq_i 來建立 QreduceQ_{reduce}。理論上,占主導地位的 qi,ki\langle q_i,k_i \rangle 點積對會促使“活躍”查詢 qiq_i 的機率分佈 偏離 均勻分佈,如下圖所示。因此,實際查詢分佈與均勻分佈之間的 KL 散度 (Kullback–Leibler divergence) 被用來定義稀疏性度量。

informer_probsparse
ProbSparse 注意力機制圖解,來自官方 程式碼庫

在實踐中,該度量定義為:

M(qi,K)=maxjqikjTd1Lkj=1LkqikjTd M(q_i, K) = \max_j \frac{q_ik_j^T}{\sqrt{d}}-\frac{1}{L_k} \sum_{j=1}^{L_k}\frac{q_ik_j^T}{\sqrt{d}}

這裡需要理解的重點是,當 M(qi,K)M(q_i, K) 越大時,查詢 qiq_i 就應該被包含在 QreduceQ_{reduce} 中,反之亦然。

但是我們如何以非平方級的時間複雜度計算 qikjTq_ik_j^T 這一項呢?回想一下,大多數點積 qi,ki\langle q_i,k_i \rangle 無論如何都會生成微不足道的注意力(即長尾分佈特性),因此從 KK 中隨機取樣一個鍵 (key) 的子集就足夠了,這個子集在程式碼中稱為 K_sample

現在,我們可以檢視 `probsparse_attention` 的程式碼了。

from torch import nn
import math


def probsparse_attention(query_states, key_states, value_states, sampling_factor=5):
    """
    Compute the probsparse self-attention.
    Input shape: Batch x Time x Channel

    Note the additional `sampling_factor` input.
    """
    # get input sizes with logs
    L_K = key_states.size(1)
    L_Q = query_states.size(1)
    log_L_K = np.ceil(np.log1p(L_K)).astype("int").item()
    log_L_Q = np.ceil(np.log1p(L_Q)).astype("int").item()

    # calculate a subset of samples to slice from K and create Q_K_sample
    U_part = min(sampling_factor * L_Q * log_L_K, L_K)

    # create Q_K_sample (the q_i * k_j^T term in the sparsity measurement)
    index_sample = torch.randint(0, L_K, (U_part,))
    K_sample = key_states[:, index_sample, :]
    Q_K_sample = torch.bmm(query_states, K_sample.transpose(1, 2))

    # calculate the query sparsity measurement with Q_K_sample
    M = Q_K_sample.max(dim=-1)[0] - torch.div(Q_K_sample.sum(dim=-1), L_K)

    # calculate u to find the Top-u queries under the sparsity measurement
    u = min(sampling_factor * log_L_Q, L_Q)
    M_top = M.topk(u, sorted=False)[1]

    # calculate Q_reduce as query_states[:, M_top]
    dim_for_slice = torch.arange(query_states.size(0)).unsqueeze(-1)
    Q_reduce = query_states[dim_for_slice, M_top]  # size: c*log_L_Q x channel

    # and now, same as the canonical
    d_k = query_states.size(-1)
    attn_scores = torch.bmm(Q_reduce, key_states.transpose(-2, -1))  # Q_reduce x K^T
    attn_scores = attn_scores / math.sqrt(d_k)
    attn_probs = nn.functional.softmax(attn_scores, dim=-1)
    attn_output = torch.bmm(attn_probs, value_states)

    return attn_output, attn_scores

注意,在實現中,為了穩定性,UpartU_{part} 在計算中包含了 LQL_Q(更多資訊請參閱 此討論)。

我們成功了!請注意,這只是 probsparse_attention 的部分實現,完整實現可在 🤗 Transformers 中找到。

蒸餾 (Distilling)

由於 ProbSparse 自注意力機制的存在,編碼器的特徵圖存在一些可以被移除的冗餘。因此,蒸餾操作被用來將編碼器層之間的輸入大小減少一半,從而在理論上去除這種冗餘。在實踐中,Informer 的“蒸餾”操作只是在每個編碼器層之間添加了帶最大池化的一維卷積層。設 XnX_n 為第 nn 個編碼器層的輸出,則蒸餾操作定義為

Xn+1=MaxPool(ELU(Conv1d(Xn)) X_{n+1} = \textrm{MaxPool} ( \textrm{ELU}(\textrm{Conv1d}(X_n))

讓我們在程式碼中看看這個操作。

from torch import nn

# ConvLayer is a class with forward pass applying ELU and MaxPool1d
def informer_encoder_forward(x_input, num_encoder_layers=3, distil=True):
    # Initialize the convolution layers
    if distil:
        conv_layers = nn.ModuleList([ConvLayer() for _ in range(num_encoder_layers - 1)])
        conv_layers.append(None)
    else:
        conv_layers = [None] * num_encoder_layers
    
    # Apply conv_layer between each encoder_layer
    for encoder_layer, conv_layer in zip(encoder_layers, conv_layers):
        output = encoder_layer(x_input)
        if conv_layer is not None:
            output = conv_layer(loutput)
    
    return output

透過將每層的輸入減少一半,我們的記憶體使用量從 O(NT2)O(N\cdot T^2) 降至 O(NTlogT)O(N\cdot T \log T),其中 NN 是編碼器/解碼器層的數量。這正是我們想要的!

Informer 模型現已加入 🤗 Transformers 庫,其名稱就是 InformerModel。在下面的章節中,我們將展示如何在一個自定義的多元時間序列資料集上訓練這個模型。

配置環境

首先,讓我們安裝必要的庫:🤗 Transformers、🤗 Datasets、🤗 Evaluate、🤗 Accelerate 和 GluonTS

正如我們將要展示的,GluonTS 將用於轉換資料以建立特徵,以及建立適當的訓練、驗證和測試批次。

!pip install -q transformers datasets evaluate accelerate gluonts ujson

載入資料集

在這篇博文中,我們將使用 traffic_hourly 資料集,該資料集可在 Hugging Face Hub 上找到。這個資料集包含了 Lai 等人 (2017) 使用的舊金山交通資料集。它包含 862 個小時時間序列,顯示了 2015 年至 2016 年舊金山灣區高速公路的道路佔用率,範圍在 [0,1][0, 1] 之間。

該資料集是 Monash 時間序列預測 儲存庫的一部分,這是一個彙集了多個領域時間序列資料集的集合。它可以被看作是時間序列預測領域的 GLUE 基準

from datasets import load_dataset

dataset = load_dataset("monash_tsf", "traffic_hourly")

可以看到,該資料集包含 3 個劃分:訓練集、驗證集和測試集。

dataset

>>> DatasetDict({
        train: Dataset({
            features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
            num_rows: 862
        })
        test: Dataset({
            features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
            num_rows: 862
        })
        validation: Dataset({
            features: ['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'],
            num_rows: 862
        })
    })

每個樣本都包含幾個鍵,其中 starttarget 是最重要的。讓我們看一下資料集中的第一個時間序列。

train_example = dataset["train"][0]
train_example.keys()

>>> dict_keys(['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'])

start 簡單地表示時間序列的開始時間(以 datetime 格式),而 target 包含時間序列的實際值。

start 對於向時間序列值新增時間相關特徵(例如“一年中的月份”)作為模型的額外輸入非常有用。由於我們知道資料的頻率是 hourly(每小時),我們知道例如第二個值的時間戳是 2015-01-01 01:00:012015-01-01 02:00:01,依此類推。

print(train_example["start"])
print(len(train_example["target"]))

>>> 2015-01-01 00:00:01
    17448

驗證集包含與訓練集相同的資料,只是時間上延長了 prediction_length。這使我們能夠根據真實值來驗證模型的預測。

測試集的資料又比驗證集長一個 prediction_length (或者相對於訓練集長了 prediction_length 的某個倍數,用於在多個滾動視窗上進行測試)。

validation_example = dataset["validation"][0]
validation_example.keys()

>>> dict_keys(['start', 'target', 'feat_static_cat', 'feat_dynamic_real', 'item_id'])

初始值與相應的訓練樣本完全相同。但是,這個樣本比訓練樣本多了 prediction_length=48(48 小時或 2 天)個額外的值。讓我們來驗證一下。

freq = "1H"
prediction_length = 48

assert len(train_example["target"]) + prediction_length == len(
    dataset["validation"][0]["target"]
)

讓我們將其視覺化。

import matplotlib.pyplot as plt

num_of_samples = 150

figure, axes = plt.subplots()
axes.plot(train_example["target"][-num_of_samples:], color="blue")
axes.plot(
    validation_example["target"][-num_of_samples - prediction_length :],
    color="red",
    alpha=0.5,
)

plt.show()

png

讓我們對資料進行劃分。

train_dataset = dataset["train"]
test_dataset = dataset["test"]

將 `start` 更新為 `pd.Period`

我們要做的第一件事是使用資料的 freq 將每個時間序列的 start 特徵轉換為 pandas 的 Period 索引。

from functools import lru_cache

import pandas as pd
import numpy as np


@lru_cache(10_000)
def convert_to_pandas_period(date, freq):
    return pd.Period(date, freq)


def transform_start_field(batch, freq):
    batch["start"] = [convert_to_pandas_period(date, freq) for date in batch["start"]]
    return batch

我們現在使用 datasetsset_transform 功能來即時地、原地完成這個轉換。

from functools import partial

train_dataset.set_transform(partial(transform_start_field, freq=freq))
test_dataset.set_transform(partial(transform_start_field, freq=freq))

現在,讓我們使用 GluonTS 中的 MultivariateGrouper 將資料集轉換為多元時間序列。這個 grouper 會將獨立的 1 維時間序列轉換成一個 2D 矩陣。

from gluonts.dataset.multivariate_grouper import MultivariateGrouper

num_of_variates = len(train_dataset)

train_grouper = MultivariateGrouper(max_target_dim=num_of_variates)
test_grouper = MultivariateGrouper(
    max_target_dim=num_of_variates,
    num_test_dates=len(test_dataset) // num_of_variates, # number of rolling test windows
)

multi_variate_train_dataset = train_grouper(train_dataset)
multi_variate_test_dataset = test_grouper(test_dataset)

注意,目標現在是 2 維的,其中第一維是變數的數量(時間序列的數量),第二維是時間序列的值(時間維度)。

multi_variate_train_example = multi_variate_train_dataset[0]
print("multi_variate_train_example["target"].shape =", multi_variate_train_example["target"].shape)

>>> multi_variate_train_example["target"].shape = (862, 17448)

定義模型

接下來,讓我們例項化一個模型。該模型將從頭開始訓練,因此我們不會使用 from_pretrained 方法,而是根據 config 來隨機初始化模型。

我們為模型指定了幾個額外的引數。

  • prediction_length(在我們的例子中是 48 小時):這是 Informer 解碼器將學習預測的時間範圍;
  • context_length:如果未指定 context_length,模型會將 context_length(編碼器的輸入)設定為與 prediction_length 相等;
  • 給定頻率的 lags:這些引數指定了一種高效的“回看”機制,我們將過去的值與當前值連線起來作為額外的特徵。例如,對於 Daily 頻率,我們可能會考慮 [1, 7, 30, ...] 的回看,而對於 Minute 資料,我們可能會考慮 [1, 30, 60, 60*24, ...] 等;
  • 時間特徵的數量:在我們的例子中,這個值將是 5,因為我們將新增 HourOfDayDayOfWeek、... 和 Age 特徵(詳見下文)。

讓我們檢查一下 GluonTS 為給定頻率(“hourly”)提供的預設 lags。

from gluonts.time_feature import get_lags_for_frequency

lags_sequence = get_lags_for_frequency(freq)
print(lags_sequence)

>>> [1, 2, 3, 4, 5, 6, 7, 23, 24, 25, 47, 48, 49, 71, 72, 73, 95, 96, 97, 119, 120, 
     121, 143, 144, 145, 167, 168, 169, 335, 336, 337, 503, 504, 505, 671, 672, 673, 719, 720, 721]

這意味著對於每個時間步,它會回看最多 721 小時(約 30 天)作為額外特徵。然而,由此產生的特徵向量大小將是 len(lags_sequence)*num_of_variates,在我們的情況下將是 34480!這是行不通的,所以我們將使用我們自己設定的合理 lags。

我們再檢查一下 GluonTS 提供的預設時間特徵。

from gluonts.time_feature import time_features_from_frequency_str

time_features = time_features_from_frequency_str(freq)
print(time_features)

>>> [<function hour_of_day at 0x7f3809539240>, <function day_of_week at 0x7f3809539360>, <function day_of_month at 0x7f3809539480>, <function day_of_year at 0x7f38095395a0>]

在這種情況下,有四個額外的特徵,即“小時”、“星期幾”、“月中的天”和“年中的天”。這意味著對於每個時間步,我們將把這些特徵作為標量值新增進去。例如,考慮時間戳 2015-01-01 01:00:01,這四個額外的特徵將會是

from pandas.core.arrays.period import period_array

timestamp = pd.Period("2015-01-01 01:00:01", freq=freq)
timestamp_as_index = pd.PeriodIndex(data=period_array([timestamp]))
additional_features = [
    (time_feature.__name__, time_feature(timestamp_as_index))
    for time_feature in time_features
]
print(dict(additional_features))

>>> {'hour_of_day': array([-0.45652174]), 'day_of_week': array([0.]), 'day_of_month': array([-0.5]), 'day_of_year': array([-0.5])}

請注意,GluonTS 將小時和天編碼為 [-0.5, 0.5] 之間的值。有關 time_features 的更多資訊,請參閱此連結。除了這 4 個特徵外,我們還將在資料轉換中新增一個“age”特徵,具體如下。

我們現在已經具備了定義模型所需的一切。

from transformers import InformerConfig, InformerForPrediction

config = InformerConfig(
    # in the multivariate setting, input_size is the number of variates in the time series per time step
    input_size=num_of_variates,
    # prediction length:
    prediction_length=prediction_length,
    # context length:
    context_length=prediction_length * 2,
    # lags value copied from 1 week before:
    lags_sequence=[1, 24 * 7],
    # we'll add 5 time features ("hour_of_day", ..., and "age"):
    num_time_features=len(time_features) + 1,
    
    # informer params:
    dropout=0.1,
    encoder_layers=6,
    decoder_layers=4,
    # project input from num_of_variates*len(lags_sequence)+num_time_features to:
    d_model=64,
)

model = InformerForPrediction(config)

預設情況下,模型使用對角學生 t 分佈(但這是可配置的)。

model.config.distribution_output

>>> 'student_t'

定義轉換

接下來,我們定義資料的轉換,特別是用於建立時間特徵(基於資料集或通用特徵)的轉換。

我們再次使用 GluonTS 庫來完成此操作。我們定義了一個轉換鏈(Chain),這有點類似於影像處理中的 torchvision.transforms.Compose。它允許我們將多個轉換組合成一個單一的流水線。

from gluonts.time_feature import TimeFeature
from gluonts.dataset.field_names import FieldName
from gluonts.transform import (
    AddAgeFeature,
    AddObservedValuesIndicator,
    AddTimeFeatures,
    AsNumpyArray,
    Chain,
    ExpectedNumInstanceSampler,
    InstanceSplitter,
    RemoveFields,
    SelectFields,
    SetField,
    TestSplitSampler,
    Transformation,
    ValidationSplitSampler,
    VstackFeatures,
    RenameFields,
)

下面的轉換都附有註釋,以解釋它們的作用。總的來說,我們將遍歷資料集中的各個時間序列,並新增/刪除欄位或特徵。

from transformers import PretrainedConfig


def create_transformation(freq: str, config: PretrainedConfig) -> Transformation:
    # create list of fields to remove later
    remove_field_names = []
    if config.num_static_real_features == 0:
        remove_field_names.append(FieldName.FEAT_STATIC_REAL)
    if config.num_dynamic_real_features == 0:
        remove_field_names.append(FieldName.FEAT_DYNAMIC_REAL)
    if config.num_static_categorical_features == 0:
        remove_field_names.append(FieldName.FEAT_STATIC_CAT)

    return Chain(
        # step 1: remove static/dynamic fields if not specified
        [RemoveFields(field_names=remove_field_names)]
        # step 2: convert the data to NumPy (potentially not needed)
        + (
            [
                AsNumpyArray(
                    field=FieldName.FEAT_STATIC_CAT,
                    expected_ndim=1,
                    dtype=int,
                )
            ]
            if config.num_static_categorical_features > 0
            else []
        )
        + (
            [
                AsNumpyArray(
                    field=FieldName.FEAT_STATIC_REAL,
                    expected_ndim=1,
                )
            ]
            if config.num_static_real_features > 0
            else []
        )
        + [
            AsNumpyArray(
                field=FieldName.TARGET,
                # we expect an extra dim for the multivariate case:
                expected_ndim=1 if config.input_size == 1 else 2,
            ),
            # step 3: handle the NaN's by filling in the target with zero
            # and return the mask (which is in the observed values)
            # true for observed values, false for nan's
            # the decoder uses this mask (no loss is incurred for unobserved values)
            # see loss_weights inside the xxxForPrediction model
            AddObservedValuesIndicator(
                target_field=FieldName.TARGET,
                output_field=FieldName.OBSERVED_VALUES,
            ),
            # step 4: add temporal features based on freq of the dataset
            # these serve as positional encodings
            AddTimeFeatures(
                start_field=FieldName.START,
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_TIME,
                time_features=time_features_from_frequency_str(freq),
                pred_length=config.prediction_length,
            ),
            # step 5: add another temporal feature (just a single number)
            # tells the model where in the life the value of the time series is
            # sort of running counter
            AddAgeFeature(
                target_field=FieldName.TARGET,
                output_field=FieldName.FEAT_AGE,
                pred_length=config.prediction_length,
                log_scale=True,
            ),
            # step 6: vertically stack all the temporal features into the key FEAT_TIME
            VstackFeatures(
                output_field=FieldName.FEAT_TIME,
                input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE]
                + (
                    [FieldName.FEAT_DYNAMIC_REAL]
                    if config.num_dynamic_real_features > 0
                    else []
                ),
            ),
            # step 7: rename to match HuggingFace names
            RenameFields(
                mapping={
                    FieldName.FEAT_STATIC_CAT: "static_categorical_features",
                    FieldName.FEAT_STATIC_REAL: "static_real_features",
                    FieldName.FEAT_TIME: "time_features",
                    FieldName.TARGET: "values",
                    FieldName.OBSERVED_VALUES: "observed_mask",
                }
            ),
        ]
    )

定義 `InstanceSplitter`

接下來,為了進行訓練/驗證/測試,我們建立一個 InstanceSplitter,它用於從資料集中取樣視窗(記住,由於時間和記憶體的限制,我們不能將整個歷史值傳遞給模型)。

instance splitter 從資料中隨機取樣大小為 context_length 的視窗以及緊隨其後的 prediction_length 大小的視窗,併為相應視窗中 time_series_fields 裡的任何時間鍵新增 past_future_ 字首。instance splitter 可以配置為三種不同的模式。

  1. mode="train":在這種模式下,我們從給定資料集(訓練資料集)中隨機取樣上下文和預測長度視窗。
  2. mode="validation":在這種模式下,我們從給定資料集(用於回溯測試或驗證似然計算)中取樣最後一個上下文長度視窗和預測視窗。
  3. mode="test":在這種模式下,我們僅取樣最後一個上下文長度視窗(用於預測用例)。
from gluonts.transform.sampler import InstanceSampler
from typing import Optional


def create_instance_splitter(
    config: PretrainedConfig,
    mode: str,
    train_sampler: Optional[InstanceSampler] = None,
    validation_sampler: Optional[InstanceSampler] = None,
) -> Transformation:
    assert mode in ["train", "validation", "test"]

    instance_sampler = {
        "train": train_sampler
        or ExpectedNumInstanceSampler(
            num_instances=1.0, min_future=config.prediction_length
        ),
        "validation": validation_sampler
        or ValidationSplitSampler(min_future=config.prediction_length),
        "test": TestSplitSampler(),
    }[mode]

    return InstanceSplitter(
        target_field="values",
        is_pad_field=FieldName.IS_PAD,
        start_field=FieldName.START,
        forecast_start_field=FieldName.FORECAST_START,
        instance_sampler=instance_sampler,
        past_length=config.context_length + max(config.lags_sequence),
        future_length=config.prediction_length,
        time_series_fields=["time_features", "observed_mask"],
    )

建立 DataLoaders

接下來,是時候建立 DataLoaders了,它讓我們能夠獲取成批的 (輸入, 輸出) 對——換句話說就是 (past_values, future_values)。

from typing import Iterable

import torch
from gluonts.itertools import Cached, Cyclic
from gluonts.dataset.loader import as_stacked_batches


def create_train_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    num_batches_per_epoch: int,
    shuffle_buffer_length: Optional[int] = None,
    cache_data: bool = True,
    **kwargs,
) -> Iterable:
    PREDICTION_INPUT_NAMES = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        PREDICTION_INPUT_NAMES.append("static_categorical_features")

    if config.num_static_real_features > 0:
        PREDICTION_INPUT_NAMES.append("static_real_features")

    TRAINING_INPUT_NAMES = PREDICTION_INPUT_NAMES + [
        "future_values",
        "future_observed_mask",
    ]

    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data, is_train=True)
    if cache_data:
        transformed_data = Cached(transformed_data)

    # we initialize a Training instance
    instance_splitter = create_instance_splitter(config, "train")

    # the instance splitter will sample a window of
    # context length + lags + prediction length (from all the possible transformed time series, 1 in our case)
    # randomly from within the target time series and return an iterator.
    stream = Cyclic(transformed_data).stream()
    training_instances = instance_splitter.apply(stream)
    
    return as_stacked_batches(
        training_instances,
        batch_size=batch_size,
        shuffle_buffer_length=shuffle_buffer_length,
        field_names=TRAINING_INPUT_NAMES,
        output_type=torch.tensor,
        num_batches_per_epoch=num_batches_per_epoch,
    )
def create_backtest_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    **kwargs,
):
    PREDICTION_INPUT_NAMES = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        PREDICTION_INPUT_NAMES.append("static_categorical_features")

    if config.num_static_real_features > 0:
        PREDICTION_INPUT_NAMES.append("static_real_features")

    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data)

    # we create a Validation Instance splitter which will sample the very last
    # context window seen during training only for the encoder.
    instance_sampler = create_instance_splitter(config, "validation")

    # we apply the transformations in train mode
    testing_instances = instance_sampler.apply(transformed_data, is_train=True)
    
    return as_stacked_batches(
        testing_instances,
        batch_size=batch_size,
        output_type=torch.tensor,
        field_names=PREDICTION_INPUT_NAMES,
    )

def create_test_dataloader(
    config: PretrainedConfig,
    freq,
    data,
    batch_size: int,
    **kwargs,
):
    PREDICTION_INPUT_NAMES = [
        "past_time_features",
        "past_values",
        "past_observed_mask",
        "future_time_features",
    ]
    if config.num_static_categorical_features > 0:
        PREDICTION_INPUT_NAMES.append("static_categorical_features")

    if config.num_static_real_features > 0:
        PREDICTION_INPUT_NAMES.append("static_real_features")

    transformation = create_transformation(freq, config)
    transformed_data = transformation.apply(data, is_train=False)

    # We create a test Instance splitter to sample the very last
    # context window from the dataset provided.
    instance_sampler = create_instance_splitter(config, "test")

    # We apply the transformations in test mode
    testing_instances = instance_sampler.apply(transformed_data, is_train=False)
    
    return as_stacked_batches(
        testing_instances,
        batch_size=batch_size,
        output_type=torch.tensor,
        field_names=PREDICTION_INPUT_NAMES,
    )
train_dataloader = create_train_dataloader(
    config=config,
    freq=freq,
    data=multi_variate_train_dataset,
    batch_size=256,
    num_batches_per_epoch=100,
    num_workers=2,
)

test_dataloader = create_backtest_dataloader(
    config=config,
    freq=freq,
    data=multi_variate_test_dataset,
    batch_size=32,
)

讓我們檢查第一個批次。

batch = next(iter(train_dataloader))
for k, v in batch.items():
    print(k, v.shape, v.type())

>>> past_time_features torch.Size([256, 264, 5]) torch.FloatTensor
    past_values torch.Size([256, 264, 862]) torch.FloatTensor
    past_observed_mask torch.Size([256, 264, 862]) torch.FloatTensor
    future_time_features torch.Size([256, 48, 5]) torch.FloatTensor
    future_values torch.Size([256, 48, 862]) torch.FloatTensor
    future_observed_mask torch.Size([256, 48, 862]) torch.FloatTensor

可以看到,我們沒有像 NLP 模型那樣向編碼器提供 input_idsattention_mask,而是提供了 past_values,以及 past_observed_maskpast_time_featuresstatic_real_features

解碼器的輸入包括 future_valuesfuture_observed_maskfuture_time_featuresfuture_values 可以看作是 NLP 中 decoder_input_ids 的等價物。

關於每個引數的詳細解釋,請參閱文件

前向傳播

讓我們用剛剛建立的批次進行一次前向傳播。

# perform forward pass
outputs = model(
    past_values=batch["past_values"],
    past_time_features=batch["past_time_features"],
    past_observed_mask=batch["past_observed_mask"],
    static_categorical_features=batch["static_categorical_features"]
    if config.num_static_categorical_features > 0
    else None,
    static_real_features=batch["static_real_features"]
    if config.num_static_real_features > 0
    else None,
    future_values=batch["future_values"],
    future_time_features=batch["future_time_features"],
    future_observed_mask=batch["future_observed_mask"],
    output_hidden_states=True,
)
print("Loss:", outputs.loss.item())

>>> Loss: -1071.5718994140625

請注意,模型返回了一個損失值。這是因為解碼器自動將 future_values 向右移動一個位置以獲得標籤。這使得我們可以在預測值和標籤之間計算損失。損失是預測分佈相對於真實值的負對數似然,它會趨向於負無窮大。

另請注意,解碼器使用因果掩碼來避免看到未來,因為它需要預測的值位於 future_values 張量中。

訓練模型

是時候訓練模型了!我們將使用一個標準的 PyTorch 訓練迴圈。

我們將在這裡使用 🤗 Accelerate 庫,它會自動將模型、最佳化器和資料載入器放置在適當的 device 上。

from accelerate import Accelerator
from torch.optim import AdamW

epochs = 25
loss_history = []

accelerator = Accelerator()
device = accelerator.device

model.to(device)
optimizer = AdamW(model.parameters(), lr=6e-4, betas=(0.9, 0.95), weight_decay=1e-1)

model, optimizer, train_dataloader = accelerator.prepare(
    model,
    optimizer,
    train_dataloader,
)

model.train()
for epoch in range(epochs):
    for idx, batch in enumerate(train_dataloader):
        optimizer.zero_grad()
        outputs = model(
            static_categorical_features=batch["static_categorical_features"].to(device)
            if config.num_static_categorical_features > 0
            else None,
            static_real_features=batch["static_real_features"].to(device)
            if config.num_static_real_features > 0
            else None,
            past_time_features=batch["past_time_features"].to(device),
            past_values=batch["past_values"].to(device),
            future_time_features=batch["future_time_features"].to(device),
            future_values=batch["future_values"].to(device),
            past_observed_mask=batch["past_observed_mask"].to(device),
            future_observed_mask=batch["future_observed_mask"].to(device),
        )
        loss = outputs.loss

        # Backpropagation
        accelerator.backward(loss)
        optimizer.step()

        loss_history.append(loss.item())
        if idx % 100 == 0:
            print(loss.item())

>>> -1081.978515625
    ...
    -2877.723876953125
# view training
loss_history = np.array(loss_history).reshape(-1)
x = range(loss_history.shape[0])
plt.figure(figsize=(10, 5))
plt.plot(x, loss_history, label="train")
plt.title("Loss", fontsize=15)
plt.legend(loc="upper right")
plt.xlabel("iteration")
plt.ylabel("nll")
plt.show()

png

推理

在推理時,建議使用 generate() 方法進行自迴歸生成,這與 NLP 模型類似。

預測過程涉及從測試例項取樣器中獲取資料,該取樣器將從資料集中每個時間序列的最後一個 context_length 大小的視窗中取樣值,並將其傳遞給模型。請注意,我們將 future_time_features(這些是預先知道的)傳遞給解碼器。

模型將從預測的分佈中自迴歸地取樣一定數量的值,並將它們傳回解碼器以返回預測輸出。

model.eval()

forecasts_ = []

for batch in test_dataloader:
    outputs = model.generate(
        static_categorical_features=batch["static_categorical_features"].to(device)
        if config.num_static_categorical_features > 0
        else None,
        static_real_features=batch["static_real_features"].to(device)
        if config.num_static_real_features > 0
        else None,
        past_time_features=batch["past_time_features"].to(device),
        past_values=batch["past_values"].to(device),
        future_time_features=batch["future_time_features"].to(device),
        past_observed_mask=batch["past_observed_mask"].to(device),
    )
    forecasts_.append(outputs.sequences.cpu().numpy())

模型輸出一個形狀為 (batch_size, number of samples, prediction length, input_size) 的張量。

在這種情況下,我們為 862 個時間序列中的每一個,都得到了未來 48 小時的 100 個可能值(對於批次中的每個樣本,批次大小為 1,因為我們只有一個多元時間序列)。

forecasts_[0].shape

>>> (1, 100, 48, 862)

我們將它們垂直堆疊,以獲得測試資料集中所有時間序列的預測(以防測試集中有更多的時間序列)。

forecasts = np.vstack(forecasts_)
print(forecasts.shape)

>>> (1, 100, 48, 862)

我們可以將得到的預測與測試集中存在的樣本外真實值進行評估。為此,我們將使用 🤗 Evaluate 庫,其中包括 MASEsMAPE 指標。

我們為資料集中的每個時間序列變數計算這兩個指標。

from evaluate import load
from gluonts.time_feature import get_seasonality

mase_metric = load("evaluate-metric/mase")
smape_metric = load("evaluate-metric/smape")

forecast_median = np.median(forecasts, 1).squeeze(0).T

mase_metrics = []
smape_metrics = []

for item_id, ts in enumerate(test_dataset):
    training_data = ts["target"][:-prediction_length]
    ground_truth = ts["target"][-prediction_length:]
    mase = mase_metric.compute(
        predictions=forecast_median[item_id],
        references=np.array(ground_truth),
        training=np.array(training_data),
        periodicity=get_seasonality(freq),
    )
    mase_metrics.append(mase["mase"])

    smape = smape_metric.compute(
        predictions=forecast_median[item_id],
        references=np.array(ground_truth),
    )
    smape_metrics.append(smape["smape"])
print(f"MASE: {np.mean(mase_metrics)}")

>>> MASE: 1.1913437728068093

print(f"sMAPE: {np.mean(smape_metrics)}")

>>> sMAPE: 0.5322665081607634
plt.scatter(mase_metrics, smape_metrics, alpha=0.2)
plt.xlabel("MASE")
plt.ylabel("sMAPE")
plt.show()

png

為了繪製任何時間序列變數相對於真實測試資料的預測圖,我們定義了以下輔助函式。

import matplotlib.dates as mdates


def plot(ts_index, mv_index):
    fig, ax = plt.subplots()

    index = pd.period_range(
        start=multi_variate_test_dataset[ts_index][FieldName.START],
        periods=len(multi_variate_test_dataset[ts_index][FieldName.TARGET]),
        freq=multi_variate_test_dataset[ts_index][FieldName.START].freq,
    ).to_timestamp()

    ax.xaxis.set_minor_locator(mdates.HourLocator())

    ax.plot(
        index[-2 * prediction_length :],
        multi_variate_test_dataset[ts_index]["target"][mv_index, -2 * prediction_length :],
        label="actual",
    )

    ax.plot(
        index[-prediction_length:],
        forecasts[ts_index, ..., mv_index].mean(axis=0),
        label="mean",
    )
    ax.fill_between(
        index[-prediction_length:],
        forecasts[ts_index, ..., mv_index].mean(0)
        - forecasts[ts_index, ..., mv_index].std(axis=0),
        forecasts[ts_index, ..., mv_index].mean(0)
        + forecasts[ts_index, ..., mv_index].std(axis=0),
        alpha=0.2,
        interpolate=True,
        label="+/- 1-std",
    )
    ax.legend()
    fig.autofmt_xdate()

例如:

plot(0, 344)

png

結論

我們如何與其他模型進行比較?Monash 時間序列儲存庫有一個測試集 MASE 指標的比較表,我們可以將我們的結果新增到其中。

資料集 SES Theta TBATS ETS (DHR-)ARIMA PR CatBoost FFNN DeepAR N-BEATS WaveNet Transformer (uni.) Informer (mv. our)
Traffic Hourly 1.922 1.922 2.482 2.294 2.535 1.281 1.571 0.892 0.825 1.100 1.066 0.821 1.191

可以看出,也許令一些人驚訝的是,多元預測通常比單變數預測要*差*,原因在於估計跨序列相關性/關係的難度。估計值增加的額外方差常常損害最終的預測,或者模型學習到虛假的關聯。我們推薦閱讀這篇論文以獲取更多資訊。多元模型在大量資料上訓練時往往表現良好。

所以,原始的 Transformer 在這裡仍然表現最好!未來,我們希望在一個集中的地方更好地對這些模型進行基準測試,以便更容易地復現多篇論文的結果。敬請期待更多內容!

資源

我們建議檢視 Informer 文件以及本博文頂部連結的示例 notebook

社群

你好,

感謝這篇非常有用的部落格。
當我嘗試從 Monash-University/monash_tsf 載入任何小時資料集時,出現 "DatasetGenerationError: An error occurred while generating the dataset" 這個錯誤。
我能夠載入除小時資料集之外的其他資料集。
您知道為什麼嗎?
如何解決這個問題?
我想嘗試多元機率時間序列預測。我可以使用哪些資料集?

謝謝,
Nouda

·
文章作者

導致錯誤的確切命令是什麼?

註冊登入 以發表評論

© . This site is unofficial and not affiliated with Hugging Face, Inc.