In the previous chapters, we learned how deep Q learning works: we approximate the q function with a neural network. After that, we saw various improvements to the deep Q network (DQN), such as double Q learning, the dueling network architecture, and the deep recurrent Q network (DRQN). We saw how a DQN uses a replay buffer to store the agent's experience and trains the network with minibatches sampled from the buffer. We also implemented a DQN for playing Atari games and a DRQN for playing Doom. In this chapter, we will step into a detailed implementation of the dueling DQN, which is essentially the same as a regular DQN except that the final fully connected layer is broken down into two streams, namely a value stream and an advantage stream, and these two streams are combined to compute the Q function. We will see how to train an agent with a dueling DQN to win the car racing game.
In this chapter, you will learn how to implement the following:
- Environment wrapper functions
- The dueling network
- The replay buffer
- Training the network
- Car racing
The credit for the code used in this chapter goes to Giacomo Spigler's GitHub repository. Throughout this chapter, the code is explained line by line; for the complete, structured code, check out his GitHub repository.
First, we import all of the necessary libraries:
import numpy as np
import tensorflow as tf
import gym
from gym.spaces import Box
from scipy.misc import imresize # imported in the original listing but unused; cv2.resize does the resizing below
import random
import cv2
import time
import logging
import os
import sys
We define the `EnvWrapper` class, in which we define the environment wrapper functions:
class EnvWrapper:
We define the `__init__` method and initialize the variables:
    def __init__(self, env_name, debug=False):
Initialize the `gym` environment:
        self.env = gym.make(env_name)
Get the `action_space`:
        self.action_space = self.env.action_space
Get the `observation_space`:
        self.observation_space = Box(low=0, high=255, shape=(84, 84, 4))
Initialize `frame_num` for storing the frame count:
        self.frame_num = 0
Initialize `monitor` for recording the game screen:
        self.monitor = self.env.monitor
Initialize `frames`:
        self.frames = np.zeros((84, 84, 4), dtype=np.uint8)
Initialize a Boolean called `debug`; when it is set to `true`, the last few frames are displayed:
        self.debug = debug
        if self.debug:
            cv2.startWindowThread()
            cv2.namedWindow("Game")
Next, we define a function called `step`, which takes an action as input and returns the preprocessed frames of the next state:
    def step(self, a):
        ob, reward, done, xx = self.env.step(a)
        return self.process_frame(ob), reward, done, xx
We define a function called `reset` for resetting the environment; after resetting, it returns the preprocessed game screen:
    def reset(self):
        self.frame_num = 0
        return self.process_frame(self.env.reset())
Next, we define another function for rendering the environment:
    def render(self):
        return self.env.render()
Now, we define the `process_frame` function for preprocessing a frame:
    def process_frame(self, frame):
        # convert the image to grayscale
        state_gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        # resize to 84 x 110 (cv2.resize takes (width, height))
        state_resized = cv2.resize(state_gray, (84, 110))
        # crop out the 84 x 84 playing area
        gray_final = state_resized[16:100, :]
        # on the first frame of an episode, fill all four slots with the same
        # frame; afterwards, shift the stack and put the newest frame in front
        if self.frame_num == 0:
            self.frames[:, :, 0] = gray_final
            self.frames[:, :, 1] = gray_final
            self.frames[:, :, 2] = gray_final
            self.frames[:, :, 3] = gray_final
        else:
            self.frames[:, :, 3] = self.frames[:, :, 2]
            self.frames[:, :, 2] = self.frames[:, :, 1]
            self.frames[:, :, 1] = self.frames[:, :, 0]
            self.frames[:, :, 0] = gray_final
        # next, we increment the frame_num counter
        self.frame_num += 1
        if self.debug:
            cv2.imshow('Game', gray_final)
        return self.frames.copy()
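To make the preprocessing concrete, here is a minimal usage sketch (assuming the same, older version of `gym` that the chapter's code targets, and the `'Seaquest-v0'` environment used later in this chapter):

env = EnvWrapper('Seaquest-v0')
state = env.reset()
print(state.shape) # (84, 84, 4): four stacked 84 x 84 grayscale frames
next_state, reward, done, info = env.step(env.action_space.sample())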
After preprocessing, our game screen looks like the following screenshot:
Now, we build the dueling DQN. We first build three convolutional layers, followed by two fully connected layers; the final fully connected layer is split into two separate layers for the value stream and the advantage stream. We then use an aggregation layer, which combines the value stream and the advantage stream, to compute the q value (the aggregation formula is given after the list). The dimensions of these layers are as follows:
- Layer 1: 32 `8x8` filters with stride 4 + ReLU
- Layer 2: 64 `4x4` filters with stride 2 + ReLU
- Layer 3: 64 `3x3` filters with stride 1 + ReLU
- Layer 4a: fully connected layer of 512 units + ReLU
- Layer 4b: fully connected layer of 512 units + ReLU
- Layer 5a: fully connected layer of 1 unit (state value)
- Layer 5b: fully connected layer of `action_size` units (advantage values)
- Layer 6: aggregation of `V(s) + A(s, a)`
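The aggregation layer implements the mean-subtracted combination from the dueling architecture, which keeps the value and advantage streams identifiable:

Q(s, a) = V(s) + (A(s, a) - 1/|A| * Σ_a' A(s, a'))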
class QNetworkDueling(QNetwork):
We define the `__init__` method to initialize all of the layers:
    def __init__(self, input_size, output_size, name):
        self.name = name
        self.input_size = input_size
        self.output_size = output_size
        with tf.variable_scope(self.name):
            # Three convolutional layers
            self.W_conv1 = self.weight_variable([8, 8, 4, 32])
            self.B_conv1 = self.bias_variable([32])
            self.stride1 = 4
            self.W_conv2 = self.weight_variable([4, 4, 32, 64])
            self.B_conv2 = self.bias_variable([64])
            self.stride2 = 2
            self.W_conv3 = self.weight_variable([3, 3, 64, 64])
            self.B_conv3 = self.bias_variable([64])
            self.stride3 = 1
            # Two fully connected layers
            self.W_fc4a = self.weight_variable([7*7*64, 512])
            self.B_fc4a = self.bias_variable([512])
            self.W_fc4b = self.weight_variable([7*7*64, 512])
            self.B_fc4b = self.bias_variable([512])
            # Value stream
            self.W_fc5a = self.weight_variable([512, 1])
            self.B_fc5a = self.bias_variable([1])
            # Advantage stream
            self.W_fc5b = self.weight_variable([512, self.output_size])
            self.B_fc5b = self.bias_variable([self.output_size])
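`weight_variable` and `bias_variable` are helpers inherited from the `QNetwork` base class, which is not shown in this chapter. A minimal sketch of what they might look like (an assumption, not the repository's exact code):

    def weight_variable(self, shape):
        # truncated-normal initialization, a common choice for DQN-style networks
        return tf.Variable(tf.truncated_normal(shape, stddev=0.01))

    def bias_variable(self, shape):
        # small constant bias
        return tf.Variable(tf.constant(0.01, shape=shape))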
We define the `__call__` method and perform the convolution operations:
    def __call__(self, input_tensor):
        if type(input_tensor) == list:
            input_tensor = tf.concat(1, input_tensor)
        with tf.variable_scope(self.name):
            # Perform the convolution operations on the three layers
            self.h_conv1 = tf.nn.relu( tf.nn.conv2d(input_tensor, self.W_conv1, strides=[1, self.stride1, self.stride1, 1], padding='VALID') + self.B_conv1 )
            self.h_conv2 = tf.nn.relu( tf.nn.conv2d(self.h_conv1, self.W_conv2, strides=[1, self.stride2, self.stride2, 1], padding='VALID') + self.B_conv2 )
            self.h_conv3 = tf.nn.relu( tf.nn.conv2d(self.h_conv2, self.W_conv3, strides=[1, self.stride3, self.stride3, 1], padding='VALID') + self.B_conv3 )
            # Flatten the convolutional output
            self.h_conv3_flat = tf.reshape(self.h_conv3, [-1, 7*7*64])
            # Fully connected layers
            self.h_fc4a = tf.nn.relu(tf.matmul(self.h_conv3_flat, self.W_fc4a) + self.B_fc4a)
            self.h_fc4b = tf.nn.relu(tf.matmul(self.h_conv3_flat, self.W_fc4b) + self.B_fc4b)
            # Compute the value stream and the advantage stream
            self.h_fc5a_value = tf.identity(tf.matmul(self.h_fc4a, self.W_fc5a) + self.B_fc5a)
            self.h_fc5b_advantage = tf.identity(tf.matmul(self.h_fc4b, self.W_fc5b) + self.B_fc5b)
            # Combine the value and advantage streams, subtracting the mean advantage
            self.h_fc6 = self.h_fc5a_value + ( self.h_fc5b_advantage - tf.reduce_mean(self.h_fc5b_advantage, reduction_indices=[1,], keep_dims=True) )
        return self.h_fc6
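The `7*7*64` flatten size follows from the VALID-padding shape arithmetic of the three convolutions on an 84 x 84 input: (84 - 8)/4 + 1 = 20 after the first layer, (20 - 4)/2 + 1 = 9 after the second, and (9 - 3)/1 + 1 = 7 after the third, leaving a 7 x 7 x 64 feature map.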
Now, we build the experience replay buffer, which is used for storing all of the agent's experience. We sample minibatches of experience from the replay buffer to train the network:
class ReplayMemoryFast:
First, we define the `__init__` method and initialize the buffer size:
    def __init__(self, memory_size, minibatch_size):
        # maximum number of samples to store
        self.memory_size = memory_size
        # minibatch size
        self.minibatch_size = minibatch_size
        self.experience = [None]*self.memory_size
        self.current_index = 0
        self.size = 0
Next, we define the `store` function for storing the experiences:
    def store(self, observation, action, reward, newobservation, is_terminal):
Store the experience as a tuple (current state, `action`, `reward`, next state, whether the state is terminal):
        self.experience[self.current_index] = (observation, action, reward, newobservation, is_terminal)
        self.current_index += 1
        self.size = min(self.size+1, self.memory_size)
If the index is greater than the memory size, we wrap it around by subtracting the memory size:
        if self.current_index >= self.memory_size:
            self.current_index -= self.memory_size
Next, we define a `sample` function for sampling a minibatch of experience:
    def sample(self):
        if self.size < self.minibatch_size:
            return []
        # first, we randomly sample some indices
        samples_index = np.floor(np.random.random((self.minibatch_size,))*self.size)
        # select the experiences at the sampled indices
        samples = [self.experience[int(i)] for i in samples_index]
        return samples
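A quick sketch of how the buffer is used (the transition values here are placeholders for illustration):

import numpy as np

state = np.zeros((84, 84, 4), dtype=np.uint8)
next_state = np.zeros((84, 84, 4), dtype=np.uint8)
memory = ReplayMemoryFast(memory_size=10000, minibatch_size=32)
# store one transition: (state, action, reward, next_state, is_terminal)
memory.store(state, 0, 1.0, next_state, False)
# sample() returns [] until at least 32 transitions have been stored
minibatch = memory.sample()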
Now, we will see how to train the network.
First, we define the `DQN` class and initialize all of the variables in the `__init__` method:
class DQN(object):
    def __init__(self, state_size,
                 action_size,
                 session,
                 summary_writer = None,
                 exploration_period = 1000,
                 minibatch_size = 32,
                 discount_factor = 0.99,
                 experience_replay_buffer = 10000,
                 target_qnet_update_frequency = 10000,
                 initial_exploration_epsilon = 1.0,
                 final_exploration_epsilon = 0.05,
                 reward_clipping = -1,
                 ):
Initialize all of the variables:
        self.state_size = state_size
        self.action_size = action_size
        self.session = session
        self.exploration_period = float(exploration_period)
        self.minibatch_size = minibatch_size
        self.discount_factor = tf.constant(discount_factor)
        self.experience_replay_buffer = experience_replay_buffer
        self.summary_writer = summary_writer
        self.reward_clipping = reward_clipping
        self.target_qnet_update_frequency = target_qnet_update_frequency
        self.initial_exploration_epsilon = initial_exploration_epsilon
        self.final_exploration_epsilon = final_exploration_epsilon
        self.num_training_steps = 0
Initialize the main dueling DQN by creating an instance of our `QNetworkDueling` class:
        self.qnet = QNetworkDueling(self.state_size, self.action_size, "qnet")
Similarly, initialize the target dueling DQN:
        self.target_qnet = QNetworkDueling(self.state_size, self.action_size, "target_qnet")
Next, initialize the optimizer as an `RMSPropOptimizer`:
        self.qnet_optimizer = tf.train.RMSPropOptimizer(learning_rate=0.00025, decay=0.99, epsilon=0.01)
Now, initialize `experience_replay_buffer` by creating an instance of our `ReplayMemoryFast` class:
        self.experience_replay = ReplayMemoryFast(self.experience_replay_buffer, self.minibatch_size)
        # set up the computational graph
        self.create_graph()
Next, we define the `copy_to_target_network` function, which copies the weights of the main network to the target network:
    # declared static because it is invoked as DQN.copy_to_target_network(...)
    # with two networks as its arguments
    @staticmethod
    def copy_to_target_network(source_network, target_network):
        target_network_update = []
        for v_source, v_target in zip(source_network.variables(), target_network.variables()):
            # assign the source network's value to the target network's variable
            update_op = v_target.assign(v_source)
            target_network_update.append(update_op)
        return tf.group(*target_network_update)
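`variables()` is another helper assumed to come from the `QNetwork` base class, which is not shown in this chapter. A minimal sketch of what it might do, under that assumption (using the pre-1.0 TensorFlow API that the rest of this code targets):

    def variables(self):
        # collect every variable created under this network's variable scope
        return [v for v in tf.all_variables() if v.name.startswith(self.name)]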
Now, we define the `create_graph` function and build our computational graph:
    def create_graph(self):
We compute `q_values` and select the action that has the maximum `q` value:
with tf.name_scope("pick_action"):
# placeholder for state
self.state = tf.placeholder(tf.float32, (None,)+self.state_size , name="state")
# placeholder for q values
self.q_values = tf.identity(self.qnet(self.state) , name="q_values")
# placeholder for predicted actions
self.predicted_actions = tf.argmax(self.q_values, dimension=1 , name="predicted_actions")
# plot histogram to track max q values
tf.histogram_summary("Q values", tf.reduce_mean(tf.reduce_max(self.q_values, 1))) # save max q-values to track learning
Next, we compute the target future rewards:
with tf.name_scope("estimating_future_rewards"):
self.next_state = tf.placeholder(tf.float32, (None,)+self.state_size , name="next_state")
self.next_state_mask = tf.placeholder(tf.float32, (None,) , name="next_state_mask")
self.rewards = tf.placeholder(tf.float32, (None,) , name="rewards")
self.next_q_values_targetqnet = tf.stop_gradient(self.target_qnet(self.next_state), name="next_q_values_targetqnet")
self.next_q_values_qnet = tf.stop_gradient(self.qnet(self.next_state), name="next_q_values_qnet")
self.next_selected_actions = tf.argmax(self.next_q_values_qnet, dimension=1)
self.next_selected_actions_onehot = tf.one_hot(indices=self.next_selected_actions, depth=self.action_size)
self.next_max_q_values = tf.stop_gradient( tf.reduce_sum( tf.mul( self.next_q_values_targetqnet, self.next_selected_actions_onehot ) , reduction_indices=[1,] ) * self.next_state_mask )
self.target_q_values = self.rewards + self.discount_factor*self.next_max_q_values
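Note that this block implements the double DQN target: the main network selects the next action, while the target network evaluates it. In equation form, for a non-terminal next state:

y = r + γ * Q_target(s', argmax_a' Q_main(s', a'))

For terminal states, `next_state_mask` is 0, so the target reduces to y = r.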
Next, we perform the optimization using the RMSProp optimizer:
with tf.name_scope("optimization_step"):
self.action_mask = tf.placeholder(tf.float32, (None, self.action_size) , name="action_mask")
self.y = tf.reduce_sum( self.q_values * self.action_mask , reduction_indices=[1,])
## ERROR CLIPPING
self.error = tf.abs(self.y - self.target_q_values)
quadratic_part = tf.clip_by_value(self.error, 0.0, 1.0)
linear_part = self.error - quadratic_part
self.loss = tf.reduce_mean( 0.5*tf.square(quadratic_part) + linear_part )
# optimize the gradients
qnet_gradients = self.qnet_optimizer.compute_gradients(self.loss, self.qnet.variables())
for i, (grad, var) in enumerate(qnet_gradients):
if grad is not None:
qnet_gradients[i] = (tf.clip_by_norm(grad, 10), var)
self.qnet_optimize = self.qnet_optimizer.apply_gradients(qnet_gradients)
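The error-clipping block above is the Huber loss with δ = 1: for an error e, the loss is 0.5 * e^2 when |e| <= 1 and |e| - 0.5 otherwise, so large temporal-difference errors contribute linearly rather than quadratically and cannot blow up the gradients.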
Copy the main network weights to the target network:
with tf.name_scope("target_network_update"):
self.hard_copy_to_target = DQN.copy_to_target_network(self.qnet, self.target_qnet)
We define the `store` function, which stores all of the experience in the `experience_replay_buffer`:
    def store(self, state, action, reward, next_state, is_terminal):
        # reward clipping
        if self.reward_clipping > 0.0:
            reward = np.clip(reward, -self.reward_clipping, self.reward_clipping)
        self.experience_replay.store(state, action, reward, next_state, is_terminal)
We define an `action` function for selecting actions using the decaying epsilon-greedy policy:
    def action(self, state, training = False):
        # linearly anneal epsilon from its initial to its final value
        # over the exploration period
        if self.num_training_steps > self.exploration_period:
            epsilon = self.final_exploration_epsilon
        else:
            epsilon = self.initial_exploration_epsilon - float(self.num_training_steps) * (self.initial_exploration_epsilon - self.final_exploration_epsilon) / self.exploration_period
        # during evaluation, use a small fixed epsilon
        if not training:
            epsilon = 0.05
        # explore with probability epsilon, otherwise act greedily
        if random.random() <= epsilon:
            action = random.randint(0, self.action_size-1)
        else:
            action = self.session.run(self.predicted_actions, {self.state:[state]})[0]
        return action
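To see the schedule concretely, here is a small standalone sketch of the same linear decay (with illustrative parameter values):

def epsilon_at(step, exploration_period=1000000, initial=1.0, final=0.1):
    # linearly anneal from initial to final over exploration_period steps
    if step > exploration_period:
        return final
    return initial - step * (initial - final) / exploration_period

print epsilon_at(0)       # 1.0
print epsilon_at(500000)  # 0.55
print epsilon_at(2000000) # 0.1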
Now, we define a `train` function for training our network:
    def train(self):
Copy the main network weights to the target network when training starts:
        if self.num_training_steps == 0:
            print "Training starts..."
            # copy_to is assumed to be provided by the QNetwork base class
            self.qnet.copy_to(self.target_qnet)
Sample experience from the replay memory:
        minibatch = self.experience_replay.sample()
        # sample() returns an empty list until the buffer holds a full
        # minibatch, so we skip the training step in that case
        if len(minibatch) == 0:
            return
Get the states, actions, rewards, and next states from the `minibatch`:
        batch_states = np.asarray( [d[0] for d in minibatch] )
        actions = [d[1] for d in minibatch]
        # one-hot encode the sampled actions
        batch_actions = np.zeros( (self.minibatch_size, self.action_size) )
        for i in xrange(self.minibatch_size):
            batch_actions[i, actions[i]] = 1
        batch_rewards = np.asarray( [d[2] for d in minibatch] )
        batch_newstates = np.asarray( [d[3] for d in minibatch] )
        # the mask is 0 for terminal next states and 1 otherwise
        batch_newstates_mask = np.asarray( [not d[4] for d in minibatch] )
Perform the training operation:
        scores, _ = self.session.run([self.q_values, self.qnet_optimize],
                                     { self.state: batch_states,
                                       self.next_state: batch_newstates,
                                       self.next_state_mask: batch_newstates_mask,
                                       self.rewards: batch_rewards,
                                       self.action_mask: batch_actions} )
Update the target network weights periodically:
        if self.num_training_steps % self.target_qnet_update_frequency == 0:
            self.session.run( self.hard_copy_to_target )
            print 'mean maxQ in minibatch: ', np.mean(np.max(scores, 1))
            # self.summarize is assumed to be a merged summary op created in
            # the full repository code (for example, via tf.merge_all_summaries())
            str_ = self.session.run(self.summarize, { self.state: batch_states,
                                                      self.next_state: batch_newstates,
                                                      self.next_state_mask: batch_newstates_mask,
                                                      self.rewards: batch_rewards,
                                                      self.action_mask: batch_actions})
            self.summary_writer.add_summary(str_, self.num_training_steps)
        self.num_training_steps += 1
So far, we have seen how to build a dueling DQN. Now, we will see how to make use of our dueling DQN to play the racing game.
First, let's import the necessary libraries:
import gym
import time
import logging
import os
import sys
import tensorflow as tf
Initialize all of the necessary variables:
ENV_NAME = 'Seaquest-v0'
TOTAL_FRAMES = 20000000
MAX_TRAINING_STEPS = 20 * 60 * 60 / 3
TESTING_GAMES = 30
MAX_TESTING_STEPS = 5 * 60 * 60 / 3
TRAIN_AFTER_FRAMES = 50000
epoch_size = 50000
MAX_NOOP_START = 30
LOG_DIR = 'logs'
outdir = 'results'
logger = tf.train.SummaryWriter(LOG_DIR)
# Initialize the TensorFlow session
session = tf.InteractiveSession()
Create the wrapped environment and build the agent (the original listing uses `env` without creating it first, so we instantiate the `EnvWrapper` class defined earlier):

env = EnvWrapper(ENV_NAME)
agent = DQN(state_size=env.observation_space.shape,
            action_size=env.action_space.n,
            session=session,
            summary_writer = logger,
            exploration_period = 1000000,
            minibatch_size = 32,
            discount_factor = 0.99,
            experience_replay_buffer = 1000000,
            target_qnet_update_frequency = 20000,
            initial_exploration_epsilon = 1.0,
            final_exploration_epsilon = 0.1,
            reward_clipping = 1.0,
            )
session.run(tf.initialize_all_variables())
logger.add_graph(session.graph)
saver = tf.train.Saver(tf.all_variables())
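The `env.monitor.start` call below expects `multiples_video_schedule`, a callback that decides which episodes get recorded; it is defined in the full repository but not shown in this chapter. A minimal sketch of such a schedule (the episode interval is a hypothetical choice):

def multiples_video_schedule(episode_id):
    # record a video every 500th episode (hypothetical interval)
    return episode_id % 500 == 0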
Store the recordings:
env.monitor.start(outdir+'/'+ENV_NAME, force=True, video_callable=multiples_video_schedule)
num_frames = 0
num_games = 0
current_game_frames = 0
# the training loop below checks test_mode before rendering; it is never set
# in this listing, so we initialize it to False here
test_mode = False
init_no_ops = np.random.randint(MAX_NOOP_START+1)
last_time = time.time()
last_frame_count = 0.0
state = env.reset()
Now, let's begin training:
while num_frames <= TOTAL_FRAMES+1:
    if test_mode:
        env.render()
    num_frames += 1
    current_game_frames += 1
Select an action given the current state:
    action = agent.action(state, training = True)
Perform the action in the environment, receive the `reward`, and move to the `next_state`:
    next_state, reward, done, _ = env.step(action)
Store this transition in the `experience_replay_buffer`:
    if current_game_frames >= init_no_ops:
        agent.store(state, action, reward, next_state, done)
    state = next_state
Train the agent:
    if num_frames >= TRAIN_AFTER_FRAMES:
        agent.train()
    if done or current_game_frames > MAX_TRAINING_STEPS:
        state = env.reset()
        current_game_frames = 0
        num_games += 1
        init_no_ops = np.random.randint(MAX_NOOP_START+1)
Save the network parameters after every epoch:
    if num_frames % epoch_size == 0 and num_frames > TRAIN_AFTER_FRAMES:
        saver.save(session, outdir+"/"+ENV_NAME+"/model_"+str(num_frames/1000)+"k.ckpt")
        print "epoch: frames=", num_frames, " games=", num_games
We test the performance once every two epochs:
    if num_frames % (2*epoch_size) == 0 and num_frames > TRAIN_AFTER_FRAMES:
        total_reward = 0
        avg_steps = 0
        for i in xrange(TESTING_GAMES):
            state = env.reset()
            init_no_ops = np.random.randint(MAX_NOOP_START+1)
            frm = 0
            while frm < MAX_TESTING_STEPS:
                frm += 1
                env.render()
                action = agent.action(state, training = False)
                # perform no-ops at the start of each testing game
                if current_game_frames < init_no_ops:
                    action = 0
                state, reward, done, _ = env.step(action)
                total_reward += reward
                if done:
                    break
            avg_steps += frm
        avg_reward = float(total_reward)/TESTING_GAMES
        str_ = session.run( tf.scalar_summary('test reward ('+str(epoch_size/1000)+'k)', avg_reward) )
        logger.add_summary(str_, num_frames)
        state = env.reset()
env.monitor.close()
We can see how the agent learns to win the racing game, as shown in the following screenshot:
In this chapter, we learned how to implement a dueling DQN in detail. We started with the basic environment wrapper functions for preprocessing the game screens and then defined the `QNetworkDueling` class. There, we implemented the dueling Q network, which splits the final fully connected layer of the DQN into a value stream and an advantage stream and then combines these two streams to compute the `q` value. After that, we saw how to build the replay buffer, which stores the experience and feeds minibatches of experience for training the network. Finally, we initialized the racing environment using OpenAI's Gym and trained our agent. In the next chapter, Chapter 13, *Recent Advancements and Next Steps*, we will look at some of the recent advancements in RL.
The list of questions is as follows:
- What is the difference between a DQN and a dueling DQN?
- Write Python code for a replay buffer.
- What is a target network?
- Write Python code for a prioritized experience replay buffer.
- Create a Python function for decaying an epsilon-greedy policy.
- How does a dueling DQN differ from a double DQN?
- Create a Python function for copying the main network weights to the target network.
The following links will help you expand your knowledge: