# -*- coding: utf-8 -*-
"""Karpathy_Pong_model.ipynb
Automatically generated by Colaboratory.
Original file is located at
https://colab.research.google.com/drive/1RUXdAs-zFbA562KsukpWc_-3Voj_4u3k
"""
#https://github.com/dhruvp/atari-pong
## Architecture
# Take in inputs from the screen and preprocess them
# Pass them into an NN
# Update the weights of the NN using gradient descent
# weights['1'] - Matrix that holds weights of pixels passing into hidden layer. Dimensions: [200 x 80 x 80] -> [200 x 6400]
# weights['2'] - Matrix that holds weights of hidden layer passing into output. Dimensions: [1 x 200]
# Process is:
# processed_observations = image vector - [6400 x 1] array
# Compute hidden_layer_values = weights['1'] dot processed_observations ([200 x 6400] dot [6400 x 1]) -> [200 x 1] - this gives initial activation values.
# Next we need to transform those activations, either via a sigmoid or a ReLU of some sort. Let's use a ReLU
# ReLU(hidden_layer_values)
# Next we need to pass this one layer further
# output_layer_value = weights['2'] dot hidden_layer_values ([1 x 200] dot [200 x 1] -> [1 x 1])
# Now our output layer is the probability of going up or down. Let's make sure this output is between 0 and 1 by passing it through a sigmoid
# p = sigmoid(output_layer_value)
# Learning after round has finished:
# Figure out the result
# Compute the error
# Use the error to calculate the gradient
# The dimensions below all assume the round had exactly 10 frames (not necessarily true!)
# dC_dw2 = hidden_layer_values^T dot gradient_log_p ([200 x 10] dot [10 x 1] -> [200 x 1])
# delta_1 = gradient_log_p outer_product weights['2'] = [10 x 1] outer_product [1 x 200] -> [10 x 200]
# dC_dw1 = delta_1^T dot input_observations ([200 x 10] dot [10 x 6400] -> [200 x 6400])
# After some batch size of rounds has finished,
# Use rmsprop to move weights['1'] and weights['2'] in the direction of the gradient
# Repeat!
import gym
import numpy as np
import pickle
import matplotlib.pyplot as plt
import os
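# A minimal sketch of the forward pass sized as in the header comment above
# (hidden layer of 200 units, 80*80 = 6400 input pixels). The _forward_pass_shape_demo
# name is illustrative only and is never called by the training loop below.
def _forward_pass_shape_demo():
    hidden_units, input_dim = 200, 80 * 80
    demo_weights = {
        '1': np.random.randn(hidden_units, input_dim) / np.sqrt(input_dim),  # [200 x 6400]
        '2': np.random.randn(hidden_units) / np.sqrt(hidden_units),          # [200]
    }
    observation = np.random.randn(input_dim)                                  # [6400]
    hidden = np.dot(demo_weights['1'], observation)                           # [200]
    hidden[hidden < 0] = 0                                                    # ReLU
    up_probability = 1.0 / (1.0 + np.exp(-np.dot(hidden, demo_weights['2']))) # sigmoid -> scalar in (0, 1)
    print(hidden.shape, up_probability)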
def downsample(image):
# Take only alternate pixels - basically halves the resolution of the image (which is fine for us)
return image[::2, ::2, :]
def remove_color(image):
"""Convert all color (RGB is the third dimension in the image)"""
return image[:, :, 0]
def remove_background(image):
    image[image == 144] = 0  # erase background (background type 1)
    image[image == 109] = 0  # erase background (background type 2)
return image
def preprocess_observations(input_observation, prev_processed_observation, input_dimensions):
""" convert the 210x160x3 uint8 frame into a 6400 float vector """
processed_observation = input_observation[35:195] # crop
processed_observation = downsample(processed_observation)
processed_observation = remove_color(processed_observation)
processed_observation = remove_background(processed_observation)
processed_observation[processed_observation != 0] = 1 # everything else (paddles, ball) just set to 1
    # Convert from an 80 x 80 matrix to a flat 6400-element float vector
    processed_observation = processed_observation.astype(float).ravel()
# subtract the previous frame from the current one so we are only processing on changes in the game
if prev_processed_observation is not None:
input_observation = processed_observation - prev_processed_observation
else:
input_observation = np.zeros(input_dimensions)
# store the previous frame so we can subtract from it next time
prev_processed_observations = processed_observation
return input_observation, prev_processed_observations
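# Shape check for the preprocessing pipeline. The _preprocess_shape_demo helper is
# illustrative only and is never called by the training loop: a raw 210x160x3 uint8
# frame should come out as a flat vector of 80*80 = 6400 floats.
def _preprocess_shape_demo():
    fake_frame = np.random.randint(0, 256, size=(210, 160, 3), dtype=np.uint8)
    first, prev = preprocess_observations(fake_frame, None, 80 * 80)
    second, prev = preprocess_observations(fake_frame, prev, 80 * 80)
    print(first.shape, second.shape)  # (6400,) (6400,); the first call is all zeros (no previous frame yet)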
def sigmoid(x):
return 1.0/(1.0 + np.exp(-x))
def relu(vector):
vector[vector < 0] = 0
return vector
def apply_neural_nets(observation_matrix, weights):
""" Based on the observation_matrix and weights, compute the new hidden layer values and the new output layer values"""
hidden_layer_values = np.dot(weights['1'], observation_matrix)
hidden_layer_values = relu(hidden_layer_values)
output_layer_values = np.dot(hidden_layer_values, weights['2'])
output_layer_values = sigmoid(output_layer_values)
return hidden_layer_values, output_layer_values
def choose_action(probability):
random_value = np.random.uniform()
if random_value < probability:
# signifies up in openai gym
return 2
else:
# signifies down in openai gym
return 3
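# Quick sanity sketch for the sampling above; the _choose_action_demo helper is
# illustrative only. With up_probability = 0.7 the agent should press UP (action 2)
# on roughly 70% of frames.
def _choose_action_demo(num_samples=10000):
    actions = [choose_action(0.7) for _ in range(num_samples)]
    print('fraction of UP actions:', actions.count(2) / num_samples)  # expect ~0.7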
def compute_gradient(gradient_log_p, hidden_layer_values, observation_values, weights):
""" See here: http://neuralnetworksanddeeplearning.com/chap2.html"""
delta_L = gradient_log_p
dC_dw2 = np.dot(hidden_layer_values.T, delta_L).ravel()
    delta_l2 = np.outer(delta_L, weights['2'])
    delta_l2[hidden_layer_values <= 0] = 0  # backprop through the ReLU: no gradient where the hidden unit was inactive
    dC_dw1 = np.dot(delta_l2.T, observation_values)
return {
'1': dC_dw1,
'2': dC_dw2
}
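# Shape sketch for the backward pass, assuming an episode of T = 10 frames as in the
# header comment. The _backprop_shape_demo helper is illustrative only: gradient '2'
# should come out as [200] and gradient '1' as [200 x 6400].
def _backprop_shape_demo():
    T, hidden_units, input_dim = 10, 200, 80 * 80
    demo_weights = {
        '1': np.random.randn(hidden_units, input_dim) / np.sqrt(input_dim),
        '2': np.random.randn(hidden_units) / np.sqrt(hidden_units),
    }
    hidden_values = np.abs(np.random.randn(T, hidden_units))  # stand-in post-ReLU activations, [T x 200]
    gradient_log_p = np.random.randn(T, 1)                    # one log-prob gradient per frame, [T x 1]
    observations = np.random.randn(T, input_dim)              # [T x 6400]
    gradient = compute_gradient(gradient_log_p, hidden_values, observations, demo_weights)
    print(gradient['2'].shape, gradient['1'].shape)           # (200,) (200, 6400)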
def update_weights(weights, expectation_g_squared, g_dict, decay_rate, learning_rate):
""" See here: http://sebastianruder.com/optimizing-gradient-descent/index.html#rmsprop"""
epsilon = 1e-5
for layer_name in weights.keys():
g = g_dict[layer_name]
expectation_g_squared[layer_name] = decay_rate * expectation_g_squared[layer_name] + (1 - decay_rate) * g**2
weights[layer_name] += (learning_rate * g)/(np.sqrt(expectation_g_squared[layer_name] + epsilon))
g_dict[layer_name] = np.zeros_like(weights[layer_name]) # reset batch gradient buffer
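# Toy RMSProp step on a single scalar weight; the _rmsprop_demo helper is illustrative
# only. With an empty cache and constant gradient g, the first update moves the weight
# by roughly learning_rate * g / sqrt((1 - decay_rate) * g**2) and resets the gradient buffer.
def _rmsprop_demo():
    demo_weights = {'1': np.array([0.0])}
    demo_cache = {'1': np.zeros(1)}       # expectation of squared gradients
    demo_grads = {'1': np.array([2.0])}   # accumulated batch gradient
    update_weights(demo_weights, demo_cache, demo_grads, decay_rate=0.99, learning_rate=1e-4)
    print(demo_weights['1'], demo_grads['1'])  # weight nudged to ~0.001, gradient buffer reset to 0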
def discount_rewards(rewards, gamma):
""" Actions you took 20 steps before the end result are less important to the overall result than an action you took a step ago.
This implements that logic by discounting the reward on previous actions based on how long ago they were taken"""
discounted_rewards = np.zeros_like(rewards)
running_add = 0
for t in reversed(range(0, rewards.size)):
if rewards[t] != 0:
running_add = 0 # reset the sum, since this was a game boundary (pong specific!)
running_add = running_add * gamma + rewards[t]
discounted_rewards[t] = running_add
return discounted_rewards
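# Worked example of the discounting logic; the _discount_demo helper is illustrative
# only. With gamma = 0.99, a point scored on the third frame spreads backwards as
# 1.0, 0.99, 0.9801, and the running sum resets after each scored point (the game boundary).
def _discount_demo():
    rewards = np.array([0.0, 0.0, 1.0, 0.0, 0.0, -1.0])
    print(discount_rewards(rewards, gamma=0.99))
    # expected: [ 0.9801  0.99  1.  -0.9801  -0.99  -1. ]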
def discount_with_rewards(gradient_log_p, episode_rewards, gamma):
""" discount the gradient with the normalized rewards """
discounted_episode_rewards = discount_rewards(episode_rewards, gamma)
# standardize the rewards to be unit normal (helps control the gradient estimator variance)
discounted_episode_rewards -= np.mean(discounted_episode_rewards)
discounted_episode_rewards /= np.std(discounted_episode_rewards)
return gradient_log_p * discounted_episode_rewards
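# Tiny end-to-end check of the reward-weighted gradient; the _discount_with_rewards_demo
# helper is illustrative only. After standardization the discounted rewards have mean ~0
# and std ~1, so each frame's log-prob gradient is scaled by a comparable return signal.
def _discount_with_rewards_demo():
    gradient_log_p = np.random.randn(6)
    rewards = np.array([0.0, 0.0, 1.0, 0.0, 0.0, -1.0])
    scaled = discount_with_rewards(gradient_log_p, rewards, gamma=0.99)
    print(scaled.shape)  # (6,): elementwise product of gradients and normalized discounted rewards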
def main():
env = gym.make("Pong-v0")
    resume = False  # change to True if you have already trained the agent and want to resume from save.p
train_episodes = 5000
#environment initialization
observation = env.reset() # This gets us the image
prev_processed_observations = None
# hyperparameters
batch_size = 10
gamma = 0.99 # discount factor for reward
    decay_rate = 0.99  # decay factor for the RMSProp moving average of squared gradients
num_hidden_layer_neurons = 200
input_dimensions = 80 * 80
learning_rate = 1e-4
#game parameter
episode_number = 0
reward_sum = 0
running_reward = None
episode_hidden_layer_values, episode_observations, episode_gradient_log_ps, episode_rewards = [], [], [], []
#model Initialization
if resume:
weights = pickle.load(open('save.p','rb'))
else:
weights = {
'1': np.random.randn(num_hidden_layer_neurons, input_dimensions) / np.sqrt(input_dimensions),
'2': np.random.randn(num_hidden_layer_neurons) / np.sqrt(num_hidden_layer_neurons)
}
# plotting
loss_buffer = []
reward_buffer = []
total_episodes_buffer = []
# To be used with RMSProp algorithm (http://sebastianruder.com/optimizing-gradient-descent/index.html#rmsprop)
expectation_g_squared = {}
g_dict = {}
for layer_name in weights.keys():
expectation_g_squared[layer_name] = np.zeros_like(weights[layer_name])
g_dict[layer_name] = np.zeros_like(weights[layer_name])
while episode_number < train_episodes:
#while True:
        env.render()  # comment this line out if you don't want to watch the agent play Pong in real time
processed_observations, prev_processed_observations = preprocess_observations(observation, prev_processed_observations, input_dimensions)
# forward propagation - model fit here
hidden_layer_values, up_probability = apply_neural_nets(processed_observations, weights)
episode_observations.append(processed_observations)
episode_hidden_layer_values.append(hidden_layer_values)
action = choose_action(up_probability)
# carry out the chosen action
observation, reward, done, info = env.step(action)
reward_sum += reward
episode_rewards.append(reward)
# see here: http://cs231n.github.io/neural-networks-2/#losses
fake_label = 1 if action == 2 else 0
loss_function_gradient = fake_label - up_probability
episode_gradient_log_ps.append(loss_function_gradient)
if done: # an episode finished
total_episodes_buffer.append(episode_number)
            # log the mean policy-gradient error for this episode (a rough loss proxy used for plotting)
            loss = np.average(episode_gradient_log_ps)
loss_buffer.append(loss)
# Combine the following values for the episode
episode_hidden_layer_values = np.vstack(episode_hidden_layer_values)
episode_observations = np.vstack(episode_observations)
episode_gradient_log_ps = np.vstack(episode_gradient_log_ps)
episode_rewards = np.vstack(episode_rewards)
# Tweak the gradient of the log_ps based on the discounted rewards
episode_gradient_log_ps_discounted = discount_with_rewards(episode_gradient_log_ps, episode_rewards, gamma)
            # backpropagation
gradient = compute_gradient(
episode_gradient_log_ps_discounted,
episode_hidden_layer_values,
episode_observations,
weights
)
# Sum the gradient for use when we hit the batch size
for layer_name in gradient:
g_dict[layer_name] += gradient[layer_name]
            # apply the RMSProp update once batch_size episodes of gradients have been accumulated
if episode_number % batch_size == 0:
update_weights(weights, expectation_g_squared, g_dict, decay_rate, learning_rate)
episode_hidden_layer_values, episode_observations, episode_gradient_log_ps, episode_rewards = [], [], [], [] # reset values
observation = env.reset() # reset env
            # keep an exponential moving average of the episode reward (99% history, 1% newest episode) for logging
running_reward = reward_sum if running_reward is None else running_reward * 0.99 + reward_sum * 0.01
reward_buffer.append(running_reward)
            print('resetting env. episode %d. episode reward total was %f. running mean: %f' % (episode_number, reward_sum, running_reward))
print('--->episode loss %f.' % (loss))
# save the model weights every 100 episodes.
if episode_number % 100 == 0: pickle.dump(weights, open('save.p', 'wb'))
reward_sum = 0
prev_processed_observations = None
episode_number += 1
    env.close()  # if you run this on your local machine, close the env when training finishes
plt.figure(1)
plt.plot(total_episodes_buffer,loss_buffer)
    plt.title('Model Loss over Episodes')
plt.ylabel('Model Loss')
plt.xlabel('Episodes')
plt.figure(2)
plt.plot(total_episodes_buffer,reward_buffer)
    plt.title('Rewards Earned over Episodes')
    plt.ylabel('Rewards')
    plt.xlabel('Episodes')
plt.show()
if __name__ == '__main__':
    main()