# random_walk_test.py
#######################################################################
# Copyright (C) #
# 2016-2018 Shangtong Zhang([email protected]) #
# 2016 Kenta Shimada([email protected]) #
# Permission given to modify the code as long as you keep this #
# declaration at the top #
#######################################################################
# Modified to run one episode only - for testing purposes
import numpy as np
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
from tqdm import tqdm
# number of non-terminal states
N_STATES = 19
# discount
GAMMA = 1
# all states but terminal states
STATES = np.arange(1, N_STATES + 1)
# start from the middle state
START_STATE = 10
# two terminal states
# an action leading to the left terminal state has reward -1
# an action leading to the right terminal state has reward 1
END_STATES = [0, N_STATES + 1]
# true state values from the Bellman equation
TRUE_VALUE = np.arange(-20, 22, 2) / 20.0
TRUE_VALUE[0] = TRUE_VALUE[-1] = 0
# n-step TD method
# @value: values for each state, will be updated
# @n: number of steps
# @alpha: step size
def temporal_difference(value, n, alpha):
    # initial starting state
    state = START_STATE

    # arrays to store states and rewards for an episode
    # space isn't a major consideration, so I didn't use the mod trick
    states = [state]
    rewards = [0]

    # track the time
    time = 0

    # the length of this episode
    T = float('inf')
    while True:
        # go to next time step
        time += 1

        if time < T:
            # choose an action randomly
            if np.random.binomial(1, 0.5) == 1:
                next_state = state + 1
            else:
                next_state = state - 1

            if next_state == 0:
                reward = -1
            elif next_state == N_STATES + 1:
                reward = 1
            else:
                reward = 0

            # store new state and new reward
            states.append(next_state)
            rewards.append(reward)

            if next_state in END_STATES:
                T = time

        # get the time of the state to update
        update_time = time - n
        if update_time >= 0:
            returns = 0.0
            # calculate corresponding rewards
            for t in range(update_time + 1, min(T, update_time + n) + 1):
                returns += pow(GAMMA, t - update_time - 1) * rewards[t]
            # add state value to the return
            if update_time + n <= T:
                returns += pow(GAMMA, n) * value[states[update_time + n]]
            state_to_update = states[update_time]
            # update the state value
            if state_to_update not in END_STATES:
                value[state_to_update] += alpha * (returns - value[state_to_update])
        if update_time == T - 1:
            break
        state = next_state
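
# Added example (not part of the original script): a minimal sketch of how
# temporal_difference is typically driven for a single (n, alpha) configuration.
# The function name and default parameters below are arbitrary illustrative choices.
def example_single_configuration(n=4, alpha=0.4, episodes=10):
    value = np.zeros(N_STATES + 2)
    for _ in range(episodes):
        temporal_difference(value, n, alpha)
    # RMS error of the learned values against the analytic state values
    rms_error = np.sqrt(np.sum(np.power(value - TRUE_VALUE, 2)) / N_STATES)
    return value, rms_error
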
# Figure 7.2, it will take quite a while
def figure7_2():
    # all possible steps
    steps = np.power(2, np.arange(0, 10))

    # all possible alphas
    alphas = np.arange(0, 1.1, 0.1)

    # each run has 10 episodes
    episodes = 10

    # perform 100 independent runs
    runs = 100

    # track the errors for each (step, alpha) combination
    errors = np.zeros((len(steps), len(alphas)))
    for run in tqdm(range(0, runs)):
        for step_ind, step in zip(range(len(steps)), steps):
            for alpha_ind, alpha in zip(range(len(alphas)), alphas):
                # print('run:', run, 'step:', step, 'alpha:', alpha)
                value = np.zeros(N_STATES + 2)
                for ep in range(0, episodes):
                    temporal_difference(value, step, alpha)
                    # calculate the RMS error
                    errors[step_ind, alpha_ind] += np.sqrt(np.sum(np.power(value - TRUE_VALUE, 2)) / N_STATES)
    # take average
    errors /= episodes * runs

    for i in range(0, len(steps)):
        plt.plot(alphas, errors[i, :], label='n = %d' % (steps[i]))
    plt.xlabel('alpha')
    plt.ylabel('RMS error')
    plt.ylim([0.25, 0.55])
    plt.legend()

    plt.savefig('../images/figure_7_2.png')
    plt.close()
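
# Added example (not part of the original script): a much smaller sweep that
# exercises the same experiment loop as figure7_2 in seconds rather than minutes.
# The function name and grid sizes below are arbitrary and only meant as a smoke test.
def example_small_sweep(runs=2, episodes=5):
    steps = np.power(2, np.arange(0, 3))   # n in {1, 2, 4}
    alphas = np.arange(0.1, 0.6, 0.2)      # a few step sizes
    errors = np.zeros((len(steps), len(alphas)))
    for _ in range(runs):
        for step_ind, step in enumerate(steps):
            for alpha_ind, alpha in enumerate(alphas):
                value = np.zeros(N_STATES + 2)
                for _ in range(episodes):
                    temporal_difference(value, step, alpha)
                    errors[step_ind, alpha_ind] += np.sqrt(np.sum(np.power(value - TRUE_VALUE, 2)) / N_STATES)
    # average RMS error per (n, alpha) combination
    return errors / (episodes * runs)
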
# n-step TD method driven by a fixed action sequence - deterministic variant for testing
# @actions: list of actions (+1 or -1); actions[t] is taken at time step t, so index 0 is unused
# @value: values for each state, will be updated
# @n: number of steps
# @alpha: step size
def temporal_difference_test(actions, value, n, alpha):
    # initial starting state
    state = START_STATE

    # arrays to store states and rewards for an episode
    # space isn't a major consideration, so I didn't use the mod trick
    states = [state]
    rewards = [0]

    # track the time
    time = 0

    # the length of this episode
    T = float('inf')
    while True:
        # go to next time step
        time += 1

        if time < T:
            # take the next action from the list
            action = actions[time]
            assert action in [-1, 1], "Actions must be -1 or +1"
            next_state = state + action

            if next_state == 0:
                reward = -1
            elif next_state == N_STATES + 1:
                reward = 1
            else:
                reward = 0

            # store new state and new reward
            states.append(next_state)
            rewards.append(reward)

            if next_state in END_STATES:
                T = time

        # get the time of the state to update
        update_time = time - n
        if update_time >= 0:
            returns = 0.0
            # calculate corresponding rewards
            for t in range(update_time + 1, min(T, update_time + n) + 1):
                returns += pow(GAMMA, t - update_time - 1) * rewards[t]
            # add state value to the return
            if update_time + n <= T:
                returns += pow(GAMMA, n) * value[states[update_time + n]]
            state_to_update = states[update_time]
            # update the state value
            if state_to_update not in END_STATES:
                value[state_to_update] += alpha * (returns - value[state_to_update])
        if update_time == T - 1:
            break
        state = next_state
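
# Added example (not part of the original script): a hand-checkable use of
# temporal_difference_test with a deterministic action sequence. Walking straight
# to the right terminal state with n = 1 and zero-initialised values should change
# only the value of state 19, the last state before the +1 reward. The function
# name and the chosen alpha are illustrative, not part of the original code.
def example_deterministic_episode():
    value = np.zeros(N_STATES + 2)
    # actions[0] is never read; times 1..10 take +1 steps from state 10 to state 20
    actions = [1] * 12
    temporal_difference_test(actions, value, n=1, alpha=0.5)
    # the one-step TD target for state 19 is r + value[20] = 1 + 0 = 1,
    # so its update is 0 + 0.5 * (1 - 0) = 0.5
    assert np.isclose(value[N_STATES], 0.5)
    assert np.all(value[:N_STATES] == 0) and value[N_STATES + 1] == 0
    return value
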
if __name__ == '__main__':
    figure7_2()