是的,Transformer 模型對於時間序列預測是有效的 (+ Autoformer)
引言
幾個月前,我們介紹了 Informer 模型(Zhou, Haoyi, et al., 2021),它是一個時間序列 Transformer 模型,並獲得了 AAAI 2021 最佳論文獎。我們還提供了使用 Informer 進行多元機率預測的示例。在這篇文章中,我們將討論一個問題:Transformer 模型對時間序列預測是否有效?(AAAI 2023)。正如我們將看到的,它們是有效的。
首先,我們將提供經驗證據表明 **Transformer 模型確實對時間序列預測有效**。我們的比較表明,簡單的線性模型 DLinear 並不像所聲稱的那樣優於 Transformer 模型。當與相同設定下相同大小的線性模型進行比較時,基於 Transformer 的模型在我們考慮的測試集指標上表現更好。之後,我們將介紹 Autoformer 模型(Wu, Haixu, et al., 2021),該模型於 Informer 模型之後,在 NeurIPS 2021 上發表。Autoformer 模型現已在 🤗 Transformers 中可用。最後,我們將討論 DLinear 模型,它是一個簡單的前饋網路,使用了 Autoformer 的分解層。DLinear 模型首次在《Transformer 模型對時間序列預測是否有效?》中介紹,並聲稱在時間序列預測中優於基於 Transformer 的模型。
衝呀!
基準測試 - Transformer 模型 vs. DLinear
在最近發表在 AAAI 2023 的論文 《Transformer 模型對時間序列預測是否有效?》中,作者聲稱 Transformer 模型對時間序列預測無效。他們將基於 Transformer 的模型與他們稱為 DLinear 的簡單線性模型進行比較。DLinear 模型使用了 Autoformer 模型中的分解層,我們將在本文後面介紹。作者聲稱 DLinear 模型在時間序列預測中優於基於 Transformer 的模型。是這樣嗎?讓我們來一探究竟。
資料集 | Autoformer (univariate) MASE | DLinear MASE |
---|---|---|
交通 |
0.910 | 0.965 |
匯率 |
1.087 | 1.690 |
電力 |
0.751 | 0.831 |
上表顯示了 Autoformer 和 DLinear 模型在論文中使用的三個資料集上的比較結果。
結果表明,Autoformer 模型在所有三個資料集上都優於 DLinear 模型。
接下來,我們將介紹新的 Autoformer 模型以及 DLinear 模型。我們將展示如何將它們與上表中的交通資料集進行比較,並解釋我們獲得的結果。
總結: 儘管簡單的線性模型在某些情況下具有優勢,但與 Transformer 等更復雜的模型相比,在單變數設定下,它無法融合協變數。
Autoformer - 內部解析
Autoformer 建立在將時間序列分解為季節性和趨勢-週期分量的傳統方法之上。這透過引入一個*分解層*來實現,該層增強了模型準確捕獲這些分量的能力。此外,Autoformer 引入了一種創新的自相關機制,取代了香草 Transformer 中使用的標準自注意力。這種機制使模型能夠利用基於週期的依賴性進行注意力,從而提高整體效能。
在接下來的部分中,我們將深入探討 Autoformer 的兩個關鍵貢獻:*分解層*和*注意力(自相關)機制*。我們還將提供程式碼示例,以說明這些元件在 Autoformer 架構中的功能。
分解層
分解長期以來一直是時間序列分析中流行的方法,但在 Autoformer 論文問世之前,它尚未廣泛地融入深度學習模型。在對概念進行簡要解釋之後,我們將使用 PyTorch 程式碼演示如何在 Autoformer 中應用這一思想。
時間序列分解
在時間序列分析中,分解是一種將時間序列分解為三個系統性成分的方法:趨勢-週期、季節性變化和隨機波動。趨勢成分表示時間序列的長期方向,它可以隨時間增加、減少或保持穩定。季節性成分表示時間序列中出現的週期性模式,例如年度或季度週期。最後,隨機(有時稱為“不規則”)成分表示資料中無法透過趨勢或季節性成分解釋的隨機噪聲。
分解主要有兩種型別:加法分解和乘法分解,它們在優秀的 statsmodels 庫中實現。透過將時間序列分解為這些成分,我們可以更好地理解和建模資料中的潛在模式。
但是,我們如何將分解納入 Transformer 架構呢?讓我們看看 Autoformer 是如何做到的。
Autoformer 中的分解
![]() |
---|
來自論文的 Autoformer 架構 |
Autoformer 將分解塊作為模型的內部操作,如 Autoformer 架構圖中所示。可以看出,編碼器和解碼器都使用分解塊來聚合趨勢-週期部分並從序列中逐步提取季節性部分。自 Autoformer 發表以來,內部分解的概念已證明其有用性。隨後,它已被其他幾篇時間序列論文采用,例如 FEDformer(Zhou, Tian, et al., ICML 2022)和 DLinear (Zeng, Ailing, et al., AAAI 2023),這凸顯了它在時間序列建模中的重要性。
現在,我們來正式定義分解層
對於長度為 的輸入序列 ,定義為 ,分解層返回
以及 PyTorch 中的實現
import torch
from torch import nn
class DecompositionLayer(nn.Module):
"""
Returns the trend and the seasonal parts of the time series.
"""
def __init__(self, kernel_size):
super().__init__()
self.kernel_size = kernel_size
self.avg = nn.AvgPool1d(kernel_size=kernel_size, stride=1, padding=0) # moving average
def forward(self, x):
"""Input shape: Batch x Time x EMBED_DIM"""
# padding on the both ends of time series
num_of_pads = (self.kernel_size - 1) // 2
front = x[:, 0:1, :].repeat(1, num_of_pads, 1)
end = x[:, -1:, :].repeat(1, num_of_pads, 1)
x_padded = torch.cat([front, x, end], dim=1)
# calculate the trend and seasonal part of the series
x_trend = self.avg(x_padded.permute(0, 2, 1)).permute(0, 2, 1)
x_seasonal = x - x_trend
return x_seasonal, x_trend
如您所見,這個實現非常簡單,並且可以用於其他模型,正如我們將在 DLinear 中看到的那樣。現在,讓我們解釋第二個貢獻——*注意力(自相關)機制*。
注意力(自相關)機制
![]() |
---|
香草自注意力機制 vs 自相關機制,來自論文 |
除了分解層,Autoformer 還採用了一種新穎的自相關機制,它無縫地取代了自注意力。在香草時間序列 Transformer中,注意力權重在時域中計算並逐點聚合。另一方面,如上圖所示,Autoformer 在頻域中(使用快速傅立葉變換)計算它們,並透過時間延遲進行聚合。
在接下來的部分中,我們將詳細深入探討這些主題,並提供程式碼示例進行解釋。
頻域注意力
![]() |
---|
利用 FFT 在頻域中計算注意力權重,來自論文 |
理論上,給定時間滯後 ,單個離散變數 的*自相關*用於衡量變數當前值在時間 與其過去值在時間 之間的“關係”(皮爾遜相關性)
Autoformer 利用自相關性從查詢和鍵中提取基於頻率的依賴關係,而不是它們之間的標準點積。您可以將其視為自注意力中 項的替代。
實際上,查詢和鍵的自相關性會透過 FFT 一次性計算出**所有滯後**的自相關性。透過這樣做,自相關機制實現了 的時間複雜度(其中 是輸入時間長度),類似於 Informer 的 ProbSparse 注意力。請注意,使用 FFT 計算自相關性的理論基礎是維納-辛欽定理,這超出了本部落格文章的範圍。
現在,我們準備好檢視 PyTorch 程式碼
import torch
def autocorrelation(query_states, key_states):
"""
Computes autocorrelation(Q,K) using `torch.fft`.
Think about it as a replacement for the QK^T in the self-attention.
Assumption: states are resized to same shape of [batch_size, time_length, embedding_dim].
"""
query_states_fft = torch.fft.rfft(query_states, dim=1)
key_states_fft = torch.fft.rfft(key_states, dim=1)
attn_weights = query_states_fft * torch.conj(key_states_fft)
attn_weights = torch.fft.irfft(attn_weights, dim=1)
return attn_weights
很簡單!😎 請注意,這只是 autocorrelation(Q,K)
的部分實現,完整實現可以在 🤗 Transformers 中找到。
接下來,我們將瞭解如何透過時間延遲將 attn_weights
與值進行聚合,這一過程稱為*時間延遲聚合*。
時間延遲聚合
![]() |
---|
按時間延遲聚合,來自Autoformer 論文 |
我們把自相關(稱為 attn_weights
)看作 。問題來了:我們如何將這些 與 進行聚合?在標準的自注意力機制中,這種聚合是透過點積完成的。然而,在 Autoformer 中,我們採用了一種不同的方法。首先,我們透過計算 在每個時間延遲 的值來對其進行對齊,這也被稱為*滾動*。隨後,我們對對齊後的 和自相關進行逐元素相乘。在所提供的圖中,您可以看到左側展示了按時間延遲滾動的 ,而右側則展示了與自相關的逐元素相乘。
可以用以下公式總結
就是這樣!請注意, 由超引數 autocorrelation_factor
控制(類似於 Informer 中的 sampling_factor
),並在乘法之前將 softmax 應用於自相關。
現在,我們準備好檢視最終程式碼
import torch
import math
def time_delay_aggregation(attn_weights, value_states, autocorrelation_factor=2):
"""
Computes aggregation as value_states.roll(delay) * top_k_autocorrelations(delay).
The final result is the autocorrelation-attention output.
Think about it as a replacement of the dot-product between attn_weights and value states.
The autocorrelation_factor is used to find top k autocorrelations delays.
Assumption: value_states and attn_weights shape: [batch_size, time_length, embedding_dim]
"""
bsz, num_heads, tgt_len, channel = ...
time_length = value_states.size(1)
autocorrelations = attn_weights.view(bsz, num_heads, tgt_len, channel)
# find top k autocorrelations delays
top_k = int(autocorrelation_factor * math.log(time_length))
autocorrelations_mean = torch.mean(autocorrelations, dim=(1, -1)) # bsz x tgt_len
top_k_autocorrelations, top_k_delays = torch.topk(autocorrelations_mean, top_k, dim=1)
# apply softmax on the channel dim
top_k_autocorrelations = torch.softmax(top_k_autocorrelations, dim=-1) # bsz x top_k
# compute aggregation: value_states.roll(delay) * top_k_autocorrelations(delay)
delays_agg = torch.zeros_like(value_states).float() # bsz x time_length x channel
for i in range(top_k):
value_states_roll_delay = value_states.roll(shifts=-int(top_k_delays[i]), dims=1)
top_k_at_delay = top_k_autocorrelations[:, i]
# aggregation
top_k_resized = top_k_at_delay.view(-1, 1, 1).repeat(num_heads, tgt_len, channel)
delays_agg += value_states_roll_delay * top_k_resized
attn_output = delays_agg.contiguous()
return attn_output
我們成功了!Autoformer 模型現已在 🤗 Transformers 庫中可用,並簡稱為 AutoformerModel
。
我們使用此模型的策略是,展示單變數 Transformer 模型與 DLinear 模型(本質上是單變數模型,將在接下來展示)的效能比較。我們還將展示在相同資料上訓練的**兩個**多變數 Transformer 模型的評估結果。
DLinear - 內部解析
實際上,DLinear 的概念很簡單:它只是一個帶有 Autoformer 的 DecompositionLayer
的全連線網路。它使用上述 DecompositionLayer
將輸入時間序列分解為殘差(季節性)和趨勢部分。在前向傳播中,每個部分都會透過其自己的線性層,將訊號投影到適當的 prediction_length
大小輸出。最終輸出是點預測模型中兩個相應輸出的總和
def forward(self, context):
seasonal, trend = self.decomposition(context)
seasonal_output = self.linear_seasonal(seasonal)
trend_output = self.linear_trend(trend)
return seasonal_output + trend_output
在機率設定中,可以將上下文長度陣列透過 linear_seasonal
和 linear_trend
層投影到 prediction-length * hidden
維度。然後將得到的輸出相加並重塑為 (prediction_length, hidden)
。最後,一個機率頭部將大小為 hidden
的潛在表示對映到某個分佈的引數。
在我們的基準測試中,我們使用來自 GluonTS 的 DLinear 實現。
示例:交通資料集
我們希望透過在 traffic
資料集上進行基準測試來實證展示庫中基於 Transformer 的模型的效能,該資料集包含 862 個時間序列。我們將在每個獨立時間序列(即單變數設定)上訓練一個共享模型。每個時間序列表示一個感測器的佔用值,範圍在 [0, 1] 之間。我們將為所有模型固定以下超引數:
# Traffic prediction_length is 24. Reference:
# https://github.com/awslabs/gluonts/blob/6605ab1278b6bf92d5e47343efcf0d22bc50b2ec/src/gluonts/dataset/repository/_lstnet.py#L105
prediction_length = 24
context_length = prediction_length*2
batch_size = 128
num_batches_per_epoch = 100
epochs = 50
scaling = "std"
Transformer 模型都相對較小,具有
encoder_layers=2
decoder_layers=2
d_model=16
我們不再展示如何使用 Autoformer
訓練模型,而是可以直接用新的 Autoformer
模型替換之前的兩篇部落格文章(TimeSeriesTransformer 和 Informer)中的模型,並在 traffic
資料集上進行訓練。為了避免重複,我們已經訓練了模型並將其推送到 HuggingFace Hub。我們將使用這些模型進行評估。
載入資料集
我們首先安裝必要的庫
!pip install -q transformers datasets evaluate accelerate "gluonts[torch]" ujson tqdm
由 Lai 等人 (2017) 使用的 traffic
資料集包含舊金山交通資料。它包含 862 個小時時間序列,顯示 2015 年至 2016 年舊金山灣區高速公路上的道路佔用率,範圍在 之間。
from gluonts.dataset.repository.datasets import get_dataset
dataset = get_dataset("traffic")
freq = dataset.metadata.freq
prediction_length = dataset.metadata.prediction_length
我們來視覺化資料集中的一個時間序列,並繪製訓練/測試分割。
import matplotlib.pyplot as plt
train_example = next(iter(dataset.train))
test_example = next(iter(dataset.test))
num_of_samples = 4*prediction_length
figure, axes = plt.subplots()
axes.plot(train_example["target"][-num_of_samples:], color="blue")
axes.plot(
test_example["target"][-num_of_samples - prediction_length :],
color="red",
alpha=0.5,
)
plt.show()
我們來定義訓練/測試分割
train_dataset = dataset.train
test_dataset = dataset.test
定義轉換
接下來,我們定義資料的轉換,特別是用於建立時間特徵(基於資料集或通用特徵)的轉換。
我們定義了一個 GluonTS 的 Chain
轉換(有點類似於影像的 torchvision.transforms.Compose
)。它允許我們將多個轉換組合成一個單一的管道。
下面的轉換都帶有註釋,以解釋它們的功能。從高層次來看,我們將遍歷資料集中的各個時間序列並新增/刪除欄位或特徵。
from transformers import PretrainedConfig
from gluonts.time_feature import time_features_from_frequency_str
from gluonts.dataset.field_names import FieldName
from gluonts.transform import (
AddAgeFeature,
AddObservedValuesIndicator,
AddTimeFeatures,
AsNumpyArray,
Chain,
ExpectedNumInstanceSampler,
RemoveFields,
SelectFields,
SetField,
TestSplitSampler,
Transformation,
ValidationSplitSampler,
VstackFeatures,
RenameFields,
)
def create_transformation(freq: str, config: PretrainedConfig) -> Transformation:
# create a list of fields to remove later
remove_field_names = []
if config.num_static_real_features == 0:
remove_field_names.append(FieldName.FEAT_STATIC_REAL)
if config.num_dynamic_real_features == 0:
remove_field_names.append(FieldName.FEAT_DYNAMIC_REAL)
if config.num_static_categorical_features == 0:
remove_field_names.append(FieldName.FEAT_STATIC_CAT)
return Chain(
# step 1: remove static/dynamic fields if not specified
[RemoveFields(field_names=remove_field_names)]
# step 2: convert the data to NumPy (potentially not needed)
+ (
[
AsNumpyArray(
field=FieldName.FEAT_STATIC_CAT,
expected_ndim=1,
dtype=int,
)
]
if config.num_static_categorical_features > 0
else []
)
+ (
[
AsNumpyArray(
field=FieldName.FEAT_STATIC_REAL,
expected_ndim=1,
)
]
if config.num_static_real_features > 0
else []
)
+ [
AsNumpyArray(
field=FieldName.TARGET,
# we expect an extra dim for the multivariate case:
expected_ndim=1 if config.input_size == 1 else 2,
),
# step 3: handle the NaN's by filling in the target with zero
# and return the mask (which is in the observed values)
# true for observed values, false for nan's
# the decoder uses this mask (no loss is incurred for unobserved values)
# see loss_weights inside the xxxForPrediction model
AddObservedValuesIndicator(
target_field=FieldName.TARGET,
output_field=FieldName.OBSERVED_VALUES,
),
# step 4: add temporal features based on freq of the dataset
# these serve as positional encodings
AddTimeFeatures(
start_field=FieldName.START,
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_TIME,
time_features=time_features_from_frequency_str(freq),
pred_length=config.prediction_length,
),
# step 5: add another temporal feature (just a single number)
# tells the model where in the life the value of the time series is
# sort of running counter
AddAgeFeature(
target_field=FieldName.TARGET,
output_field=FieldName.FEAT_AGE,
pred_length=config.prediction_length,
log_scale=True,
),
# step 6: vertically stack all the temporal features into the key FEAT_TIME
VstackFeatures(
output_field=FieldName.FEAT_TIME,
input_fields=[FieldName.FEAT_TIME, FieldName.FEAT_AGE]
+ (
[FieldName.FEAT_DYNAMIC_REAL]
if config.num_dynamic_real_features > 0
else []
),
),
# step 7: rename to match HuggingFace names
RenameFields(
mapping={
FieldName.FEAT_STATIC_CAT: "static_categorical_features",
FieldName.FEAT_STATIC_REAL: "static_real_features",
FieldName.FEAT_TIME: "time_features",
FieldName.TARGET: "values",
FieldName.OBSERVED_VALUES: "observed_mask",
}
),
]
)
定義 InstanceSplitter
為了進行訓練/驗證/測試,我們接下來建立一個 InstanceSplitter
,它用於從資料集中取樣視窗(因為,請記住,由於時間和記憶體限制,我們無法將整個歷史值傳遞給模型)。
例項分割器從資料中取樣隨機的 context_length
大小和隨後的 prediction_length
大小的視窗,併為相應視窗中的任何時間序列鍵在 time_series_fields
中附加 past_
或 future_
字首。例項分割器可以配置為三種不同的模式:
mode="train"
:在這種模式下,我們從給定資料集(訓練資料集)中隨機取樣上下文和預測長度視窗。mode="validation"
:在這種模式下,我們從給定資料集(用於回溯測試或驗證似然計算)中取樣最後一個上下文長度視窗和預測視窗。mode="test"
:在這種模式下,我們僅取樣最後一個上下文長度視窗(用於預測用例)。
from gluonts.transform import InstanceSplitter
from gluonts.transform.sampler import InstanceSampler
from typing import Optional
def create_instance_splitter(
config: PretrainedConfig,
mode: str,
train_sampler: Optional[InstanceSampler] = None,
validation_sampler: Optional[InstanceSampler] = None,
) -> Transformation:
assert mode in ["train", "validation", "test"]
instance_sampler = {
"train": train_sampler
or ExpectedNumInstanceSampler(
num_instances=1.0, min_future=config.prediction_length
),
"validation": validation_sampler
or ValidationSplitSampler(min_future=config.prediction_length),
"test": TestSplitSampler(),
}[mode]
return InstanceSplitter(
target_field="values",
is_pad_field=FieldName.IS_PAD,
start_field=FieldName.START,
forecast_start_field=FieldName.FORECAST_START,
instance_sampler=instance_sampler,
past_length=config.context_length + max(config.lags_sequence),
future_length=config.prediction_length,
time_series_fields=["time_features", "observed_mask"],
)
建立 PyTorch DataLoaders
接下來,是時候建立 PyTorch DataLoaders 了,它們允許我們擁有批次的(輸入,輸出)對——換句話說,(past_values
,future_values
)。
from typing import Iterable
import torch
from gluonts.itertools import Cyclic, Cached
from gluonts.dataset.loader import as_stacked_batches
def create_train_dataloader(
config: PretrainedConfig,
freq,
data,
batch_size: int,
num_batches_per_epoch: int,
shuffle_buffer_length: Optional[int] = None,
cache_data: bool = True,
**kwargs,
) -> Iterable:
PREDICTION_INPUT_NAMES = [
"past_time_features",
"past_values",
"past_observed_mask",
"future_time_features",
]
if config.num_static_categorical_features > 0:
PREDICTION_INPUT_NAMES.append("static_categorical_features")
if config.num_static_real_features > 0:
PREDICTION_INPUT_NAMES.append("static_real_features")
TRAINING_INPUT_NAMES = PREDICTION_INPUT_NAMES + [
"future_values",
"future_observed_mask",
]
transformation = create_transformation(freq, config)
transformed_data = transformation.apply(data, is_train=True)
if cache_data:
transformed_data = Cached(transformed_data)
# we initialize a Training instance
instance_splitter = create_instance_splitter(config, "train")
# the instance splitter will sample a window of
# context length + lags + prediction length (from the 366 possible transformed time series)
# randomly from within the target time series and return an iterator.
stream = Cyclic(transformed_data).stream()
training_instances = instance_splitter.apply(stream)
return as_stacked_batches(
training_instances,
batch_size=batch_size,
shuffle_buffer_length=shuffle_buffer_length,
field_names=TRAINING_INPUT_NAMES,
output_type=torch.tensor,
num_batches_per_epoch=num_batches_per_epoch,
)
def create_backtest_dataloader(
config: PretrainedConfig,
freq,
data,
batch_size: int,
**kwargs,
):
PREDICTION_INPUT_NAMES = [
"past_time_features",
"past_values",
"past_observed_mask",
"future_time_features",
]
if config.num_static_categorical_features > 0:
PREDICTION_INPUT_NAMES.append("static_categorical_features")
if config.num_static_real_features > 0:
PREDICTION_INPUT_NAMES.append("static_real_features")
transformation = create_transformation(freq, config)
transformed_data = transformation.apply(data)
# we create a Validation Instance splitter which will sample the very last
# context window seen during training only for the encoder.
instance_sampler = create_instance_splitter(config, "validation")
# we apply the transformations in train mode
testing_instances = instance_sampler.apply(transformed_data, is_train=True)
return as_stacked_batches(
testing_instances,
batch_size=batch_size,
output_type=torch.tensor,
field_names=PREDICTION_INPUT_NAMES,
)
def create_test_dataloader(
config: PretrainedConfig,
freq,
data,
batch_size: int,
**kwargs,
):
PREDICTION_INPUT_NAMES = [
"past_time_features",
"past_values",
"past_observed_mask",
"future_time_features",
]
if config.num_static_categorical_features > 0:
PREDICTION_INPUT_NAMES.append("static_categorical_features")
if config.num_static_real_features > 0:
PREDICTION_INPUT_NAMES.append("static_real_features")
transformation = create_transformation(freq, config)
transformed_data = transformation.apply(data, is_train=False)
# We create a test Instance splitter to sample the very last
# context window from the dataset provided.
instance_sampler = create_instance_splitter(config, "test")
# We apply the transformations in test mode
testing_instances = instance_sampler.apply(transformed_data, is_train=False)
return as_stacked_batches(
testing_instances,
batch_size=batch_size,
output_type=torch.tensor,
field_names=PREDICTION_INPUT_NAMES,
)
在 Autoformer 上評估
我們已經在這個資料集上預訓練了一個 Autoformer 模型,所以我們只需獲取模型並在測試集上對其進行評估
from transformers import AutoformerConfig, AutoformerForPrediction
config = AutoformerConfig.from_pretrained("kashif/autoformer-traffic-hourly")
model = AutoformerForPrediction.from_pretrained("kashif/autoformer-traffic-hourly")
test_dataloader = create_backtest_dataloader(
config=config,
freq=freq,
data=test_dataset,
batch_size=64,
)
在推理時,我們將使用模型的 generate()
方法,從訓練集中每個時間序列的最後一個上下文視窗預測未來 prediction_length
步。
from accelerate import Accelerator
accelerator = Accelerator()
device = accelerator.device
model.to(device)
model.eval()
forecasts_ = []
for batch in test_dataloader:
outputs = model.generate(
static_categorical_features=batch["static_categorical_features"].to(device)
if config.num_static_categorical_features > 0
else None,
static_real_features=batch["static_real_features"].to(device)
if config.num_static_real_features > 0
else None,
past_time_features=batch["past_time_features"].to(device),
past_values=batch["past_values"].to(device),
future_time_features=batch["future_time_features"].to(device),
past_observed_mask=batch["past_observed_mask"].to(device),
)
forecasts_.append(outputs.sequences.cpu().numpy())
模型輸出的張量形狀為 (batch_size
, 樣本數
, 預測長度
, 輸入大小
)。
在這種情況下,我們為測試資料載入器批次中的每個時間序列(您還記得上面是 64
)的未來 24
小時獲得 100
個可能的值。
forecasts_[0].shape
>>> (64, 100, 24)
我們將它們垂直堆疊,以獲取測試資料集中所有時間序列的預測結果:測試集中有 7
個滾動視窗,這就是為什麼我們最終得到總計 7 * 862 = 6034
個預測。
import numpy as np
forecasts = np.vstack(forecasts_)
print(forecasts.shape)
>>> (6034, 100, 24)
我們可以根據測試集中存在的樣本外真實值評估由此產生的預測。為此,我們將使用 🤗 Evaluate 庫,其中包括 MASE 指標。
我們計算資料集中每個時間序列的指標並返回平均值
from tqdm.autonotebook import tqdm
from evaluate import load
from gluonts.time_feature import get_seasonality
mase_metric = load("evaluate-metric/mase")
forecast_median = np.median(forecasts, 1)
mase_metrics = []
for item_id, ts in enumerate(tqdm(test_dataset)):
training_data = ts["target"][:-prediction_length]
ground_truth = ts["target"][-prediction_length:]
mase = mase_metric.compute(
predictions=forecast_median[item_id],
references=np.array(ground_truth),
training=np.array(training_data),
periodicity=get_seasonality(freq))
mase_metrics.append(mase["mase"])
因此,Autoformer 模型的結果是
print(f"Autoformer univariate MASE: {np.mean(mase_metrics):.3f}")
>>> Autoformer univariate MASE: 0.910
為了繪製任何時間序列相對於真實測試資料的預測圖,我們定義了以下輔助函式
import matplotlib.dates as mdates
import pandas as pd
test_ds = list(test_dataset)
def plot(ts_index):
fig, ax = plt.subplots()
index = pd.period_range(
start=test_ds[ts_index][FieldName.START],
periods=len(test_ds[ts_index][FieldName.TARGET]),
freq=test_ds[ts_index][FieldName.START].freq,
).to_timestamp()
ax.plot(
index[-5*prediction_length:],
test_ds[ts_index]["target"][-5*prediction_length:],
label="actual",
)
plt.plot(
index[-prediction_length:],
np.median(forecasts[ts_index], axis=0),
label="median",
)
plt.gcf().autofmt_xdate()
plt.legend(loc="best")
plt.show()
例如,對於測試集中索引為 4
的時間序列
plot(4)
在 DLinear 上評估
機率 DLinear 在 gluonts
中實現,因此我們可以在這裡相對快速地對其進行訓練和評估
from gluonts.torch.model.d_linear.estimator import DLinearEstimator
# Define the DLinear model with the same parameters as the Autoformer model
estimator = DLinearEstimator(
prediction_length=dataset.metadata.prediction_length,
context_length=dataset.metadata.prediction_length*2,
scaling=scaling,
hidden_dimension=2,
batch_size=batch_size,
num_batches_per_epoch=num_batches_per_epoch,
trainer_kwargs=dict(max_epochs=epochs)
)
訓練模型
predictor = estimator.train(
training_data=train_dataset,
cache_data=True,
shuffle_buffer_length=1024
)
>>> INFO:pytorch_lightning.callbacks.model_summary:
| Name | Type | Params
---------------------------------------
0 | model | DLinearModel | 4.7 K
---------------------------------------
4.7 K Trainable params
0 Non-trainable params
4.7 K Total params
0.019 Total estimated model params size (MB)
Training: 0it [00:00, ?it/s]
...
INFO:pytorch_lightning.utilities.rank_zero:Epoch 49, global step 5000: 'train_loss' was not in top 1
INFO:pytorch_lightning.utilities.rank_zero:`Trainer.fit` stopped: `max_epochs=50` reached.
並在測試集上對其進行評估
from gluonts.evaluation import make_evaluation_predictions, Evaluator
forecast_it, ts_it = make_evaluation_predictions(
dataset=dataset.test,
predictor=predictor,
)
d_linear_forecasts = list(forecast_it)
d_linear_tss = list(ts_it)
evaluator = Evaluator()
agg_metrics, _ = evaluator(iter(d_linear_tss), iter(d_linear_forecasts))
因此,DLinear 模型的結果是
dlinear_mase = agg_metrics["MASE"]
print(f"DLinear MASE: {dlinear_mase:.3f}")
>>> DLinear MASE: 0.965
和以前一樣,我們透過此輔助函式繪製了我們訓練過的 DLinear 模型的預測圖
def plot_gluonts(index):
plt.plot(d_linear_tss[index][-4 * dataset.metadata.prediction_length:].to_timestamp(), label="target")
d_linear_forecasts[index].plot(show_label=True, color='g')
plt.legend()
plt.gcf().autofmt_xdate()
plt.show()
plot_gluonts(4)
traffic
資料集在工作日和週末之間的感測器模式存在分佈偏移。那麼這裡發生了什麼?由於 DLinear 模型無法整合協變數,特別是任何日期時間特徵,我們給它的上下文視窗沒有足夠的資訊來判斷預測是針對週末還是工作日。因此,模型將預測更常見的模式,即工作日,從而導致週末效能較差。當然,透過提供更大的上下文視窗,線性模型將能夠找出每週模式,但資料中可能存在每月或每季度模式,這將需要越來越大的上下文。
結論
基於 Transformer 的模型與上述線性基線相比如何?我們不同模型的測試集 MASE 指標如下:
資料集 | Transformer(單變數) | Transformer(多變數) | Informer(單變數) | Informer(多變數) | Autoformer(單變數) | DLinear |
---|---|---|---|---|---|---|
交通 |
0.876 | 1.046 | 0.924 | 1.131 | 0.910 | 0.965 |
正如我們所觀察到的,我們去年推出的普通Transformer在這裡獲得了最佳結果。其次,多變數模型通常差於單變數模型,原因在於難以估計跨序列相關性/關係。估計值帶來的額外方差通常會損害最終的預測結果,或者模型會學習到虛假相關性。最近的論文,如CrossFormer(ICLR 23)和CARD,試圖解決Transformer模型中的這一問題。多變數模型通常在大量資料上訓練時表現良好。然而,與單變數模型相比,特別是在較小的開放資料集上,單變數模型往往能提供更好的指標。透過將線性模型與同等大小的單變數Transformer或其他任何神經網路單變數模型進行比較,通常會獲得更好的效能。
總而言之,Transformer 模型在時間序列預測方面絕對遠未過時!然而,大規模資料集的可用性對於最大限度地發揮其潛力至關重要。與計算機視覺和自然語言處理領域不同,時間序列領域缺乏可公開訪問的大規模資料集。大多數現有時間序列預訓練模型都在像 UCR 和 UEA 這樣的檔案中進行小樣本訓練,這些檔案只包含幾千甚至幾百個樣本。儘管這些基準資料集在時間序列社群的進步中發揮了重要作用,但其有限的樣本量和缺乏通用性對深度學習模型的預訓練構成了挑戰。
因此,開發大規模、通用時間序列資料集(如計算機視覺中的 ImageNet)至關重要。建立此類資料集將極大地促進專門為時間序列分析設計的預訓練模型的進一步研究,並提高預訓練模型在時間序列預測中的適用性。
致謝
我們感謝 Lysandre Debut 和 Pedro Cuenca 在本專案中提供的富有洞察力的評論和幫助 ❤️。