深度強化學習課程文件

實戰

Hugging Face's logo
加入 Hugging Face 社群

並獲得增強的文件體驗

開始使用

實戰

Ask a Question Open In Colab

現在我們已經學習了 Reinforce 的理論,你準備好用 PyTorch 編寫你的 Reinforce 代理了。你將使用 CartPole-v1 和 PixelCopter 測試它的魯棒性。

然後你將能夠迭代和改進這個實現,以適應更高階的環境。

Environments

要驗證此實戰是否符合認證流程,你需要將訓練好的模型上傳到 Hub,並且:

  • Cartpole-v1 中獲得 >= 350 的結果
  • PixelCopter 中獲得 >= 5 的結果。

要檢視你的結果,請前往排行榜並找到你的模型,結果 = 平均獎勵 - 獎勵標準差如果你在排行榜上沒有看到你的模型,請前往排行榜頁面底部並點選重新整理按鈕

如果你沒有找到你的模型,請滾動到頁面底部並點選重新整理按鈕。

有關認證流程的更多資訊,請檢視此部分 👉 https://huggingface.co/deep-rl-course/en/unit0/introduction#certification-process

你可以在這裡檢視你的進度 👉 https://huggingface.co/spaces/ThomasSimonini/Check-my-progress-Deep-RL-Course

要開始實戰,請點選“在 Colab 中開啟”按鈕 👇

Open In Colab

我們強烈**建議學生使用 Google Colab 進行實踐練習**,而不是在個人電腦上執行。

使用 Google Colab,**您可以專注於學習和實驗,而無需擔心環境設定的技術問題。**

單元 4:使用 PyTorch 編寫你的第一個深度強化學習演算法:Reinforce。並測試其魯棒性 💪

thumbnail

在本筆記中,你將從頭開始編寫你的第一個深度強化學習演算法:Reinforce(也稱為蒙特卡洛策略梯度)。

Reinforce 是一種**基於策略的方法**:一種深度強化學習演算法,它嘗試**直接最佳化策略而無需使用行動值函式**。

更精確地說,Reinforce 是一種**策略梯度方法**,是**基於策略的方法**的一個子類,旨在**透過使用梯度上升估計最優策略的權重來直接最佳化策略**。

為了測試其魯棒性,我們將在兩個不同的簡單環境中對其進行訓練:

  • Cartpole-v1
  • PixelcopterEnv

⬇️ 這是您在本筆記本結束時將實現的效果示例。⬇️

Environments

🎮 環境:

📚 RL 庫:

  • Python
  • PyTorch

我們正在不斷努力改進我們的教程,因此,**如果您在本筆記本中發現任何問題**,請在 GitHub 倉庫上提出問題

本筆記本的目標 🏆

在本筆記本結束時,您將:

  • 能夠**使用 PyTorch 從頭開始編寫 Reinforce 演算法。**
  • 能夠**使用簡單環境測試你的代理的魯棒性。**
  • 能夠**將你訓練好的代理與精彩的影片回放和評估分數一起推送到 Hub** 🔥。

先決條件 🏗️

在深入學習本筆記本之前,您需要:

🔲 📚 透過閱讀第 4 單元學習策略梯度

讓我們從頭開始編寫 Reinforce 演算法 🔥

一些建議 💡

最好將此 Colab 執行在你的 Google Drive 副本中,這樣**即使超時**,你的 Google Drive 上仍然儲存了筆記本,無需從頭開始填寫所有內容。

要做到這一點,你可以按 Ctrl + S 或選擇 檔案 > 在 Google Drive 中儲存副本

設定 GPU 💪

  • 為了加速智慧體的訓練,我們將使用 GPU。為此,請轉到 Runtime > Change Runtime type
GPU Step 1
  • 硬體加速器 > GPU
GPU Step 2

建立虛擬顯示器 🖥

在筆記本中,我們需要生成一個重播影片。為此,在 Colab 中,**我們需要一個虛擬螢幕才能渲染環境**(從而錄製幀)。

以下單元格將安裝庫並建立和執行虛擬螢幕 🖥

