DQN(Deep Q-Network)原理即代碼分析

前面說了 PPO 算法, 下面一鼓作氣, 把其他的相關的強化學習也一并學習下。這里還是給出我學習的一些視頻鏈接

視頻鏈接: 不愧是頂會收割機!迪哥精講強化學習4大主流算法:PPO、Q-learning、DQN、A3C 50集入門到精通!

一、 Q-Learning

1.1 Q-Learning思想

在說 DQN 算法之前先說下 Q-Learning 算法,該算法是當時谷歌發(fā)明的一種強化學習算法, 該算法是一種基于值函數(shù)的強化學習算法, 它通過學習一個狀態(tài)-動作值函數(shù)(Q函數(shù))來選擇最優(yōu)策略, Q-Learning是一種無模型(model-free)的強化學習方法, 即它不需要了解環(huán)境的狀態(tài)(轉移概率以及獎勵函數(shù)),僅依賴于環(huán)境的交互。Q-Learning 的目標是通過不斷更新 Q 值,使得 Agent 能夠選擇在給定狀態(tài)下能獲得(0≤\gamma≤1)得最大累計獎勵的動作,Q-Learning的一個重要特點是它保證在探索足夠多的狀態(tài)-動作對后, 最終會收斂到最優(yōu)策略。

  • Q 函數(shù)的定義: Q-Learning中, Q 函數(shù)Q(s, a)表示在狀態(tài)s下采取的動作a所能獲得的期望回報。Q 函數(shù)是Q-Learning 的核心, 通過對 對Q值的不斷更新,最終得到最優(yōu)的 Q 函數(shù)Q^{*}(s,a)

  • Q-Learning核心思想:

Q-Learning的核心思想就是通過貝爾曼方差來更新 Q 值, 貝爾曼方程描述了某一狀態(tài)-動作對的 Q 值與其后續(xù)狀態(tài)-動作之間的關系, 在 Q-Learning 中, 更新公式為:
Q(s_t, a_t) = Q(s_t, a_t) + \alpha \left( R_{t+1} + \gamma \max_{a'} Q(s_{t+1}, a') - Q(s_t, a_t) \right)

  • 其中s_ta_t分別是當前狀態(tài)和當前動作
  • R_{t+1}是 Agent 在執(zhí)行動作a_t后, 從環(huán)境中獲得的及時獎勵
  • \gamma是折扣因子,表示未來獎勵的衰減程度(0 \leq \gamma \leq 1)
  • \max_{a'} Q(s_{t+1}, a')是狀態(tài)s_{t+1}下所有可能動作的最大 Q值,代表智能體在下一個狀態(tài)下選擇最優(yōu)動作后的預期回報。
  • \alpha是學習率, 控制每次Q值更新的步長
    通過以上公式,Q-Learning 在每個時間步 t 都會根據(jù)當前的經驗(狀態(tài)、動作、獎勵、下一個狀態(tài))來更新 Q 值。隨著學習的進行, Q 值會逐漸收斂到一個最優(yōu) Q 值Q*(s,a), 從而得到最優(yōu)策略。
  • Q-Learning 算法推導
    Q-Learning 的更新公式來自于 貝爾曼最優(yōu)方程(Bellman Optimality Equation),它為求解最優(yōu)值函數(shù)提供了遞歸關系。假設Q^{*}(s,a)是最優(yōu)狀態(tài)-動作,即在每個狀態(tài)下,選擇最優(yōu)動作可以獲得最大回報。根據(jù)貝爾曼最優(yōu)方程,我們有:
    Q(s,a) = \mathbb{E}_{s'} \left[ R_{t+1} + \gamma \max_{a'} Q(s',a') \right]

這表示,某一狀態(tài)-動作對的 Q 值等于當前獎勵R_{t+1}加上未來狀態(tài)s'下, 采取最優(yōu)動作a'所得到的最大預期回報, 通過與實際更新公式的對比, Q-Learning 通過貝爾曼方程遞歸地更新Q值, 是的 Q 值逐漸逼近最優(yōu)值Q^{*}(s,a)

核心思想:獲取的獎勵 = 眼前的瞬時獎勵(做了一個動作就能獲取到的經驗) + 記憶獎勵(按照訓練時的經驗, 上一個動作發(fā)生后, 接下來怎么做才能獲得更大獎勵)

