"""The ALEExperiment class handles the logic for training a deep
Q-learning agent in the Arcade Learning Environment.
Author: Nathan Sprague
"""
import logging
import numpy as np
import image_preprocessing
# Number of rows to crop off the bottom of the (downsampled) screen.
# This is appropriate for breakout, but it may need to be modified
# for other games.
CROP_OFFSET = 0# 8 # Pacman has also a black ribbon with the scores


class ALEExperiment(object):
    def __init__(self, ale, agent, resized_width, resized_height,
                 resize_method, num_epochs, epoch_length, test_length,
                 frame_skip, death_ends_episode, max_start_nullops, rng):
        self.ale = ale
        self.agent = agent
        self.num_epochs = num_epochs
        self.epoch_length = epoch_length
        self.test_length = test_length
        self.frame_skip = frame_skip
        self.death_ends_episode = death_ends_episode
        self.min_action_set = ale.getMinimalActionSet()
        self.resized_width = resized_width
        self.resized_height = resized_height
        self.resize_method = resize_method
        self.width, self.height = ale.getScreenDims()

        self.buffer_length = 2
        self.buffer_count = 0
        self.screen_buffer = np.empty((self.buffer_length,
                                       self.height, self.width),
                                      dtype=np.uint8)

        self.terminal_lol = False  # Most recent episode ended on a loss of life
        self.max_start_nullops = max_start_nullops
        self.rng = rng
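
    # Example: for a standard Atari screen ALE reports getScreenDims() as
    # (160, 210), so screen_buffer is a (2, 210, 160) uint8 array holding
    # the two most recent raw frames, which get_observation() later merges
    # with a pixel-wise max.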

    def run(self):
        """
        Run the desired number of training epochs; a testing epoch
        is conducted after each training epoch.
        """
        for epoch in range(1, self.num_epochs + 1):
            self.run_epoch(epoch, self.epoch_length)
            # (MINE) Don't save the Q-table every epoch, only at the last
            # one (better to create a separate function, since finish_epoch
            # writes stats as well):
            # if epoch == self.num_epochs:
            self.agent.finish_epoch(epoch)

            if self.test_length > 0:
                # NOTE: there is no dedicated testing function in the agent.
                self.agent.start_testing()
                self.run_epoch(epoch, self.test_length, True)
                self.agent.finish_testing(epoch)

    def run_epoch(self, epoch, num_steps, testing=False):
        """Run one 'epoch' of training or testing, where an epoch is
        defined by the number of steps executed.  Logs a progress report
        before every episode.

        Arguments:
        epoch - the current epoch number
        num_steps - steps per epoch
        testing - True if this epoch is used for testing and not training
        """
        self.terminal_lol = False  # Make sure each epoch starts with a reset.
        steps_left = num_steps
        while steps_left > 0:
            prefix = "testing" if testing else "training"
            logging.info(prefix + " epoch: " + str(epoch) + " steps_left: " +
                         str(steps_left))
            _, num_steps = self.run_episode(steps_left, testing)
            steps_left -= num_steps

    def _init_episode(self):
        """This method resets the game if needed, performs enough null
        actions to ensure that the screen buffer is ready, and optionally
        performs a randomly determined number of null actions to randomize
        the initial game state."""

        if not self.terminal_lol or self.ale.game_over():
            self.ale.reset_game()

            if self.max_start_nullops > 0:
                random_actions = self.rng.randint(0, self.max_start_nullops + 1)
                for _ in range(random_actions):
                    self._act(0)  # Null action

        # Make sure the screen buffer is filled at the beginning of
        # each episode...
        # (MINE) Two null actions because the buffer has two entries; the
        # observation is the pixel-wise max over them (see get_observation),
        # and actions are repeated frame_skip times anyway.
        self._act(0)
        self._act(0)
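
    # Example: with max_start_nullops = 30, rng.randint(0, 31) draws between
    # 0 and 30 null actions, so episodes begin from slightly different
    # emulator states rather than from the same deterministic frame.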

    def _act(self, action):
        """Perform the indicated action for a single frame, return the
        resulting reward and store the resulting screen image in the
        buffer.
        """
        reward = self.ale.act(action)
        index = self.buffer_count % self.buffer_length

        self.ale.getScreenGrayscale(self.screen_buffer[index, ...])

        self.buffer_count += 1
        return reward

    def _step(self, action):
        """Repeat one action the appropriate number of times and return
        the summed reward. (MINE) This is what implements frame skipping."""
        reward = 0
        for _ in range(self.frame_skip):
            reward += self._act(action)
        return reward
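
    # Example: with frame_skip = 4 the agent picks an action once, _act()
    # executes it on four consecutive emulator frames, and _step() returns
    # the sum of the four per-frame rewards, so the agent only observes
    # every fourth frame.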

    def run_episode(self, max_steps, testing):
        """Run a single training episode.

        The boolean terminal value returned indicates whether the
        episode ended because the game ended or the agent died (True)
        or because the maximum number of steps was reached (False).
        Currently this value will be ignored.

        Return: (terminal, num_steps)
        """
        self._init_episode()

        start_lives = self.ale.lives()

        # action = self.agent.start_episode(self.get_observation())
        self.agent.start_episode(self.get_observation())
        num_steps = 0
        reward = 0  # (MINE)
        while True:  # Play until game over (or until max_steps is reached).
            action = self.agent.step(reward, self.get_observation())
            reward = self._step(self.min_action_set[action])
            self.agent.DM.respond(reward)  # NOTE: the reward may need to be clipped.
            self.terminal_lol = (self.death_ends_episode and not testing and
                                 self.ale.lives() < start_lives)
            terminal = self.ale.game_over() or self.terminal_lol
            num_steps += 1

            if terminal or num_steps >= max_steps:
                self.agent.end_episode(reward, terminal)
                break
            # action = self.agent.step(reward, self.get_observation())
            # (The reward is accumulated into the agent's episode_reward.)

        return terminal, num_steps

    def get_observation(self):
        """Resize and merge the previous two screen images."""
        assert self.buffer_count >= 2
        index = self.buffer_count % self.buffer_length - 1
        max_image = np.maximum(self.screen_buffer[index, ...],
                               self.screen_buffer[index - 1, ...])
        return self.resize_image(max_image)
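
    # Example of the index arithmetic above: with buffer_length = 2,
    # buffer_count % 2 - 1 is either -1 or 0, so (index, index - 1) is
    # (-1, -2) or (0, -1); numpy's negative indexing selects the same two
    # rows in both cases. The pixel-wise max removes the flicker of sprites
    # that some Atari games draw only on alternate frames.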

    def resize_image(self, image):
        """Appropriately resize a single image."""
        if self.resize_method == 'crop':
            # Resize, keeping the aspect ratio (e.g. a 160x210 screen
            # resized to width 84 gives round(210 * 84 / 160) = 110).
            resize_height = int(round(
                float(self.height) * self.resized_width / self.width))
            resized = image_preprocessing.resize(image,
                                                 (self.resized_width,
                                                  resize_height))

            # Crop the part we want.
            crop_y_cutoff = resize_height - CROP_OFFSET - self.resized_height
            cropped = resized[crop_y_cutoff:
                              crop_y_cutoff + self.resized_height, :]

            return cropped
        elif self.resize_method == 'scale':
            return image_preprocessing.resize(image,
                                              (self.resized_width,
                                               self.resized_height))
        else:
            raise ValueError('Unrecognized image resize method.')
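

# ---------------------------------------------------------------------------
# A minimal usage sketch, assuming the ale_python_interface package is
# installed and a ROM file is available. RandomAgent is a hypothetical
# stand-in for the real deep Q-learning agent; it only satisfies the
# interface ALEExperiment actually calls above (start_episode / step /
# end_episode, start_testing / finish_testing, finish_epoch, and a DM
# member with a respond() method).
# ---------------------------------------------------------------------------
if __name__ == '__main__':
    from ale_python_interface import ALEInterface

    class RandomAgent(object):
        """Hypothetical agent satisfying the interface ALEExperiment expects."""

        class _DM(object):
            def respond(self, reward):
                pass  # A real agent would process (and maybe clip) the reward.

        def __init__(self, num_actions, rng):
            self.num_actions = num_actions
            self.rng = rng
            self.DM = self._DM()

        def start_episode(self, observation):
            return self.rng.randint(self.num_actions)

        def step(self, reward, observation):
            return self.rng.randint(self.num_actions)

        def end_episode(self, reward, terminal):
            pass

        def start_testing(self):
            pass

        def finish_testing(self, epoch):
            pass

        def finish_epoch(self, epoch):
            pass

    logging.basicConfig(level=logging.INFO)
    rng = np.random.RandomState(123)

    ale = ALEInterface()
    ale.setInt(b'random_seed', 123)
    ale.loadROM(b'breakout.bin')  # Path to a ROM file; adjust as needed.

    agent = RandomAgent(len(ale.getMinimalActionSet()), rng)
    experiment = ALEExperiment(ale, agent,
                               resized_width=84, resized_height=84,
                               resize_method='scale',
                               num_epochs=1, epoch_length=1000,
                               test_length=0, frame_skip=4,
                               death_ends_episode=True,
                               max_start_nullops=30, rng=rng)
    experiment.run()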