訓練你的第一個決策Transformer

釋出於 2022 年 9 月 8 日
在 GitHub 上更新

上一篇文章中,我們宣佈在 transformers 庫中推出決策Transformer。這種**使用 Transformer 作為決策模型**的新技術越來越受歡迎。

所以今天,**你將學習如何從零開始訓練你的第一個離線決策Transformer模型,讓半獵豹奔跑起來。**我們將在 Google Colab 上直接訓練它,你可以在這裡找到它 👉 https://github.com/huggingface/blog/blob/main/notebooks/101_train-decision-transformers.ipynb

*一個“專家”決策Transformer模型,使用離線強化學習在 Gym HalfCheetah 環境中學習。*

聽起來很刺激吧?我們開始吧!

什麼是決策Transformer?

決策Transformer模型由**Chen L. 等人撰寫的《決策Transformer:透過序列建模進行強化學習》**引入。它將強化學習抽象為**條件序列建模問題**。

主要思想是,我們不使用強化學習方法訓練策略,例如擬合一個價值函式來告訴我們採取什麼行動來最大化回報(累積獎勵),而是**使用序列建模演算法(Transformer)**,該演算法在給定期望回報、過去狀態和行動的情況下,將生成未來行動以實現此期望回報。它是一個自迴歸模型,以期望回報、過去狀態和行動為條件,生成實現期望回報的未來行動。

**這是強化學習正規化的徹底轉變**,因為我們使用生成軌跡建模(建模狀態、動作和獎勵序列的聯合分佈)來取代傳統的強化學習演算法。這意味著在決策Transformer中,我們不最大化回報,而是生成一系列未來動作以實現期望回報。

過程如下:

  1. 我們將**最後 K 個時間步**與三個輸入一起送入決策Transformer:
    • 剩餘回報(Return-to-go)
    • 狀態
    • 行動
  2. 如果狀態是向量,**令牌將透過線性層嵌入**;如果是幀,則透過 CNN 編碼器嵌入。
  3. **輸入由 GPT-2 模型處理**,該模型透過自迴歸建模預測未來行動。

https://huggingface.co/blog/assets/58_decision-transformers/dt-architecture.gif

決策Transformer架構。狀態、動作和回報被送入特定模態的線性嵌入層,並添加了位置情景時間步編碼。令牌被送入 GPT 架構,該架構使用因果自注意力掩碼自迴歸地預測動作。圖表來自 [1]。

有不同型別的決策Transformer,但今天,我們將訓練一個離線決策Transformer,這意味著我們只使用從其他代理或人類演示中收集的資料。**代理不與環境互動**。如果你想了解更多關於離線和線上強化學習之間的區別,請檢視這篇文章

現在我們已經理解了離線決策Transformer背後的理論,**讓我們看看如何在實踐中訓練一個。**

訓練決策Transformer

在上一篇文章中,我們演示瞭如何使用 transformers 決策Transformer模型並從 🤗 hub 載入預訓練權重。

在這一部分,我們將使用 🤗 Trainer 和自定義資料整理器從頭開始訓練決策Transformer模型,使用託管在 🤗 hub 上的離線強化學習資料集。你可以在此 Colab notebook中找到本教程的程式碼。

我們將執行離線強化學習,以學習 mujoco halfcheetah 環境中的以下行為。

*一個“專家”決策Transformer模型,使用離線強化學習在 Gym HalfCheetah 環境中學習。*

載入資料集和構建自定義資料整理器

我們在 hub 上託管了許多離線強化學習資料集。今天我們將使用 hub 上託管的 halfcheetah “專家”資料集進行訓練。

首先,我們需要從 🤗 datasets 包中匯入 `load_dataset` 函式,並將資料集下載到我們的機器上。

from datasets import load_dataset
dataset = load_dataset("edbeeching/decision_transformer_gym_replay", "halfcheetah-expert-v2")

雖然 hub 上的大多數資料集都可以直接使用,但有時我們希望對資料集進行一些額外的處理或修改。在這種情況下,我們希望與作者的實現相匹配,即我們需要:

  • 透過減去均值併除以標準差來歸一化每個特徵。
  • 為每個軌跡預計算折扣回報。
  • 將獎勵和回報按 1000 的係數進行縮放。
  • 擴充資料集取樣分佈,使其考慮專家代理軌跡的長度。

為了執行此資料集預處理,我們將使用自定義 🤗 Data Collator

現在,讓我們開始為離線強化學習構建自定義資料整理器。

