As is well known, AlphaGo is built from three components: a policy network, a value network, and Monte Carlo Tree Search. The value network evaluates board positions, and the policy network chooses moves:

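Monte Carlo Tree Search is a strategy for multi-round sequential games, and we will study it later. Today we start with basic use of the two networks by applying them to CartPole. CartPole is a simple game built on the model shown in the figure: at each step a force is applied to the cart, pushing it either to the right or to the left. The game ends if the cart moves more than 2.4 units from the center or the pole tilts more than 15 degrees.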

Here we use OpenAI Gym to implement it.
Now to the main topic:
Policy network
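A policy network is a neural network that observes the current state of the environment and directly predicts the action strategy with the highest expected return. Its output is the probability of each available action.
This gives us a plan for CartPole: from the input environment parameters, state, compute the probability of each action.
Here we implement it with a single hidden layer: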
H = 50
observate = tf.placeholder(tf.float32, [None, 4], name="input_x")
W1 = tf.get_variable("w1", shape=[4, H],
initializer=tf.contrib.layers.xavier_initializer())
layer1 = tf.nn.relu(tf.matmul(observate, W1))
W2 = tf.get_variable("w2", shape=[H, 1],
initializer=tf.contrib.layers.xavier_initializer())
score = tf.matmul(layer1, W2)
probability = tf.nn.sigmoid(score)
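Here H is the number of units in the hidden layer (not the number of layers; there is only one). The environment input observate is not pixel data. It is an array of four values: the cart's position and velocity, and the pole's angle and angular velocity.
We encode a push to the left as 0 and a push to the right as 1, so probability is the probability that the action is 1.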
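To make the shapes concrete, here is a minimal NumPy sketch of the same forward pass followed by sampling an action. It is an illustration only: the function name policy_forward, the random stand-in weights, and the sample state are assumptions, not part of the TensorFlow graph above.

```python
import numpy as np

def policy_forward(state, w1, w2):
    """Return P(action = 1) for a batch of states, mirroring the graph above."""
    hidden = np.maximum(0.0, state @ w1)   # ReLU hidden layer, shape (N, H)
    score = hidden @ w2                    # raw score, shape (N, 1)
    return 1.0 / (1.0 + np.exp(-score))    # sigmoid: probability of pushing right

rng = np.random.default_rng(0)
H = 50
w1 = rng.normal(scale=0.1, size=(4, H))    # stand-in for the Xavier-initialised w1
w2 = rng.normal(scale=0.1, size=(H, 1))    # stand-in for w2

# Hypothetical state: cart position, cart velocity, pole angle, pole angular velocity
state = np.array([[0.0, 0.1, 0.02, -0.1]])
p_right = policy_forward(state, w1, w2)

# Sample the action: 1 (right) with probability p_right, otherwise 0 (left)
action = 1 if rng.uniform() < p_right[0, 0] else 0
```

Sampling from the probability, rather than always taking the more likely action, is what lets the policy keep exploring during training.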
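The direction of training is this: for a given state, the more value an action yields, the larger its probability should be.
After each action, the reward is 1 if the game has not ended and 0 otherwise, so every state has a corresponding action whose reward is 1.
The value we actually want to learn is the expected value: the current reward plus the rewards that can potentially be collected in the future.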

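We set gamma to a number between 0 and 1 so that the target does not diverge:
def discount_reward(r):
    # Compute the discounted potential value of every step from the rewards r and gamma
    discount_r = np.zeros_like(r)
    running_add = 0
    # Walk backwards so each step accumulates the discounted rewards that follow it
    for t in reversed(range(r.size)):
        running_add = running_add * gamma + r[t]
        discount_r[t] = running_add
    return discount_r
This gives the potential value of each action.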
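A small worked example may help here. The sketch below is a pure-Python variant of the same backward accumulation (the name discount_rewards and the choice gamma = 0.5 are illustrative assumptions, picked so the arithmetic is easy to follow by hand).

```python
def discount_rewards(rewards, gamma):
    """Discounted return of each step: r[t] + gamma * r[t+1] + gamma^2 * r[t+2] + ..."""
    discounted = [0.0] * len(rewards)
    running = 0.0
    # Walk backwards so each step reuses the return already computed for the next step
    for t in reversed(range(len(rewards))):
        running = running * gamma + rewards[t]
        discounted[t] = running
    return discounted

values = discount_rewards([1.0, 1.0, 1.0], gamma=0.5)
print(values)  # [1.75, 1.5, 1.0]
```

The last step is worth only its own reward (1.0), the middle step 1 + 0.5 * 1 = 1.5, and the first step 1 + 0.5 * 1.5 = 1.75, so earlier actions in a long episode receive more credit.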
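The training result we want is that the higher an action's value, the higher its probability, and the lower its value, the lower its probability. So we define the loss as the negated product of the log-probability of the action taken and its value, averaged over the batch: loss = -mean(log p(action) * value). Minimizing this loss raises the probability of the action taken in proportion to its value, and since the actions being reinforced are the ones that kept the game going, this yields the result we want.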
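The full listing below encodes the action taken as y = 1 - action and computes the log-probability with a single expression. The following sketch, using only the standard library, checks what that expression reduces to in each case; the helper names log_likelihood and policy_loss are illustrative assumptions.

```python
import math

def log_likelihood(y, p):
    """Log-probability of the action taken.

    y = 1 - action, so y = 0 means action 1 was taken and y = 1 means action 0.
    p is the network output, P(action = 1).
    """
    return math.log(y * (y - p) + (1 - y) * (y + p))

def policy_loss(ys, ps, values):
    """Negated mean of log-probability times value, as in the loss described above."""
    terms = [log_likelihood(y, p) * v for y, p, v in zip(ys, ps, values)]
    return -sum(terms) / len(terms)

p = 0.8
took_right = log_likelihood(0, p)   # action 1 taken: reduces to log(p)
took_left = log_likelihood(1, p)    # action 0 taken: reduces to log(1 - p)

# For an action with positive value, a higher probability gives a lower loss
loss_confident = policy_loss([0], [0.9], [1.0])
loss_unsure = policy_loss([0], [0.6], [1.0])
```

Because the loss falls as the probability of a positively valued action rises, gradient descent pushes the policy toward those actions.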
Putting the theory above together gives the full Policy network implementation:
import numpy as np
import tensorflow as tf
import gym
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
env = gym.make('CartPole-v0')
env.reset()
H = 50
batch_size = 25
learning_rate = 1e-1
D = 4
gamma = 0.99
xs, ys, drs = [], [], []
reward_sum = 0
episode_number = 1
total_episodes = 1000
# Compute the probability of action 1 from the current state via the hidden layer
observate = tf.placeholder(tf.float32, [None, D], name="input_x")
W1 = tf.get_variable("w1", shape=[D, H],
initializer=tf.contrib.layers.xavier_initializer())
layer1 = tf.nn.relu(tf.matmul(observate, W1))
W2 = tf.get_variable("w2", shape=[H, 1],
initializer=tf.contrib.layers.xavier_initializer())
score = tf.matmul(layer1, W2)
probability = tf.nn.sigmoid(score)
# Compute the loss and the gradients from the probability
input_y = tf.placeholder(tf.float32, [None, 1], name="input_y")
advantages = tf.placeholder(tf.float32, name="reward_signal")
loglik = tf.log(input_y * (input_y - probability) +
(1 - input_y) * (input_y + probability))
loss = -tf.reduce_mean(loglik * advantages)
tvars = tf.trainable_variables()
newGrads = tf.gradients(loss, tvars)
# Apply the accumulated gradients to train the two-layer network
adam = tf.train.AdamOptimizer(learning_rate=learning_rate)
W1grad = tf.placeholder(tf.float32, name="batch_grad1")
W2grad = tf.placeholder(tf.float32, name="batch_grad2")
batchGrad = [W1grad, W2grad]
updateGrads = adam.apply_gradients(zip(batchGrad, tvars))
def discount_reward(r):
    # Compute the discounted potential value of every step from the rewards r and gamma
    discount_r = np.zeros_like(r)
    running_add = 0
    for t in reversed(range(r.size)):
        running_add = running_add * gamma + r[t]
        discount_r[t] = running_add
    return discount_r

# Run the session
with tf.Session() as sess:
    rendering = False
    init = tf.global_variables_initializer()
    sess.run(init)
    observation = env.reset()
    # Gradient buffer with the same shapes as the trainable variables, zeroed out
    gradBuff = sess.run(tvars)
    for ix, grad in enumerate(gradBuff):
        gradBuff[ix] = grad * 0
    while episode_number <= total_episodes:
        # Start rendering once the batch average passes 100, and keep rendering afterwards
        if rendering or reward_sum / batch_size > 100:
            rendering = True
            env.render()
        x = np.reshape(observation, [1, D])
        tfprob = sess.run(probability, feed_dict={observate: x})
        # Sample the action from the predicted probability
        action = 1 if np.random.uniform() < tfprob else 0
        xs.append(x)
        # Label fed to input_y: 1 for action 0, 0 for action 1
        y = 1 - action
        ys.append(y)
        observation, reward, done, info = env.step(action)
        reward_sum += reward
        drs.append(reward)
        if done:
            episode_number += 1
            epx = np.vstack(xs)
            epy = np.vstack(ys)
            epr = np.vstack(drs)
            xs, ys, drs = [], [], []
            # Discount the rewards, then normalize them to zero mean and unit variance
            discount_epr = discount_reward(epr)
            discount_epr -= np.mean(discount_epr)
            discount_epr /= np.std(discount_epr)
            tGrad = sess.run(newGrads, feed_dict={observate: epx,
                                                  input_y: epy,
                                                  advantages: discount_epr})
            # Accumulate this episode's gradients
            for ix, grad in enumerate(tGrad):
                gradBuff[ix] += grad
            # Update the weights once every batch_size episodes
            if episode_number % batch_size == 0:
                sess.run(updateGrads, feed_dict={W1grad: gradBuff[0],
                                                 W2grad: gradBuff[1]})
                for ix, grad in enumerate(gradBuff):
                    gradBuff[ix] = grad * 0
                print('Average reward for episode %d: %f.' %
                      (episode_number, reward_sum / batch_size))
                if reward_sum / batch_size > 200:
                    print('Task solved in', episode_number, 'episodes!')
                    break
                reward_sum = 0
            observation = env.reset()
Observed results:

After only a little over one hundred episodes, the average reward already reaches 100.
Value network
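Unlike a policy network, a value network learns the expected value of each action. This approach is called Q-learning. The expected value, denoted Q, is the maximum expected total reward obtainable from the current step through all subsequent steps.
A simple, practical introduction to Q-learning: http://m.itdecent.cn/p/1c0d5e83b066
As that introduction shows, a Q matrix records the Q value of every action in each state. In even a slightly more complex environment such as CartPole, however, there are far too many states to record them all in one Q matrix. So we introduce DQN (Deep Q-Network). DQN differs from plain Q-learning in that, instead of a simple lookup matrix, a neural network is trained to map an input state to the Q value of each action.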

As the figure above shows, the state is fed in, the neural network processes it, and out comes a value for each action. The action with the largest value is chosen.
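Choosing the action with the largest value can be sketched in a few lines. The version below also takes an exploration rate epsilon, which is the idea behind egreedy_action in the full listing further down; the function name epsilon_greedy and the sample Q values are illustrative assumptions.

```python
import random

def epsilon_greedy(q_values, epsilon):
    """Pick a random action with probability epsilon, otherwise the highest-valued one."""
    if random.random() < epsilon:
        return random.randrange(len(q_values))
    # Index of the largest Q value
    return max(range(len(q_values)), key=lambda a: q_values[a])

q_values = [0.3, 1.2]                            # hypothetical network output for [left, right]
greedy = epsilon_greedy(q_values, epsilon=0.0)   # epsilon = 0 always exploits: index 1
```

With epsilon at 0 the choice is purely greedy, which is what evaluation uses; during training a nonzero epsilon keeps some random exploration so the network sees states it would otherwise never visit.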
We use a two-layer neural network to process the state and produce the value of each action.
W1 = tf.Variable(tf.truncated_normal([STATE, HIDDEN_SIZE]))
b1 = tf.Variable(tf.constant(0.01, shape = [HIDDEN_SIZE]))
W2 = tf.Variable(tf.truncated_normal([HIDDEN_SIZE, ACTION]))
b2 = tf.Variable(tf.constant(0.01, shape=[ACTION]))
state_input = tf.placeholder("float",[None,STATE])
h_layer = tf.nn.relu(tf.matmul(state_input,W1) + b1)
Q_value = tf.matmul(h_layer,W2) + b2
Next we use a buffer to store a number of transitions, and each training step draws batch_size of them at random. To keep the data fresh, once the buffer exceeds the set capacity the oldest entries are replaced by new ones.
from collections import deque
buffer = deque()
def add(state, action, reward, next_state, done):
    # Drop the oldest transition once the buffer is over capacity
    if len(buffer) > 100:
        buffer.popleft()
    buffer.append((state, action, reward, next_state, done))
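As a possible alternative, the same behaviour can be sketched with deque's maxlen argument, which discards the oldest entry automatically, plus a method for drawing a random minibatch. The class name ReplayBuffer and the capacity of 3 are assumptions chosen to keep the example small.

```python
import random
from collections import deque

class ReplayBuffer:
    """Fixed-capacity store of (state, action, reward, next_state, done) transitions."""

    def __init__(self, capacity):
        # A deque with maxlen drops the oldest item when a new one is appended at capacity
        self.buffer = deque(maxlen=capacity)

    def add(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        # Uniform sampling without replacement breaks the correlation between consecutive steps
        return random.sample(list(self.buffer), batch_size)

    def __len__(self):
        return len(self.buffer)

buf = ReplayBuffer(capacity=3)
for step in range(5):
    buf.add(step, 0, 1.0, step + 1, False)   # states 0 and 1 are pushed out
batch = buf.sample(2)
```

After five insertions into a buffer of capacity 3, only the transitions starting from states 2, 3 and 4 remain.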
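The loss is the mean squared difference between the target value, built from the reward returned by step, and the Q value the network computes for the chosen action. Minimizing it pulls the two closer together.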
action_input = tf.placeholder("float",[None, ACTION])
y_input = tf.placeholder("float",[None])
Q_action = tf.reduce_sum(tf.multiply(Q_value, action_input),reduction_indices=1)
cost = tf.reduce_mean(tf.square(y_input - Q_action))
optimizer = tf.train.AdamOptimizer(0.001).minimize(cost)
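The values fed to y_input are the targets. A minimal sketch of how they can be built for a minibatch, following the rule used in the full listing (the reward alone for a terminal step, otherwise the reward plus the discounted best next Q value), is shown below. The function name td_targets, the sample numbers, and gamma = 0.5 are illustrative assumptions.

```python
def td_targets(rewards, next_q_values, dones, gamma):
    """Target for each transition: r if the episode ended, else r + gamma * max Q(next state)."""
    targets = []
    for reward, q_next, done in zip(rewards, next_q_values, dones):
        if done:
            targets.append(reward)                        # no future reward after a terminal step
        else:
            targets.append(reward + gamma * max(q_next))  # bootstrap from the best next action
    return targets

targets = td_targets(
    rewards=[1.0, 1.0],
    next_q_values=[[0.5, 2.0], [3.0, 1.0]],   # hypothetical network outputs for the next states
    dones=[False, True],
    gamma=0.5,
)
print(targets)  # [2.0, 1.0]
```

The first transition bootstraps from the best next value (1 + 0.5 * 2.0), while the second is terminal and keeps only its own reward.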
Next we wrap these pieces in a class and run them together to get the trained result:
import gym
import tensorflow as tf
import numpy as np
import random
from collections import deque
GAMMA = 0.9
INITIAL_EPSILON = 0.5
FINAL_EPSILON = 0.01
REPLAY_SIZE = 10000
BATCH_SIZE = 32
HIDDEN_SIZE = 20
class DQN():
    def __init__(self, env):
        # Replay buffer of (state, one-hot action, reward, next_state, done) tuples
        self.replay_buffer = deque()
        self.time_step = 0
        self.epsilon = INITIAL_EPSILON
        self.state_dim = env.observation_space.shape[0]
        self.action_dim = env.action_space.n
        self.create_Q_network()
        self.create_training_method()
        self.session = tf.InteractiveSession()
        self.session.run(tf.global_variables_initializer())

    def create_Q_network(self):
        # One hidden ReLU layer followed by a linear layer with one output per action
        W1 = self.weight_variable([self.state_dim, HIDDEN_SIZE])
        b1 = self.bias_variable([HIDDEN_SIZE])
        W2 = self.weight_variable([HIDDEN_SIZE, self.action_dim])
        b2 = self.bias_variable([self.action_dim])
        self.state_input = tf.placeholder("float", [None, self.state_dim])
        h_layer = tf.nn.relu(tf.matmul(self.state_input, W1) + b1)
        self.Q_value = tf.matmul(h_layer, W2) + b2

    def create_training_method(self):
        self.action_input = tf.placeholder("float", [None, self.action_dim])  # one-hot representation
        self.y_input = tf.placeholder("float", [None])
        # Select the Q value of the action actually taken
        Q_action = tf.reduce_sum(tf.multiply(self.Q_value, self.action_input), reduction_indices=1)
        self.cost = tf.reduce_mean(tf.square(self.y_input - Q_action))
        self.optimizer = tf.train.AdamOptimizer(0.0001).minimize(self.cost)

    def perceive(self, state, action, reward, next_state, done):
        # Store the transition, trim the buffer, and train once enough samples exist
        one_hot_action = np.zeros(self.action_dim)
        one_hot_action[action] = 1
        self.replay_buffer.append((state, one_hot_action, reward, next_state, done))
        if len(self.replay_buffer) > REPLAY_SIZE:
            self.replay_buffer.popleft()
        if len(self.replay_buffer) > BATCH_SIZE:
            self.train_Q_network()

    def train_Q_network(self):
        self.time_step += 1
        # Draw a random minibatch from the replay buffer
        minibatch = random.sample(self.replay_buffer, BATCH_SIZE)
        state_batch = [data[0] for data in minibatch]
        action_batch = [data[1] for data in minibatch]
        reward_batch = [data[2] for data in minibatch]
        next_state_batch = [data[3] for data in minibatch]
        # Debug output, left disabled:
        # print(reward_batch)
        # Target: the reward alone if the episode ended, else reward + GAMMA * max next Q
        y_batch = []
        Q_value_batch = self.Q_value.eval(feed_dict={self.state_input: next_state_batch})
        for i in range(0, BATCH_SIZE):
            done = minibatch[i][4]
            if done:
                y_batch.append(reward_batch[i])
            else:
                y_batch.append(reward_batch[i] + GAMMA * np.max(Q_value_batch[i]))
        self.optimizer.run(feed_dict={
            self.y_input: y_batch,
            self.action_input: action_batch,
            self.state_input: state_batch
        })

    def egreedy_action(self, state):
        value = self.Q_value.eval(feed_dict={
            self.state_input: [state]
        })
        # Decay epsilon linearly, but never below FINAL_EPSILON
        self.epsilon = max(FINAL_EPSILON,
                           self.epsilon - (INITIAL_EPSILON - FINAL_EPSILON) / 10000)
        Q_value = value[0]
        if random.random() <= self.epsilon:
            return random.randint(0, self.action_dim - 1)
        else:
            return np.argmax(Q_value)

    def action(self, state):
        # Greedy action, used for evaluation
        value = self.Q_value.eval(feed_dict={
            self.state_input: [state]
        })
        return np.argmax(value[0])

    def weight_variable(self, shape):
        initial = tf.truncated_normal(shape)
        return tf.Variable(initial)

    def bias_variable(self, shape):
        initial = tf.constant(0.01, shape=shape)
        return tf.Variable(initial)

ENV_NAME = 'CartPole-v0'
EPISODE = 10000
STEP = 300
TEST = 10
def main():
    env = gym.make(ENV_NAME)
    agent = DQN(env)
    for episode in range(EPISODE):
        # Training: act epsilon-greedily and learn from every transition
        state = env.reset()
        for step in range(STEP):
            action = agent.egreedy_action(state)
            next_state, reward, done, _ = env.step(action)
            agent.perceive(state, action, reward, next_state, done)
            state = next_state
            if done:
                break
        # Evaluation: every 100 episodes, run TEST greedy episodes and average the reward
        if episode % 100 == 0:
            total_reward = 0
            for i in range(TEST):
                state = env.reset()
                for j in range(STEP):
                    # Render only once the running average is already high
                    if total_reward / TEST >= 160:
                        env.render()
                    action = agent.action(state)
                    state, reward, done, _ = env.step(action)
                    total_reward += reward
                    if done:
                        break
            ave_reward = total_reward / TEST
            print('episode: ', episode, 'Evaluation Average Reward:', ave_reward)

if __name__ == '__main__':
    main()