%%capture
!apt install python-opengl
!apt install ffmpeg
!apt install xvfb
!pip install pyvirtualdisplay
!pip install pyglet==1.5.1
# Virtual display
from pyvirtualdisplay import Display

virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()

安裝依賴項 🔽

第一步是安裝依賴項。我們將安裝多個依賴項:

  • gym
  • gym-games:使用 PyGame 製作的額外 gym 環境。
  • huggingface_hub:Hub 作為一箇中心平臺,任何人都可以共享和探索模型和資料集。它具有版本控制、度量、視覺化和其他功能,可以讓你輕鬆地與他人協作。

你可能想知道為什麼我們安裝的是 gym 而不是它的更新版本 gymnasium?**因為我們正在使用的 gym-games 尚未更新到 gymnasium**。

您將在此處遇到的差異:

  • gym 中,我們沒有 terminatedtruncated,只有 done
  • gym 中,使用 env.step() 返回 state, reward, done, info

您可以在此處瞭解更多關於 Gym 和 Gymnasium 之間的差異 👉 https://gymnasium.llms.tw/content/migration-guide/

你可以在這裡看到所有可用的 Reinforce 模型 👉 https://huggingface.co/models?other=reinforce

您可以在這裡找到所有深度強化學習模型 👉 https://huggingface.co/models?pipeline_tag=reinforcement-learning

!pip install -r https://raw.githubusercontent.com/huggingface/deep-rl-class/main/notebooks/unit4/requirements-unit4.txt

匯入包 📦

除了匯入已安裝的庫,我們還匯入:

  • imageio:一個幫助我們生成重放影片的庫。
import numpy as np

from collections import deque

import matplotlib.pyplot as plt
%matplotlib inline

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

# Gym
import gym
import gym_pygame

# Hugging Face Hub
from huggingface_hub import notebook_login # To log to our Hugging Face account to be able to upload models to the Hub.
import imageio

檢查我們是否有 GPU

  • 讓我們檢查一下我們是否有 GPU。
  • 如果是這樣,您應該會看到 device:cuda0
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

我們現在準備實現我們的 Reinforce 演算法 🔥

第一個代理:玩 CartPole-v1 🤖

建立 CartPole 環境並瞭解其工作原理

環境 🎮

為什麼我們使用 CartPole-v1 這樣的簡單環境?

正如強化學習技巧和訣竅中所解釋的,當你從頭開始實現你的代理時,你需要**確保它在簡單環境中正確工作並找到錯誤,然後再深入**,因為在簡單環境中找到錯誤會容易得多。

嘗試在玩具問題上獲得一些“生命跡象”。

透過使其在越來越困難的環境中執行來驗證實現(您可以將結果與 RL zoo 進行比較)。通常,您需要為該步驟執行超引數最佳化。

CartPole-v1 環境

一根杆子透過一個未驅動的關節連線到一輛小車上,小車在無摩擦的軌道上移動。擺錘垂直放置在小車上,目標是透過在小車上施加向左和向右的力來平衡杆子。

所以,我們從 CartPole-v1 開始。目標是向左或向右推動推車,**使杆子保持平衡。**

如果發生以下情況,回合結束:

  • 杆子角度大於 ±12°
  • 推車位置大於 ±2.4
  • 回合長度大於 500

杆子每保持平衡一個時間步,我們就會獲得 💰 +1 的獎勵。

env_id = "CartPole-v1"
# Create the env
env = gym.make(env_id)

# Create the evaluation env
eval_env = gym.make(env_id)

# Get the state space and action space
s_size = env.observation_space.shape[0]
a_size = env.action_space.n
print("_____OBSERVATION SPACE_____ \n")
print("The State Space is: ", s_size)
print("Sample observation", env.observation_space.sample())  # Get a random observation
print("\n _____ACTION SPACE_____ \n")
print("The Action Space is: ", a_size)
print("Action Space Sample", env.action_space.sample())  # Take a random action

讓我們構建 Reinforce 架構

此實現基於三個實現:

Reinforce

所以我們希望:

  • 兩個全連線層(fc1 和 fc2)。
  • 將 ReLU 用作 fc1 的啟用函式
  • 使用 Softmax 輸出動作的機率分佈