@dataclass
class DecisionTransformerGymDataCollator:
    return_tensors: str = "pt"
    max_len: int = 20 #subsets of the episode we use for training
    state_dim: int = 17  # size of state space
    act_dim: int = 6  # size of action space
    max_ep_len: int = 1000 # max episode length in the dataset
    scale: float = 1000.0  # normalization of rewards/returns
    state_mean: np.array = None  # to store state means
    state_std: np.array = None  # to store state stds
    p_sample: np.array = None  # a distribution to take account trajectory lengths
    n_traj: int = 0 # to store the number of trajectories in the dataset

    def __init__(self, dataset) -> None:
        self.act_dim = len(dataset[0]["actions"][0])
        self.state_dim = len(dataset[0]["observations"][0])
        self.dataset = dataset
        # calculate dataset stats for normalization of states
        states = []
        traj_lens = []
        for obs in dataset["observations"]:
            states.extend(obs)
            traj_lens.append(len(obs))
        self.n_traj = len(traj_lens)
        states = np.vstack(states)
        self.state_mean, self.state_std = np.mean(states, axis=0), np.std(states, axis=0) + 1e-6
        
        traj_lens = np.array(traj_lens)
        self.p_sample = traj_lens / sum(traj_lens)

    def _discount_cumsum(self, x, gamma):
        discount_cumsum = np.zeros_like(x)
        discount_cumsum[-1] = x[-1]
        for t in reversed(range(x.shape[0] - 1)):
            discount_cumsum[t] = x[t] + gamma * discount_cumsum[t + 1]
        return discount_cumsum

    def __call__(self, features):
        batch_size = len(features)
        # this is a bit of a hack to be able to sample of a non-uniform distribution
        batch_inds = np.random.choice(
            np.arange(self.n_traj),
            size=batch_size,
            replace=True,
            p=self.p_sample,  # reweights so we sample according to timesteps
        )
        # a batch of dataset features
        s, a, r, d, rtg, timesteps, mask = [], [], [], [], [], [], []
        
        for ind in batch_inds:
            # for feature in features:
            feature = self.dataset[int(ind)]
            si = random.randint(0, len(feature["rewards"]) - 1)

            # get sequences from dataset
            s.append(np.array(feature["observations"][si : si + self.max_len]).reshape(1, -1, self.state_dim))
            a.append(np.array(feature["actions"][si : si + self.max_len]).reshape(1, -1, self.act_dim))
            r.append(np.array(feature["rewards"][si : si + self.max_len]).reshape(1, -1, 1))

            d.append(np.array(feature["dones"][si : si + self.max_len]).reshape(1, -1))
            timesteps.append(np.arange(si, si + s[-1].shape[1]).reshape(1, -1))
            timesteps[-1][timesteps[-1] >= self.max_ep_len] = self.max_ep_len - 1  # padding cutoff
            rtg.append(
                self._discount_cumsum(np.array(feature["rewards"][si:]), gamma=1.0)[
                    : s[-1].shape[1]   # TODO check the +1 removed here
                ].reshape(1, -1, 1)
            )
            if rtg[-1].shape[1] < s[-1].shape[1]:
                print("if true")
                rtg[-1] = np.concatenate([rtg[-1], np.zeros((1, 1, 1))], axis=1)

            # padding and state + reward normalization
            tlen = s[-1].shape[1]
            s[-1] = np.concatenate([np.zeros((1, self.max_len - tlen, self.state_dim)), s[-1]], axis=1)
            s[-1] = (s[-1] - self.state_mean) / self.state_std
            a[-1] = np.concatenate(
                [np.ones((1, self.max_len - tlen, self.act_dim)) * -10.0, a[-1]],
                axis=1,
            )
            r[-1] = np.concatenate([np.zeros((1, self.max_len - tlen, 1)), r[-1]], axis=1)
            d[-1] = np.concatenate([np.ones((1, self.max_len - tlen)) * 2, d[-1]], axis=1)
            rtg[-1] = np.concatenate([np.zeros((1, self.max_len - tlen, 1)), rtg[-1]], axis=1) / self.scale
            timesteps[-1] = np.concatenate([np.zeros((1, self.max_len - tlen)), timesteps[-1]], axis=1)
            mask.append(np.concatenate([np.zeros((1, self.max_len - tlen)), np.ones((1, tlen))], axis=1))

        s = torch.from_numpy(np.concatenate(s, axis=0)).float()
        a = torch.from_numpy(np.concatenate(a, axis=0)).float()
        r = torch.from_numpy(np.concatenate(r, axis=0)).float()
        d = torch.from_numpy(np.concatenate(d, axis=0))
        rtg = torch.from_numpy(np.concatenate(rtg, axis=0)).float()
        timesteps = torch.from_numpy(np.concatenate(timesteps, axis=0)).long()
        mask = torch.from_numpy(np.concatenate(mask, axis=0)).float()

        return {
            "states": s,
            "actions": a,
            "rewards": r,
            "returns_to_go": rtg,
            "timesteps": timesteps,
            "attention_mask": mask,
        }

