"""
Utilities module containing helper functions for the Deep Q-Learning - Lunar Lander
Jupyter notebook (C3_W3_A1_Assignment) from DeepLearning.AI's "Unsupervised Learning,
Recommenders, Reinforcement Learning" course on Coursera.
"""
import base64
import random
from itertools import zip_longest
import imageio
import IPython
import matplotlib.pyplot as plt
import matplotlib.ticker as mticker
import numpy as np
import pandas as pd
import tensorflow as tf
SEED = 0 # Seed for the pseudo-random number generator.
MINIBATCH_SIZE = 64 # Mini-batch size.
TAU = 1e-3 # Soft update parameter.
E_DECAY = 0.995 # ε-decay rate for the ε-greedy policy.
E_MIN = 0.01 # Minimum ε value for the ε-greedy policy.
random.seed(SEED)
def get_experiences(memory_buffer):
"""
Returns a random sample of experience tuples drawn from the memory buffer.
Retrieves a random sample of experience tuples from the given memory_buffer and
returns them as TensorFlow Tensors. The size of the random sample is determined by
the mini-batch size (MINIBATCH_SIZE).
Args:
memory_buffer (deque):
A deque containing experiences. The experiences are stored in the memory
buffer as namedtuples: namedtuple("Experience", field_names=["state",
"action", "reward", "next_state", "done"]).
Returns:
A tuple (states, actions, rewards, next_states, done_vals) where:
- states are the starting states of the agent.
- actions are the actions taken by the agent from the starting states.
- rewards are the rewards received by the agent after taking the actions.
- next_states are the new states of the agent after taking the actions.
- done_vals are the boolean values indicating if the episode ended.
All tuple elements are TensorFlow Tensors whose shape is determined by the
mini-batch size and the given Gym environment. For the Lunar Lander environment
the states and next_states will have a shape of [MINIBATCH_SIZE, 8] while the
actions, rewards, and done_vals will have a shape of [MINIBATCH_SIZE]. All
TensorFlow Tensors have elements with dtype=tf.float32.
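    Example (illustrative sketch; assumes `memory_buffer` is a deque of "Experience"
    namedtuples that already holds more than MINIBATCH_SIZE entries):

        states, actions, rewards, next_states, done_vals = get_experiences(memory_buffer)
        # For Lunar Lander: states and next_states have shape (MINIBATCH_SIZE, 8),
        # while actions, rewards, and done_vals have shape (MINIBATCH_SIZE,).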
"""
experiences = random.sample(memory_buffer, k=MINIBATCH_SIZE)
states = tf.convert_to_tensor(
np.array([e.state for e in experiences if e is not None]), dtype=tf.float32
)
actions = tf.convert_to_tensor(
np.array([e.action for e in experiences if e is not None]), dtype=tf.float32
)
rewards = tf.convert_to_tensor(
np.array([e.reward for e in experiences if e is not None]), dtype=tf.float32
)
next_states = tf.convert_to_tensor(
np.array([e.next_state for e in experiences if e is not None]), dtype=tf.float32
)
done_vals = tf.convert_to_tensor(
np.array([e.done for e in experiences if e is not None]).astype(np.uint8),
dtype=tf.float32,
)
return (states, actions, rewards, next_states, done_vals)
def check_update_conditions(t, num_steps_upd, memory_buffer):
"""
Determines if the conditions are met to perform a learning update.
Checks if the current time step t is a multiple of num_steps_upd and if the
memory_buffer has enough experience tuples to fill a mini-batch (for example, if the
mini-batch size is 64, then the memory buffer should have more than 64 experience
tuples in order to perform a learning update).
Args:
t (int):
The current time step.
num_steps_upd (int):
The number of time steps used to determine how often to perform a learning
update. A learning update is only performed every num_steps_upd time steps.
memory_buffer (deque):
A deque containing experiences. The experiences are stored in the memory
buffer as namedtuples: namedtuple("Experience", field_names=["state",
"action", "reward", "next_state", "done"]).
Returns:
A boolean that will be True if conditions are met and False otherwise.
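    Example (illustrative sketch; t, NUM_STEPS_FOR_UPDATE, and memory_buffer are
    assumed to be defined by the notebook's training loop):

        update = check_update_conditions(t, NUM_STEPS_FOR_UPDATE, memory_buffer)
        if update:
            experiences = get_experiences(memory_buffer)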
"""
if (t + 1) % num_steps_upd == 0 and len(memory_buffer) > MINIBATCH_SIZE:
return True
else:
return False
def get_new_eps(epsilon):
"""
Updates the epsilon value for the ε-greedy policy.
Gradually decreases the value of epsilon towards a minimum value (E_MIN) using the
given ε-decay rate (E_DECAY).
Args:
epsilon (float):
The current value of epsilon.
Returns:
A float with the updated value of epsilon.
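    Example (illustrative sketch; epsilon is typically initialized to 1.0 in the
    notebook):

        epsilon = 1.0
        epsilon = get_new_eps(epsilon)  # max(E_MIN, 0.995 * 1.0) = 0.995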
"""
return max(E_MIN, E_DECAY * epsilon)
def get_action(q_values, epsilon=0.0):
"""
Returns an action using an ε-greedy policy.
This function will return an action according to the following rules:
- With probability epsilon, it will return an action chosen at random.
- With probability (1 - epsilon), it will return the action that yields the
maximum Q value in q_values.
Args:
q_values (tf.Tensor):
The Q values returned by the Q-Network. For the Lunar Lander environment
this TensorFlow Tensor should have a shape of [1, 4] and its elements should
have dtype=tf.float32.
epsilon (float):
The current value of epsilon.
Returns:
An action (numpy.int64). For the Lunar Lander environment, actions are
represented by integers in the closed interval [0,3].
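    Example (illustrative sketch; `state` is assumed to be an 8-dimensional Lunar
    Lander observation and `q_network` the Q-Network used in the notebook):

        state_qn = np.expand_dims(state, axis=0)  # Add batch dimension -> shape (1, 8)
        q_values = q_network(state_qn)
        action = get_action(q_values, epsilon)    # Use epsilon=0.0 for a greedy action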
"""
if random.random() > epsilon:
return np.argmax(q_values.numpy()[0])
else:
return random.choice(np.arange(4))
def update_target_network(q_network, target_q_network):
"""
Updates the weights of the target Q-Network using a soft update.
The weights of the target_q_network are updated using the soft update rule:
w_target = (TAU * w) + (1 - TAU) * w_target
where w_target are the weights of the target_q_network, TAU is the soft update
parameter, and w are the weights of the q_network.
Args:
q_network (tf.keras.Sequential):
The Q-Network.
target_q_network (tf.keras.Sequential):
The Target Q-Network.
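    Example (illustrative sketch; typically called right after a gradient descent step
    on the Q-Network so that the target network tracks it slowly):

        update_target_network(q_network, target_q_network)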
"""
for target_weights, q_net_weights in zip(
target_q_network.weights, q_network.weights
):
target_weights.assign(TAU * q_net_weights + (1.0 - TAU) * target_weights)
def plot_history(point_history, **kwargs):
"""
Plots the total number of points received by the agent after each episode together
with the moving average (rolling mean).
Args:
point_history (list):
A list containing the total number of points the agent received after each
episode.
**kwargs: optional
window_size (int):
Size of the window used to calculate the moving average (rolling mean).
This integer determines the fixed number of data points used for each
window. The default window size is set to 10% of the total number of
                data points in point_history, e.g. if point_history has 200 data points
the default window size will be 20.
lower_limit (int):
The lower limit of the x-axis in data coordinates. Default value is 0.
upper_limit (int):
The upper limit of the x-axis in data coordinates. Default value is
len(point_history).
plot_rolling_mean_only (bool):
If True, only plots the moving average (rolling mean) without the point
history. Default value is False.
plot_data_only (bool):
If True, only plots the point history without the moving average.
Default value is False.
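    Example (illustrative sketch; `total_point_history` is assumed to be the list of
    episode scores collected by the notebook's training loop):

        plot_history(total_point_history)
        plot_history(total_point_history, window_size=50, plot_rolling_mean_only=True)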
"""
lower_limit = 0
upper_limit = len(point_history)
window_size = (upper_limit * 10) // 100
plot_rolling_mean_only = False
plot_data_only = False
if kwargs:
if "window_size" in kwargs:
window_size = kwargs["window_size"]
if "lower_limit" in kwargs:
lower_limit = kwargs["lower_limit"]
if "upper_limit" in kwargs:
upper_limit = kwargs["upper_limit"]
if "plot_rolling_mean_only" in kwargs:
plot_rolling_mean_only = kwargs["plot_rolling_mean_only"]
if "plot_data_only" in kwargs:
plot_data_only = kwargs["plot_data_only"]
points = point_history[lower_limit:upper_limit]
# Generate x-axis for plotting.
    episode_num = list(range(lower_limit, upper_limit))
# Use Pandas to calculate the rolling mean (moving average).
rolling_mean = pd.DataFrame(points).rolling(window_size).mean()
plt.figure(figsize=(10, 7), facecolor="white")
if plot_data_only:
plt.plot(episode_num, points, linewidth=1, color="cyan")
elif plot_rolling_mean_only:
plt.plot(episode_num, rolling_mean, linewidth=2, color="magenta")
else:
plt.plot(episode_num, points, linewidth=1, color="cyan")
plt.plot(episode_num, rolling_mean, linewidth=2, color="magenta")
text_color = "black"
ax = plt.gca()
ax.set_facecolor("black")
plt.grid()
plt.xlabel("Episode", color=text_color, fontsize=30)
plt.ylabel("Total Points", color=text_color, fontsize=30)
yNumFmt = mticker.StrMethodFormatter("{x:,}")
ax.yaxis.set_major_formatter(yNumFmt)
ax.tick_params(axis="x", colors=text_color)
ax.tick_params(axis="y", colors=text_color)
plt.show()
def display_table(current_state, action, next_state, reward, done):
"""
Displays a table containing the current state, action, next state, reward, and done
values from Gym's Lunar Lander environment.
All floating point numbers in the table are displayed rounded to 3 decimal places
    and actions are displayed using their labels instead of their numerical values
    (e.g., if action = 0, the action will be printed as "Do nothing" instead of "0").
Args:
current_state (numpy.ndarray):
The current state vector returned by the Lunar Lander environment
before an action is taken
action (int):
The action taken by the agent. In the Lunar Lander environment, actions are
represented by integers in the closed interval [0,3] corresponding to:
- Do nothing = 0
- Fire right engine = 1
- Fire main engine = 2
- Fire left engine = 3
next_state (numpy.ndarray):
The state vector returned by the Lunar Lander environment after the agent
            takes an action, i.e., the observation returned after running a single time
step of the environment's dynamics using env.step(action).
reward (numpy.float64):
The reward returned by the Lunar Lander environment after the agent takes an
            action, i.e., the reward returned after running a single time step of the
environment's dynamics using env.step(action).
done (bool):
The done value returned by the Lunar Lander environment after the agent
            takes an action, i.e., the done value returned after running a single time
step of the environment's dynamics using env.step(action).
    Returns:
        None. The function builds a Pandas DataFrame containing the current_state,
        action, next_state, reward, and done values and renders it with IPython's
        display function, so the table is shown directly in the Jupyter Notebook.
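    Example (illustrative sketch; `env` is assumed to be the Lunar Lander Gym
    environment, using the classic Gym API as in the rest of this module):

        current_state = env.reset()
        action = 0  # Do nothing
        next_state, reward, done, _ = env.step(action)
        display_table(current_state, action, next_state, reward, done)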
"""
STATE_VECTOR_COL_NAME = 'State Vector'
DERIVED_COL_NAME = 'Derived from the State Vector (the closer to zero, the better)'
# States
add_derived_info = lambda state: np.hstack([
state,
[(state[0]**2 + state[1]**2)**.5],
[(state[2]**2 + state[3]**2)**.5],
[np.abs(state[4])]
])
modified_current_state = add_derived_info(current_state)
modified_next_state = add_derived_info(next_state)
states = np.vstack([
modified_current_state,
modified_next_state,
modified_next_state - modified_current_state,
]).T
get_state = lambda idx, type=np.float32: dict(zip(
['Current State', 'Next State'],
states[idx].astype(type)
))
# Actions
action_labels = [
"Do nothing",
"Fire right engine",
"Fire main engine",
"Fire left engine",
]
    IPython.display.display(
pd.DataFrame({
('', '', ''): {'Action': action_labels[action], 'Reward': reward, 'Episode Terminated': done},
(STATE_VECTOR_COL_NAME, 'Coordinate', 'X (Horizontal)'): get_state(0),
(STATE_VECTOR_COL_NAME, 'Coordinate', 'Y (Vertical)'): get_state(1),
(STATE_VECTOR_COL_NAME, 'Velocity', 'X (Horizontal)'): get_state(2),
(STATE_VECTOR_COL_NAME, 'Velocity', 'Y (Vertical)'): get_state(3),
(STATE_VECTOR_COL_NAME, 'Tilting', 'Angle'): get_state(4),
(STATE_VECTOR_COL_NAME, 'Tilting', 'Angular Velocity'): get_state(5),
            (STATE_VECTOR_COL_NAME, 'Ground contact', 'Left Leg?'): get_state(6, bool),
            (STATE_VECTOR_COL_NAME, 'Ground contact', 'Right Leg?'): get_state(7, bool),
(DERIVED_COL_NAME, 'Distance from landing pad', ''): get_state(8),
(DERIVED_COL_NAME, 'Velocity', ''): get_state(9),
(DERIVED_COL_NAME, 'Tilting Angle (absolute value)', ''): get_state(10),
})\
.fillna('')\
.reindex(['Current State', 'Action', 'Next State', 'Reward', 'Episode Terminated'])\
.style\
.applymap(lambda x: 'background-color : grey' if x == '' else '')\
.set_table_styles(
[
{"selector": "th", "props": [("border", "1px solid grey"), ('text-align', 'center')]},
{"selector": "tbody td", "props": [("border", "1px solid grey"), ('text-align', 'center')]},
]
)
)
def embed_mp4(filename):
"""
Embeds an MP4 video file in a Jupyter notebook.
Args:
filename (string):
            The path to the MP4 video file that will be embedded (e.g.,
"./videos/lunar_lander.mp4").
Returns:
Returns a display object from the given video file. This will result in the
video being displayed in the Jupyter Notebook.
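    Example (illustrative sketch; assumes the MP4 file was already created, e.g. with
    create_video):

        embed_mp4("./videos/lunar_lander.mp4")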
"""
    with open(filename, "rb") as video_file:
        video = video_file.read()
b64 = base64.b64encode(video)
tag = """
<video width="840" height="480" controls>
<source src="data:video/mp4;base64,{0}" type="video/mp4">
Your browser does not support the video tag.
</video>""".format(
b64.decode()
)
return IPython.display.HTML(tag)
def create_video(filename, env, q_network, fps=30):
"""
Creates a video of an agent interacting with a Gym environment.
The agent will interact with the given env environment using the q_network to map
    states to Q values and using a greedy policy to choose its actions (i.e., it will
choose the actions that yield the maximum Q values).
The video will be saved to a file with the given filename. The video format must be
    specified in the filename by providing a file extension (.mp4, .gif, etc.). If you
want to embed the video in a Jupyter notebook using the embed_mp4 function, then the
video must be saved as an MP4 file.
Args:
filename (string):
The path to the file to which the video will be saved. The video format will
be selected based on the filename. Therefore, the video format must be
            specified in the filename by providing a file extension (e.g.,
"./videos/lunar_lander.mp4"). To see a list of supported formats see the
imageio documentation: https://imageio.readthedocs.io/en/v2.8.0/formats.html
env (Gym Environment):
The Gym environment the agent will interact with.
q_network (tf.keras.Sequential):
A TensorFlow Keras Sequential model that maps states to Q values.
fps (int):
The number of frames per second. Specifies the frame rate of the output
video. The default frame rate is 30 frames per second.
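    Example (illustrative sketch; `env` and `q_network` are assumed to come from the
    notebook):

        filename = "./videos/lunar_lander.mp4"
        create_video(filename, env, q_network)
        embed_mp4(filename)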
"""
with imageio.get_writer(filename, fps=fps) as video:
done = False
state = env.reset()
frame = env.render(mode="rgb_array")
video.append_data(frame)
while not done:
state = np.expand_dims(state, axis=0)
q_values = q_network(state)
action = np.argmax(q_values.numpy()[0])
state, _, done, _ = env.step(action)
frame = env.render(mode="rgb_array")
video.append_data(frame)