class Policy(nn.Module):
    def __init__(self, s_size, a_size, h_size):
        super(Policy, self).__init__()
        # Create two fully connected layers



    def forward(self, x):
        # Define the forward pass
        # state goes to fc1 then we apply ReLU activation function

        # fc1 outputs goes to fc2

        # We output the softmax

    def act(self, state):
        """
        Given a state, take action
        """
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        probs = self.forward(state).cpu()
        m = Categorical(probs)
        action = np.argmax(m)
        return action.item(), m.log_prob(action)

解決方案

class Policy(nn.Module):
    def __init__(self, s_size, a_size, h_size):
        super(Policy, self).__init__()
        self.fc1 = nn.Linear(s_size, h_size)
        self.fc2 = nn.Linear(h_size, a_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.softmax(x, dim=1)

    def act(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        probs = self.forward(state).cpu()
        m = Categorical(probs)
        action = np.argmax(m)
        return action.item(), m.log_prob(action)

我犯了一個錯誤,你能猜到在哪裡嗎?

  • 要找到答案,讓我們進行一次前向傳播。
debug_policy = Policy(s_size, a_size, 64).to(device)
debug_policy.act(env.reset())
  • 這裡我們看到錯誤提示 ValueError: The value argument to log_prob must be a Tensor

  • 這意味著 m.log_prob(action) 中的 action 必須是一個 Tensor,**但它不是。**

  • 你知道為什麼嗎?檢查一下 act 函式,看看它為什麼不起作用。

建議 💡:此實現中存在問題。請記住,對於 act 函式,**我們希望從動作的機率分佈中取樣一個動作**。

(真正的)解決方案

class Policy(nn.Module):
    def __init__(self, s_size, a_size, h_size):
        super(Policy, self).__init__()
        self.fc1 = nn.Linear(s_size, h_size)
        self.fc2 = nn.Linear(h_size, a_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.softmax(x, dim=1)

    def act(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        probs = self.forward(state).cpu()
        m = Categorical(probs)
        action = m.sample()
        return action.item(), m.log_prob(action)

透過使用 CartPole,除錯更容易,因為**我們知道錯誤來自我們的整合,而不是我們的簡單環境**。

  • 由於**我們希望從動作的機率分佈中取樣一個動作**,因此我們不能使用 action = np.argmax(m),因為它總是輸出機率最高的動作。

  • 我們需要將其替換為 action = m.sample(),這將從機率分佈 P(.|s) 中取樣一個動作。

讓我們構建 Reinforce 訓練演算法

這是 Reinforce 演算法的虛擬碼

Policy gradient pseudocode
  • 當我們計算返回 Gt(第 6 行)時,我們看到我們計算的是**從時間步 t 開始**的折扣獎勵之和。

  • 為什麼?因為我們的策略應該只**根據結果來強化動作**:所以動作之前獲得的獎勵是無用的(因為它們不是由動作引起的),**只有動作之後獲得的獎勵才重要**。

  • 在編寫程式碼之前,您應該閱讀此部分 不要讓過去分散您的注意力,它解釋了為什麼我們使用“未來獎勵策略梯度”。

我們使用 Chris1nexus 編寫的有趣技術來**高效地計算每個時間步的返回**。註釋解釋了該過程。也請不要猶豫 檢視 PR 解釋。但總的來說,其思想是**高效地計算每個時間步的返回**。

您可能會問的第二個問題是**我們為什麼要最小化損失**?我們之前不是在談論梯度上升而不是梯度下降嗎?

  • 我們希望最大化我們的效用函式 $J(\theta)$,但在 PyTorch 和 TensorFlow 中,最好**最小化目標函式。**
    • 假設我們想在某個時間步強化動作 3。在訓練之前,此動作 P 為 0.25。
    • 所以我們想要修改thetatheta 這樣πθ(a3s;θ)>0.25\pi_\theta(a_3|s; \theta) > 0.25
    • 由於所有 P 的和必須為 1,最大化piθ(a3s;θ)pi_\theta(a_3|s; \theta)將**最小化其他動作的機率。**
    • 所以我們應該告訴 PyTorch **最小化1πθ(a3s;θ)1 - \pi_\theta(a_3|s; \theta)
    • 當損失函式趨近於 0 時,πθ(a3s;θ)\pi_\theta(a_3|s; \theta)接近 1。
    • 所以我們鼓勵梯度最大化πθ(a3s;θ)\pi_\theta(a_3|s; \theta)
def reinforce(policy, optimizer, n_training_episodes, max_t, gamma, print_every):
    # Help us to calculate the score during the training
    scores_deque = deque(maxlen=100)
    scores = []
    # Line 3 of pseudocode
    for i_episode in range(1, n_training_episodes+1):
        saved_log_probs = []
        rewards = []
        state = # TODO: reset the environment
        # Line 4 of pseudocode
        for t in range(max_t):
            action, log_prob = # TODO get the action
            saved_log_probs.append(log_prob)
            state, reward, done, _ = # TODO: take an env step
            rewards.append(reward)
            if done:
                break
        scores_deque.append(sum(rewards))
        scores.append(sum(rewards))

        # Line 6 of pseudocode: calculate the return
        returns = deque(maxlen=max_t)
        n_steps = len(rewards)
        # Compute the discounted returns at each timestep,
        # as the sum of the gamma-discounted return at time t (G_t) + the reward at time t

        # In O(N) time, where N is the number of time steps
        # (this definition of the discounted return G_t follows the definition of this quantity
        # shown at page 44 of Sutton&Barto 2017 2nd draft)
        # G_t = r_(t+1) + r_(t+2) + ...

        # Given this formulation, the returns at each timestep t can be computed
        # by re-using the computed future returns G_(t+1) to compute the current return G_t
        # G_t = r_(t+1) + gamma*G_(t+1)
        # G_(t-1) = r_t + gamma* G_t
        # (this follows a dynamic programming approach, with which we memorize solutions in order
        # to avoid computing them multiple times)

        # This is correct since the above is equivalent to (see also page 46 of Sutton&Barto 2017 2nd draft)
        # G_(t-1) = r_t + gamma*r_(t+1) + gamma*gamma*r_(t+2) + ...


        ## Given the above, we calculate the returns at timestep t as:
        #               gamma[t] * return[t] + reward[t]
        #
        ## We compute this starting from the last timestep to the first, in order
        ## to employ the formula presented above and avoid redundant computations that would be needed
        ## if we were to do it from first to last.

        ## Hence, the queue "returns" will hold the returns in chronological order, from t=0 to t=n_steps
        ## thanks to the appendleft() function which allows to append to the position 0 in constant time O(1)
        ## a normal python list would instead require O(N) to do this.
        for t in range(n_steps)[::-1]:
            disc_return_t = (returns[0] if len(returns)>0 else 0)
            returns.appendleft(    ) # TODO: complete here

        ## standardization of the returns is employed to make training more stable
        eps = np.finfo(np.float32).eps.item()

        ## eps is the smallest representable float, which is
        # added to the standard deviation of the returns to avoid numerical instabilities
        returns = torch.tensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + eps)

        # Line 7:
        policy_loss = []
        for log_prob, disc_return in zip(saved_log_probs, returns):
            policy_loss.append(-log_prob * disc_return)
        policy_loss = torch.cat(policy_loss).sum()

        # Line 8: PyTorch prefers gradient descent
        optimizer.zero_grad()
        policy_loss.backward()
        optimizer.step()

        if i_episode % print_every == 0:
            print('Episode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_deque)))

    return scores

解決方案

def reinforce(policy, optimizer, n_training_episodes, max_t, gamma, print_every):
    # Help us to calculate the score during the training
    scores_deque = deque(maxlen=100)
    scores = []
    # Line 3 of pseudocode
    for i_episode in range(1, n_training_episodes + 1):
        saved_log_probs = []
        rewards = []
        state = env.reset()
        # Line 4 of pseudocode
        for t in range(max_t):
            action, log_prob = policy.act(state)
            saved_log_probs.append(log_prob)
            state, reward, done, _ = env.step(action)
            rewards.append(reward)
            if done:
                break
        scores_deque.append(sum(rewards))
        scores.append(sum(rewards))

        # Line 6 of pseudocode: calculate the return
        returns = deque(maxlen=max_t)
        n_steps = len(rewards)
        # Compute the discounted returns at each timestep,
        # as
        #      the sum of the gamma-discounted return at time t (G_t) + the reward at time t
        #
        # In O(N) time, where N is the number of time steps
        # (this definition of the discounted return G_t follows the definition of this quantity
        # shown at page 44 of Sutton&Barto 2017 2nd draft)
        # G_t = r_(t+1) + r_(t+2) + ...

        # Given this formulation, the returns at each timestep t can be computed
        # by re-using the computed future returns G_(t+1) to compute the current return G_t
        # G_t = r_(t+1) + gamma*G_(t+1)
        # G_(t-1) = r_t + gamma* G_t
        # (this follows a dynamic programming approach, with which we memorize solutions in order
        # to avoid computing them multiple times)

        # This is correct since the above is equivalent to (see also page 46 of Sutton&Barto 2017 2nd draft)
        # G_(t-1) = r_t + gamma*r_(t+1) + gamma*gamma*r_(t+2) + ...

        ## Given the above, we calculate the returns at timestep t as:
        #               gamma[t] * return[t] + reward[t]
        #
        ## We compute this starting from the last timestep to the first, in order
        ## to employ the formula presented above and avoid redundant computations that would be needed
        ## if we were to do it from first to last.

        ## Hence, the queue "returns" will hold the returns in chronological order, from t=0 to t=n_steps
        ## thanks to the appendleft() function which allows to append to the position 0 in constant time O(1)
        ## a normal python list would instead require O(N) to do this.
        for t in range(n_steps)[::-1]:
            disc_return_t = returns[0] if len(returns) > 0 else 0
            returns.appendleft(gamma * disc_return_t + rewards[t])

        ## standardization of the returns is employed to make training more stable
        eps = np.finfo(np.float32).eps.item()
        ## eps is the smallest representable float, which is
        # added to the standard deviation of the returns to avoid numerical instabilities
        returns = torch.tensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + eps)

        # Line 7:
        policy_loss = []
        for log_prob, disc_return in zip(saved_log_probs, returns):
            policy_loss.append(-log_prob * disc_return)
        policy_loss = torch.cat(policy_loss).sum()

        # Line 8: PyTorch prefers gradient descent
        optimizer.zero_grad()
        policy_loss.backward()
        optimizer.step()

        if i_episode % print_every == 0:
            print("Episode {}\tAverage Score: {:.2f}".format(i_episode, np.mean(scores_deque)))

    return scores

訓練它

  • 我們現在準備訓練我們的代理。
  • 但首先,我們定義一個包含所有訓練超引數的變數。
  • 您可以更改訓練引數(並且應該更改 😉)。
cartpole_hyperparameters = {
    "h_size": 16,
    "n_training_episodes": 1000,
    "n_evaluation_episodes": 10,
    "max_t": 1000,
    "gamma": 1.0,
    "lr": 1e-2,
    "env_id": env_id,
    "state_space": s_size,
    "action_space": a_size,
}
# Create policy and place it to the device
cartpole_policy = Policy(
    cartpole_hyperparameters["state_space"],
    cartpole_hyperparameters["action_space"],
    cartpole_hyperparameters["h_size"],
).to(device)
cartpole_optimizer = optim.Adam(cartpole_policy.parameters(), lr=cartpole_hyperparameters["lr"])
scores = reinforce(
    cartpole_policy,
    cartpole_optimizer,
    cartpole_hyperparameters["n_training_episodes"],
    cartpole_hyperparameters["max_t"],
    cartpole_hyperparameters["gamma"],
    100,
)

定義評估方法 📝

  • 這裡我們定義了我們將用於測試 Reinforce 代理的評估方法。
def evaluate_agent(env, max_steps, n_eval_episodes, policy):
    """
    Evaluate the agent for ``n_eval_episodes`` episodes and returns average reward and std of reward.
    :param env: The evaluation environment
    :param n_eval_episodes: Number of episode to evaluate the agent
    :param policy: The Reinforce agent
    """
    episode_rewards = []
    for episode in range(n_eval_episodes):
        state = env.reset()
        step = 0
        done = False
        total_rewards_ep = 0

        for step in range(max_steps):
            action, _ = policy.act(state)
            new_state, reward, done, info = env.step(action)
            total_rewards_ep += reward

            if done:
                break
            state = new_state
        episode_rewards.append(total_rewards_ep)
    mean_reward = np.mean(episode_rewards)
    std_reward = np.std(episode_rewards)

    return mean_reward, std_reward

評估我們的代理 📈

evaluate_agent(
    eval_env, cartpole_hyperparameters["max_t"], cartpole_hyperparameters["n_evaluation_episodes"], cartpole_policy
)

在 Hub 上釋出我們訓練好的模型 🔥

既然我們已經看到訓練後取得了良好的結果,我們可以用一行程式碼將我們訓練好的模型釋出到 hub 🤗。

這是一個模型卡片的例子。

推送到 Hub

請勿修改此程式碼

from huggingface_hub import HfApi, snapshot_download
from huggingface_hub.repocard import metadata_eval_result, metadata_save

from pathlib import Path
import datetime
import json
import imageio

import tempfile

import os
def record_video(env, policy, out_directory, fps=30):
    """
    Generate a replay video of the agent
    :param env
    :param Qtable: Qtable of our agent
    :param out_directory
    :param fps: how many frame per seconds (with taxi-v3 and frozenlake-v1 we use 1)
    """
    images = []
    done = False
    state = env.reset()
    img = env.render(mode="rgb_array")
    images.append(img)
    while not done:
        # Take the action (index) that have the maximum expected future reward given that state
        action, _ = policy.act(state)
        state, reward, done, info = env.step(action)  # We directly put next_state = state for recording logic
        img = env.render(mode="rgb_array")
        images.append(img)
    imageio.mimsave(out_directory, [np.array(img) for i, img in enumerate(images)], fps=fps)
def push_to_hub(repo_id,
                model,
                hyperparameters,
                eval_env,
                video_fps=30
                ):
  """
  Evaluate, Generate a video and Upload a model to Hugging Face Hub.
  This method does the complete pipeline:
  - It evaluates the model
  - It generates the model card
  - It generates a replay video of the agent
  - It pushes everything to the Hub

  :param repo_id: repo_id: id of the model repository from the Hugging Face Hub
  :param model: the pytorch model we want to save
  :param hyperparameters: training hyperparameters
  :param eval_env: evaluation environment
  :param video_fps: how many frame per seconds to record our video replay
  """

  _, repo_name = repo_id.split("/")
  api = HfApi()

  # Step 1: Create the repo
  repo_url = api.create_repo(
        repo_id=repo_id,
        exist_ok=True,
  )

  with tempfile.TemporaryDirectory() as tmpdirname:
    local_directory = Path(tmpdirname)

    # Step 2: Save the model
    torch.save(model, local_directory / "model.pt")

    # Step 3: Save the hyperparameters to JSON
    with open(local_directory / "hyperparameters.json", "w") as outfile:
      json.dump(hyperparameters, outfile)

    # Step 4: Evaluate the model and build JSON
    mean_reward, std_reward = evaluate_agent(eval_env,
                                            hyperparameters["max_t"],
                                            hyperparameters["n_evaluation_episodes"],
                                            model)
    # Get datetime
    eval_datetime = datetime.datetime.now()
    eval_form_datetime = eval_datetime.isoformat()

    evaluate_data = {
          "env_id": hyperparameters["env_id"],
          "mean_reward": mean_reward,
          "n_evaluation_episodes": hyperparameters["n_evaluation_episodes"],
          "eval_datetime": eval_form_datetime,
    }

    # Write a JSON file
    with open(local_directory / "results.json", "w") as outfile:
        json.dump(evaluate_data, outfile)

    # Step 5: Create the model card
    env_name = hyperparameters["env_id"]

    metadata = {}
    metadata["tags"] = [
          env_name,
          "reinforce",
          "reinforcement-learning",
          "custom-implementation",
          "deep-rl-class"
      ]

    # Add metrics
    eval = metadata_eval_result(
        model_pretty_name=repo_name,
        task_pretty_name="reinforcement-learning",
        task_id="reinforcement-learning",
        metrics_pretty_name="mean_reward",
        metrics_id="mean_reward",
        metrics_value=f"{mean_reward:.2f} +/- {std_reward:.2f}",
        dataset_pretty_name=env_name,
        dataset_id=env_name,
      )

    # Merges both dictionaries
    metadata = {**metadata, **eval}

    model_card = f"""
  # **Reinforce** Agent playing **{env_id}**
  This is a trained model of a **Reinforce** agent playing **{env_id}** .
  To learn to use this model and train yours check Unit 4 of the Deep Reinforcement Learning Course: https://huggingface.co/deep-rl-course/unit4/introduction
  """

    readme_path = local_directory / "README.md"
    readme = ""
    if readme_path.exists():
        with readme_path.open("r", encoding="utf8") as f:
          readme = f.read()
    else:
      readme = model_card

    with readme_path.open("w", encoding="utf-8") as f:
      f.write(readme)

    # Save our metrics to Readme metadata
    metadata_save(readme_path, metadata)

    # Step 6: Record a video
    video_path =  local_directory / "replay.mp4"
    record_video(env, model, video_path, video_fps)

    # Step 7. Push everything to the Hub
    api.upload_folder(
          repo_id=repo_id,
          folder_path=local_directory,
          path_in_repo=".",
    )

    print(f"Your model is pushed to the Hub. You can view your model here: {repo_url}")

透過使用 push_to_hub,**您將評估、錄製回放、生成代理的模型卡,並將其推送到 Hub**。

透過這種方式

為了能夠與社群分享你的模型,還需要完成三個步驟

1️⃣ (如果尚未完成)建立 HF 帳戶 ➡ https://huggingface.co/join

2️⃣ 登入後,你需要從 Hugging Face 網站儲存你的身份驗證令牌。

Create HF Token
notebook_login()

如果您不想使用 Google Colab 或 Jupyter Notebook,則需要使用此命令代替:huggingface-cli login(或 login

3️⃣ 我們現在準備使用 package_to_hub() 函式將我們訓練好的代理推送到 🤗 Hub 🔥

repo_id = ""  # TODO Define your repo id {username/Reinforce-{model-id}}
push_to_hub(
    repo_id,
    cartpole_policy,  # The model we want to save
    cartpole_hyperparameters,  # Hyperparameters
    eval_env,  # Evaluation environment
    video_fps=30
)

既然我們已經測試了我們實現的魯棒性,那麼讓我們嘗試一個更復雜的環境:PixelCopter 🚁

第二個代理:PixelCopter 🚁

研究 PixelCopter 環境 👀

env_id = "Pixelcopter-PLE-v0"
env = gym.make(env_id)
eval_env = gym.make(env_id)
s_size = env.observation_space.shape[0]
a_size = env.action_space.n
print("_____OBSERVATION SPACE_____ \n")
print("The State Space is: ", s_size)
print("Sample observation", env.observation_space.sample())  # Get a random observation
print("\n _____ACTION SPACE_____ \n")
print("The Action Space is: ", a_size)
print("Action Space Sample", env.action_space.sample())  # Take a random action

觀察空間 (7) 👀

  • 玩家 y 座標
  • 玩家速度
  • 玩家與地板的距離
  • 玩家與天花板的距離
  • 下一個方塊與玩家的 x 距離
  • 下一個方塊的頂部 y 座標
  • 下一個方塊的底部 y 座標

動作空間 (2) 🎮

  • 向上(按下加速器)
  • 不作任何操作(不按加速器)

獎勵函式 💰

  • 它每透過一個垂直方塊,就獲得 +1 的正獎勵。每次達到終止狀態,它就獲得 -1 的負獎勵。

定義新策略 🧠

  • 由於環境更復雜,我們需要一個更深的神經網路。
class Policy(nn.Module):
    def __init__(self, s_size, a_size, h_size):
        super(Policy, self).__init__()
        # Define the three layers here

    def forward(self, x):
        # Define the forward process here
        return F.softmax(x, dim=1)

    def act(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        probs = self.forward(state).cpu()
        m = Categorical(probs)
        action = m.sample()
        return action.item(), m.log_prob(action)

解決方案

class Policy(nn.Module):
    def __init__(self, s_size, a_size, h_size):
        super(Policy, self).__init__()
        self.fc1 = nn.Linear(s_size, h_size)
        self.fc2 = nn.Linear(h_size, h_size * 2)
        self.fc3 = nn.Linear(h_size * 2, a_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return F.softmax(x, dim=1)

    def act(self, state):
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        probs = self.forward(state).cpu()
        m = Categorical(probs)
        action = m.sample()
        return action.item(), m.log_prob(action)

定義超引數 ⚙️

  • 因為這個環境更復雜。
  • 特別是對於隱藏層的大小,我們需要更多的神經元。
pixelcopter_hyperparameters = {
    "h_size": 64,
    "n_training_episodes": 50000,
    "n_evaluation_episodes": 10,
    "max_t": 10000,
    "gamma": 0.99,
    "lr": 1e-4,
    "env_id": env_id,
    "state_space": s_size,
    "action_space": a_size,
}

訓練它

  • 我們現在準備訓練我們的代理 🔥。
# Create policy and place it to the device
# torch.manual_seed(50)
pixelcopter_policy = Policy(
    pixelcopter_hyperparameters["state_space"],
    pixelcopter_hyperparameters["action_space"],
    pixelcopter_hyperparameters["h_size"],
).to(device)
pixelcopter_optimizer = optim.Adam(pixelcopter_policy.parameters(), lr=pixelcopter_hyperparameters["lr"])
scores = reinforce(
    pixelcopter_policy,
    pixelcopter_optimizer,
    pixelcopter_hyperparameters["n_training_episodes"],
    pixelcopter_hyperparameters["max_t"],
    pixelcopter_hyperparameters["gamma"],
    1000,
)

在 Hub 上釋出我們訓練好的模型 🔥

repo_id = ""  # TODO Define your repo id {username/Reinforce-{model-id}}
push_to_hub(
    repo_id,
    pixelcopter_policy,  # The model we want to save
    pixelcopter_hyperparameters,  # Hyperparameters
    eval_env,  # Evaluation environment
    video_fps=30
)

一些額外的挑戰 🏆

學習**最好的方法就是自己嘗試**!正如你所看到的,目前的代理表現不佳。首先,你可以嘗試進行更多步的訓練。但也要嘗試找到更好的引數。

排行榜中你會找到你的智慧體。你能名列前茅嗎?

以下是一些登上排行榜的方法:

  • 訓練更多步
  • 嘗試不同的超引數,參考你同學的做法 👉 https://huggingface.co/models?other=reinforce
  • **將您新訓練的模型推送到 Hub** 🔥
  • **改進複雜環境的實現**(例如,如何將網路更改為卷積神經網路以處理幀作為觀察?)。

**恭喜您完成本單元**!資訊量很大。恭喜您完成本教程。您剛剛使用 PyTorch 從頭開始編寫了您的第一個深度強化學習代理,並將其共享到 Hub 🥳。

不要猶豫在本單元進行迭代,**透過改進更復雜環境的實現**(例如,如何將網路更改為卷積神經網路以處理幀作為觀察?)。

在下一單元,**我們將學習更多關於 Unity MLAgents 的知識**,透過在 Unity 環境中訓練代理。這樣,您將準備好參與**AI vs AI 挑戰賽,您將在其中訓練您的代理在雪球大戰和足球比賽中與其他代理競爭。**

聽起來很有趣嗎?下次見!

最後,我們很想**聽聽你對課程的看法以及我們如何改進它**。如果你有任何反饋,請 👉 填寫此表

單元 5 見!🔥

持續學習,保持出色 🤗

< > 在 GitHub 上更新

© . This site is unofficial and not affiliated with Hugging Face, Inc.