程式碼很多,簡單來說,我們定義了一個類,它接收我們的資料集,執行所需的預處理,並返回給我們批次的**狀態**、**動作**、**獎勵**、**回報**、**時間步**和**掩碼**。這些批次可以直接用於使用 🤗 transformers Trainer 訓練決策Transformer模型。

使用 🤗 transformers Trainer 訓練決策Transformer模型。

為了使用 🤗 Trainer 類訓練模型,我們首先需要確保它返回的字典包含損失,在本例中是模型動作預測和目標之間的 L-2 範數。我們透過建立一個繼承自決策Transformer模型的 TrainableDT 類來實現這一點。

class TrainableDT(DecisionTransformerModel):
    def __init__(self, config):
        super().__init__(config)

    def forward(self, **kwargs):
        output = super().forward(**kwargs)
        # add the DT loss
        action_preds = output[1]
        action_targets = kwargs["actions"]
        attention_mask = kwargs["attention_mask"]
        act_dim = action_preds.shape[2]
        action_preds = action_preds.reshape(-1, act_dim)[attention_mask.reshape(-1) > 0]
        action_targets = action_targets.reshape(-1, act_dim)[attention_mask.reshape(-1) > 0]
        
        loss = torch.mean((action_preds - action_targets) ** 2)

        return {"loss": loss}

    def original_forward(self, **kwargs):
        return super().forward(**kwargs)

transformers Trainer 類需要許多引數,這些引數在 TrainingArguments 類中定義。我們使用與作者原始實現相同的超引數,但訓練迭代次數較少。這在 Colab notebook 中訓練大約需要 40 分鐘,所以你可以泡杯咖啡或者閱讀 🤗 Annotated Diffusion 部落格文章,等待期間。作者訓練了大約 3 小時,所以我們這裡得到的結果不會像他們的那麼好。

training_args = TrainingArguments(
    output_dir="output/",
    remove_unused_columns=False,
    num_train_epochs=120,
    per_device_train_batch_size=64,
    learning_rate=1e-4,
    weight_decay=1e-4,
    warmup_ratio=0.1,
    optim="adamw_torch",
    max_grad_norm=0.25,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset["train"],
    data_collator=collator,
)

trainer.train()

現在我們已經解釋了決策Transformer的理論,Trainer,以及如何訓練它。**你已經準備好從零開始訓練你的第一個離線決策Transformer模型,讓半獵豹奔跑起來** 👉 https://github.com/huggingface/blog/blob/main/notebooks/101_train-decision-transformers.ipynb Colab 中包含了訓練模型的視覺化,以及如何將模型儲存到 🤗 hub。

結論

這篇文章展示瞭如何在託管於 🤗 資料集上的離線強化學習資料集上訓練決策Transformer。我們使用了 🤗 transformers Trainer 和自定義資料整理器。

除了決策Transformer,**我們還希望支援深度強化學習社群的更多用例和工具**。因此,我們非常期待聽到您對決策Transformer模型的反饋,以及更普遍地,我們可以與您一起構建的任何對強化學習有用的工具。請隨時**與我們聯絡**。

接下來是什麼?

在接下來的幾周和幾個月裡,**我們計劃支援生態系統中的其他工具**

  • 擴充套件我們的決策Transformer模型庫,包括線上設定中訓練或微調的模型 [2]
  • 整合 sample-factory 版本 2.0

保持聯絡的最佳方式是**加入我們的 discord 伺服器**,與我們和社群交流。

參考文獻

[1] Chen, Lili, et al. "決策Transformer:透過序列建模進行強化學習。" 神經資訊處理系統進展 34 (2021)。

[2] Zheng, Qinqing 和 Zhang, Amy 和 Grover, Aditya “線上決策Transformer” (arXiv 預印本, 2022)

社群

幾條評論/問題

  • `print("if true")` 語句可能是遺留程式碼
  • 從整理器返回的許多資料未使用,為什麼保留它?

有什麼辦法可以將此環境遷移到mujoco v4嗎?我在讓gym/gymnasium與mujoco或mujoco_py良好配合方面遇到困難。即使我正確安裝了它,它也一直說模組未找到。

註冊登入發表評論

© . This site is unofficial and not affiliated with Hugging Face, Inc.