Q-Learning 整體流程圖
  • 首先需要獲取數(shù)據(jù), 也就是玩游戲的記錄:{(s_i, a_i, s'_i, r_i)}, 其中s_i代表當前狀態(tài),a_i代表當前動作, s'_i代表下一時刻狀態(tài), r_i代表當前執(zhí)行動作之后獲取到的瞬時獎勵。

  • 模型的目標等于: y_i \leftarrow r(s_i, a_i) + \gamma \max_{a_i'} Q_\phi(s_i', a_i'), 也就是說當前計算的 Q 值等于當前獲得的瞬時獎勵加上下一個狀態(tài)執(zhí)行不同動作獲得收益最大動作的 Q 值, 這里的\gamma值因為是下一步,影響貢獻度有打折的成分在里面, 這里的y_i可以理解為期望。

  • 模型的目標函數(shù):\arg \min_\phi \frac{1}{2} \sum_i \left\| Q_\phi(\mathbf{s}_i, \mathbf{a}_i) - \mathbf{y}_i \right\|^2

  • 基于查表(DQN使用網絡替代查表)進行迭代更新使得獎勵值更大


    這里我們選擇\epsilon-greedy Policy策略, 可以根據(jù)代碼了解

1.2 Q-Learning 簡單示例

如下圖是一個密室逃脫







下面給出 Q-Learning 的代碼, 代碼來源Q-learning_Nogo.py

import numpy as np  # 導入NumPy庫,用于數(shù)值計算
import pandas as pd  # 導入Pandas庫,用于數(shù)據(jù)結構操作
import matplotlib.pyplot as plt  # 導入Matplotlib庫,用于繪制圖形
import time  # 導入time庫,用于控制程序暫停時間

ALPHA = 0.1  # 學習率,決定每次Q值更新的幅度
GAMMA = 0.95  # 折扣因子,決定未來獎勵的權重
EPSILION = 0.9  # epsilon-greedy策略中的探索概率,控制隨機選擇動作的比例
N_STATE = 20  # 狀態(tài)空間的大小,表示狀態(tài)的數(shù)量
ACTIONS = ['left', 'right']  # 可用的動作集合,左移和右移
MAX_EPISODES = 200  # 最大訓練回合數(shù)
FRESH_TIME = 0.1  # 每步之間的時間間隔,用于渲染環(huán)境

# 構建Q表,Q表存儲每個狀態(tài)-動作對的Q值
def build_q_table(n_state, actions):
    q_table = pd.DataFrame(
        np.zeros((n_state, len(actions))),  # 創(chuàng)建一個形狀為(n_state, len(actions))的全零矩陣
        np.arange(n_state),  # 狀態(tài)的行索引為0到n_state-1
        actions  # 動作的列索引為'left'和'right'
    )
    return q_table  # 返回Q表

# 選擇當前狀態(tài)下的動作
def choose_action(state, q_table):
    # epsilon-greedy 策略
    state_action = q_table.loc[state, :]  # 獲取當前狀態(tài)下所有動作的Q值
    if np.random.uniform() > EPSILION or (state_action == 0).all():  # 探索(隨機選擇)或當Q值全為0時
        action_name = np.random.choice(ACTIONS)  # 隨機選擇一個動作
    else:  # 利用(選擇Q值最大的動作)
        action_name = state_action.idxmax()  # 選擇Q值最大對應的動作
    return action_name  # 返回選擇的動作

# 獲取環(huán)境反饋,依據(jù)當前狀態(tài)和所選動作返回下一個狀態(tài)和獎勵
def get_env_feedback(state, action):
    if action == 'right':  # 如果選擇了向右的動作
        if state == N_STATE - 2:  # 如果已經到達倒數(shù)第二個狀態(tài)
            next_state = 'terminal'  # 終止狀態(tài)
            reward = 1  # 到達終止狀態(tài)時給予獎勵1
        else:
            next_state = state + 1  # 否則,狀態(tài)右移
            reward = -0.5  # 每步獎勵為-0.5
    else:  # 如果選擇了向左的動作
        if state == 0:  # 如果已經到達最左端
            next_state = 0  # 保持在狀態(tài)0
        else:
            next_state = state - 1  # 否則,狀態(tài)左移
        reward = -0.5  # 每步獎勵為-0.5
    return next_state, reward  # 返回下一個狀態(tài)和獎勵

# 更新環(huán)境的狀態(tài)并打印出來
def update_env(state, episode, step_counter):
    env = ['-'] * (N_STATE - 1) + ['T']  # 創(chuàng)建一個狀態(tài)列表,其中T表示目標終止狀態(tài)
    if state == 'terminal':  # 如果狀態(tài)是終止狀態(tài)
        print("Episode {}, the total step is {}".format(episode + 1, step_counter))  # 輸出當前回合和步數(shù)
        final_env = ['-'] * (N_STATE - 1) + ['T']  # 終止狀態(tài)時的環(huán)境狀態(tài)
        return True, step_counter  # 返回終止狀態(tài)和步數(shù)
    else:
        env[state] = '*'  # 將當前狀態(tài)位置標記為*表示智能體所在的位置
        env = ''.join(env)  # 將狀態(tài)列表轉為字符串顯示
        print(env)  # 打印當前環(huán)境狀態(tài)
        time.sleep(FRESH_TIME)  # 暫停FRESH_TIME秒以控制顯示速度
        return False, step_counter  # 返回非終止狀態(tài)和步數(shù)

# Q-learning算法實現(xiàn)
def q_learning():
    q_table = build_q_table(N_STATE, ACTIONS)  # 構建Q表
    step_counter_times = []  # 用于存儲每個回合的步數(shù)
    for episode in range(MAX_EPISODES):  # 遍歷每個回合
        state = 0  # 每個回合從狀態(tài)0開始
        is_terminal = False  # 是否到達終止狀態(tài)的標志
        step_counter = 0  # 步數(shù)計數(shù)器
        update_env(state, episode, step_counter)  # 更新環(huán)境并顯示
        while not is_terminal:  # 如果沒有到達終止狀態(tài)
            action = choose_action(state, q_table)  # 選擇動作
            next_state, reward = get_env_feedback(state, action)  # 獲取環(huán)境反饋
            next_q = q_table.loc[state, action]  # 獲取當前狀態(tài)-動作對的Q值
            if next_state == 'terminal':  # 如果到達終止狀態(tài)
                is_terminal = True  # 標記為終止狀態(tài)
                q_target = reward  # 目標Q值為獎勵值
            else:  # 如果沒有到達終止狀態(tài)
                delta = reward + GAMMA * q_table.iloc[next_state, :].max() - q_table.loc[state, action]  # 計算TD誤差
                q_table.loc[state, action] += ALPHA * delta  # 更新Q值
            state = next_state  # 更新狀態(tài)
            is_terminal, steps = update_env(state, episode, step_counter + 1)  # 更新環(huán)境并顯示
            step_counter += 1  # 步數(shù)+1
            if is_terminal:  # 如果到達終止狀態(tài)
                step_counter_times.append(steps)  # 記錄回合的步數(shù)
    return q_table, step_counter_times  # 返回更新后的Q表和每個回合的步數(shù)列表


# 程序入口
if __name__ == '__main__':
    q_table, step_counter_times = q_learning()  # 運行Q-learning算法
    print("Q table\n{}\n".format(q_table))  # 打印最終的Q表
    print('end')  # 打印訓練結束信息

    # 繪制每回合步數(shù)的圖表
    plt.plot(step_counter_times, 'g-')  # 以綠色線條繪制步數(shù)
    plt.ylabel("steps")  # 設置Y軸標簽為"steps"
    plt.title("Q-Learning Algorithm")  # 設置圖標題
    plt.show()  # 顯示圖表
    print("The step_counter_times is {}".format(step_counter_times))  # 打印每個回合的步數(shù)

二、DQN 算法

但是在實際問題中, 不太能用Q矩陣進行表示, 因此需要用神經網絡進行替代, 更新表的過程可以用更新網絡的參數(shù)替代。數(shù)據(jù)的獲取可以使用 replay buffers 方式構建,用的時候取一個 batch 就可以了, 這種方法其實也就是off policy的策略。下面有幾個核心技術

  • Experience Replay(經驗回放)
    ○ 不像 q-table 的 q-learning, 每一步都學習(update)該步的內容(experience), 對于 q-table 而言, 每一步都學習該步的內容,神經網絡連續(xù)地學習 時間上相關性高的內容(事實上, 時間 t 的學習內容和時間 t+1 的學習內容非常相似,這樣的話收斂會很慢)
    ○ 將每一步(step)的內容存儲在經驗池(experience pool)并隨機從經驗池中提取內容(replay, 回放) 讓 NN 學習, 這也是一種批次化(batch), 使用經驗池中的多個步驟的經驗
  • Loss Function 使用 huber 而不是 square loss,誤差很大時((|\delta| > 1)),平方誤差會導致誤差函數(shù)的輸出過大,導致學習難以穩(wěn)定
    \mathcal{L}(\delta) = \begin{cases} \frac{1}{2}\delta^2, & |\delta| \leq 1, \quad \text{MSE} \\ |\delta| - \frac{1}{2}, & |\delta| > 1 \quad \text{MAE} \end{cases}
為什么需要經驗回放

為什么需要第二個神經網絡

重點

  • 設置獎勵:評估每一步的結果, 為 agent 建立目標
  • 利用經驗回放進行學習(experience replay: 將經驗按照(s, a, s_, r)的形式存儲在記憶庫中, 每次隨機抽取一個 batch 大小的 transition 數(shù)據(jù)訓練網絡
  • 更新目標網絡(Target Network: 目標網絡原有的網絡同構, 學習一定次數(shù)后, 再將評估網絡的權重賦給目標網絡, 目標網絡的引入增加了學習的穩(wěn)定性

下面給出 DQN 相關代碼, 很簡單看下就知道了DQN_Nogo.py。

import torch  # 導入PyTorch庫,提供深度學習功能
import torch.nn as nn  # 導入PyTorch中的神經網絡模塊
import torch.nn.functional as F  # 導入PyTorch中常用的函數(shù)式API,如激活函數(shù)
import numpy as np  # 導入NumPy庫,用于數(shù)值計算
import gym  # 導入Gym庫,提供強化學習環(huán)境
import matplotlib.pyplot as plt  # 導入matplotlib庫,用于繪圖
import copy  # 導入copy庫,用于復制對象

# hyper-parameters
BATCH_SIZE = 128  # 每次更新時,從記憶池中采樣的樣本數(shù)量
LR = 0.01  # 學習率,用于優(yōu)化器
GAMMA = 0.90  # 折扣因子,用于計算未來獎勵
EPISILO = 0.9  # epsilon-greedy策略中的epsilon值,控制探索和利用的平衡
MEMORY_CAPACITY = 2000  # 記憶池的最大容量
Q_NETWORK_ITERATION = 100  # 每100次學習步驟更新一次目標網絡

env = gym.make("CartPole-v1")  # 創(chuàng)建Gym環(huán)境,使用CartPole-v1環(huán)境
env = env.unwrapped  # 獲取原始環(huán)境,解除環(huán)境的封裝
NUM_ACTIONS = env.action_space.n  # 獲取動作空間的維度,即可選的動作數(shù)
NUM_STATES = env.observation_space.shape[0]  # 獲取狀態(tài)空間的維度,即每個狀態(tài)的特征數(shù)
ENV_A_SHAPE = 0 if isinstance(env.action_space.sample(), int) else env.action_space.sample.shape  # 確定動作的形狀(如果是離散動作,返回0)

class Net(nn.Module):  # 定義一個繼承自nn.Module的神經網絡類
    """神經網絡結構,用于近似Q值函數(shù)"""
    def __init__(self):
        super(Net, self).__init__()  # 初始化父類的構造函數(shù)
        self.fc1 = nn.Linear(NUM_STATES, 50)  # 第一層,全連接層,輸入維度為NUM_STATES,輸出維度為50
        self.fc1.weight.data.normal_(0, 0.1)  # 對權重進行正態(tài)分布初始化,均值為0,標準差為0.1
        self.fc2 = nn.Linear(50, 30)  # 第二層,全連接層,輸入維度為50,輸出維度為30
        self.fc2.weight.data.normal_(0, 0.1)  # 對權重進行正態(tài)分布初始化
        self.out = nn.Linear(30, NUM_ACTIONS)  # 輸出層,全連接層,輸入維度為30,輸出維度為NUM_ACTIONS
        self.out.weight.data.normal_(0, 0.1)  # 對輸出層的權重進行正態(tài)分布初始化

    def forward(self, x):  # 定義前向傳播函數(shù)
        x = self.fc1(x)  # 通過第一層
        x = F.relu(x)  # 使用ReLU激活函數(shù)
        x = self.fc2(x)  # 通過第二層
        x = F.relu(x)  # 使用ReLU激活函數(shù)
        action_prob = self.out(x)  # 輸出動作的Q值
        return action_prob  # 返回動作的Q值

class DQN():  # 定義DQN類
    """深度Q網絡(DQN)類,包含Q學習的實現(xiàn)"""
    def __init__(self):
        super(DQN, self).__init__()  # 初始化父類的構造函數(shù)
        self.eval_net, self.target_net = Net(), Net()  # 創(chuàng)建評估網絡(eval_net)和目標網絡(target_net)

        self.learn_step_counter = 0  # 學習步驟計數(shù)器
        self.memory_counter = 0  # 記憶池中的樣本計數(shù)器
        self.memory = np.zeros((MEMORY_CAPACITY, NUM_STATES * 2 + 2))  # 初始化一個空的記憶池
        # 記憶池保存的內容:[狀態(tài), 動作, 獎勵, 下一個狀態(tài)]
        self.optimizer = torch.optim.Adam(self.eval_net.parameters(), lr=LR)  # 使用Adam優(yōu)化器
        self.loss_func = nn.MSELoss()  # 定義損失函數(shù)為均方誤差

    def choose_action(self, state):
        state = torch.unsqueeze(torch.FloatTensor(state), 0)  # 將狀態(tài)轉換為浮動張量,并添加一個維度,使其成為batch的形式
        if np.random.randn() <= EPISILO:  # epsilon-greedy策略,隨機探索
            action_value = self.eval_net.forward(state)  # 獲取評估網絡的輸出Q值
            action = torch.max(action_value, 1)[1].data.numpy()  # 選擇Q值最大的動作
            action = action[0] if ENV_A_SHAPE == 0 else action.reshape(ENV_A_SHAPE)  # 如果動作空間是離散的,直接取最大值;否則根據(jù)動作空間的形狀調整
        else:  # 探索階段,隨機選擇動作
            action = np.random.randint(0, NUM_ACTIONS)  # 從動作空間中隨機選擇一個動作
            action = action if ENV_A_SHAPE == 0 else action.reshape(ENV_A_SHAPE)  # 根據(jù)動作空間的形狀調整動作
        return action  # 返回選擇的動作

    def store_transition(self, state, action, reward, next_state):
        transition = np.hstack((state, [action, reward], next_state))  # 將狀態(tài)、動作、獎勵和下一個狀態(tài)拼接為一個過渡
        index = self.memory_counter % MEMORY_CAPACITY  # 計算當前樣本在記憶池中的存儲位置
        self.memory[index, :] = transition  # 將過渡存入記憶池
        self.memory_counter += 1  # 增加記憶池計數(shù)器

    def learn(self):
        # 每Q_NETWORK_ITERATION步更新一次目標網絡
        if self.learn_step_counter % Q_NETWORK_ITERATION == 0:
            self.target_net.load_state_dict(self.eval_net.state_dict())  # 目標網絡的參數(shù)與評估網絡同步
        self.learn_step_counter += 1  # 增加學習步驟計數(shù)器

        # 從記憶池中隨機抽取一個批次
        sample_index = np.random.choice(MEMORY_CAPACITY, BATCH_SIZE)  # 從記憶池中隨機選擇一個批次
        batch_memory = self.memory[sample_index, :]  # 獲取批次中的過渡
        batch_state = torch.FloatTensor(batch_memory[:, :NUM_STATES])  # 提取狀態(tài)部分
        batch_action = torch.LongTensor(batch_memory[:, NUM_STATES:NUM_STATES+1].astype(int))  # 提取動作部分
        batch_reward = torch.FloatTensor(batch_memory[:, NUM_STATES+1:NUM_STATES+2])  # 提取獎勵部分
        batch_next_state = torch.FloatTensor(batch_memory[:,-NUM_STATES:])  # 提取下一個狀態(tài)部分

        # 計算當前Q值
        q_eval = self.eval_net(batch_state).gather(1, batch_action)  # 使用評估網絡計算當前狀態(tài)下,選擇的動作的Q值
        q_next = self.target_net(batch_next_state).detach()  # 計算目標網絡輸出的Q值(detach避免計算梯度)
        q_target = batch_reward + GAMMA * q_next.max(1)[0].view(BATCH_SIZE, 1)  # 計算目標Q值
        loss = self.loss_func(q_eval, q_target)  # 計算損失函數(shù)(均方誤差)

        self.optimizer.zero_grad()  # 清零梯度
        loss.backward()  # 反向傳播計算梯度
        self.optimizer.step()  # 更新評估網絡的參數(shù)

def reward_func(env, x, x_dot, theta, theta_dot):
    # 計算獎勵函數(shù),考慮了小車與桿的偏差
    r1 = (env.x_threshold - abs(x))/env.x_threshold - 0.5  # 小車位置偏差的獎勵
    r2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5  # 桿的角度偏差的獎勵
    reward = r1 + r2  # 總獎勵
    return reward  # 返回獎勵


if __name__ == '__main__':
    dqn = DQN()  # 創(chuàng)建DQN對象
    episodes = 400  # 訓練的總回合數(shù)
    print("Collecting Experience....")  # 輸出提示信息
    reward_list = []  # 用于存儲每一回合的獎勵
    plt.ion()  # 開啟交互式繪圖模式
    fig, ax = plt.subplots()  # 創(chuàng)建繪圖窗口

    for i in range(episodes):  # 遍歷每一回合
        state, _ = env.reset()  # 重置環(huán)境,獲取初始狀態(tài)
        ep_reward = 0  # 初始化該回合的獎勵
        while True:  # 回合進行
            env.render()  # 渲染環(huán)境,可視化當前狀態(tài)
            action = dqn.choose_action(state)  # 根據(jù)當前狀態(tài)選擇動作
            next_state, _, done, info, _ = env.step(action)  # 執(zhí)行動作,獲取下一個狀態(tài)、獎勵、是否結束標志等信息
            x, x_dot, theta, theta_dot = next_state  # 提取環(huán)境返回的狀態(tài)變量
            reward = reward_func(env, x, x_dot, theta, theta_dot)  # 根據(jù)狀態(tài)變量計算獎勵

            dqn.store_transition(state, action, reward, next_state)  # 將當前的過渡存儲到記憶池中
            ep_reward += reward  # 累加當前回合的獎勵

            if dqn.memory_counter >= MEMORY_CAPACITY:  # 如果記憶池已滿
                dqn.learn()  # 開始學習
                if done:  # 如果當前回合結束
                    print("episode: {} , the episode reward is {}".format(i, round(ep_reward, 3)))  # 輸出當前回合的獎勵

            if done:  # 如果回合結束
                break  # 跳出循環(huán)

            state = next_state  # 更新狀態(tài),進入下一個時間步

        r = copy.copy(reward)  # 復制當前回合的獎勵
        reward_list.append(r)  # 將獎勵加入獎勵列表
        ax.set_xlim(0, 300)  # 設置繪圖的x軸范圍
        # ax.cla()  # 可選:清空當前繪圖(注釋掉以保持之前的圖形)
        ax.plot(reward_list, 'g-', label='total_loss')  # 繪制獎勵曲線
        plt.pause(0.001)  # 暫停一小段時間,用于更新圖形

參考

[1] 【強化學習】基礎離線算法:Q-Learning算法
[2] 組會講解強化學習的DQN算法

最后編輯于
?著作權歸作者所有,轉載或內容合作請聯(lián)系作者
【社區(qū)內容提示】社區(qū)部分內容疑似由AI輔助生成,瀏覽時請結合常識與多方信息審慎甄別。
平臺聲明:文章內容(如有圖片或視頻亦包括在內)由作者上傳并發(fā)布,文章內容僅代表作者本人觀點,簡書系信息發(fā)布平臺,僅提供信息存儲服務。

相關閱讀更多精彩內容

友情鏈接更多精彩內容