-
Notifications
You must be signed in to change notification settings - Fork 1
/
misc.py
136 lines (104 loc) · 4.42 KB
/
misc.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
# -*- coding: utf-8 -*-
"""
Created on Mon Oct 18 10:43:43 2021
@author: Nick_SimPC
"""
import numpy as np
from functools import wraps
import inspect
import torch as th
class MeritOrder():
    """
    Simple merit-order price model over a set of power plants.

    For each time step the plants are ranked by their marginal cost and
    dispatched cumulatively until the residual load is covered; the marginal
    cost of the last dispatched plant is the clearing price.
    """

    def __init__(self, res_load, powerplants, snapshots):
        """
        :param res_load: object exposing ``.values`` (presumably a pandas
            Series - verify against caller); stored as a plain list
        :param powerplants: container of plants supporting ``.copy()``; each
            plant needs ``marginal_cost`` (indexable by time step) and
            ``maxPower``
        :param snapshots: sized collection whose length sets the number of
            time steps of the price forward curve
        """
        self.snapshots = snapshots
        self.res_load = list(res_load.values)
        # Work on a copy so the caller's container is not shared.
        self.powerplants = powerplants.copy()

    def calculate_merit_order(self, res_load, t):
        """
        Return the market clearing price for ``res_load`` at time step ``t``.

        The price is 0 for a non-positive residual load, the marginal cost of
        the plant that covers the load otherwise, and 100 when the total
        capacity is not sufficient.
        """
        ranked = sorted(self.powerplants,
                        key=lambda plant: plant.marginal_cost[t])

        if res_load <= 0:
            return 0

        dispatched = 0
        for plant in ranked:
            dispatched += plant.maxPower
            if dispatched >= res_load:
                return plant.marginal_cost[t]

        # Demand exceeds the capacity of all plants: fixed fallback price.
        return 100

    def price_forward_curve(self):
        """Return the list of clearing prices, one per snapshot."""
        return [self.calculate_merit_order(self.res_load[step], step)
                for step in range(len(self.snapshots))]
# from https://github.com/songrotek/DDPG/blob/master/ou_noise.py
class OUNoise():
    """
    Ornstein-Uhlenbeck style exploration noise.

    Each call to :meth:`noise` advances the internal state by
    ``theta * (mu - state) * dt + sigma * sqrt(dt) * N(0, 1)`` and returns
    the new state.

    :param action_dimension: size of the noise vector
    :param mu: value the state is pulled towards
    :param sigma: scale of the random term
    :param theta: strength of the pull towards ``mu``
    :param dt: step size used in the update
    :param initial_noise: optional state to start from (and to return to on
        :meth:`reset`); zeros are used when it is ``None``
    """

    def __init__(self, action_dimension, mu=0, sigma=0.5, theta=0.15, dt=1e-2,
                 initial_noise=None):
        self.action_dimension = action_dimension
        self.mu = mu
        self.theta = theta
        self.sigma = sigma
        self.dt = dt
        # Must be set before reset(): reset() reads it to pick the start state.
        self.initial_noise = initial_noise
        self.noise_prev = np.zeros(self.action_dimension)
        self.reset()

    def reset(self):
        """Restore the state to ``initial_noise`` (or zeros when unset)."""
        if self.initial_noise is not None:
            # Copy so the stored initial state is never aliased by the
            # running state.
            self.noise_prev = np.array(self.initial_noise)
        else:
            self.noise_prev = np.zeros(self.action_dimension)

    def noise(self):
        """Advance the process by one step and return the new noise vector."""
        drift = self.theta * (self.mu - self.noise_prev) * self.dt
        diffusion = (self.sigma * np.sqrt(self.dt)
                     * np.random.normal(size=self.action_dimension))
        noise = self.noise_prev + drift + diffusion
        self.noise_prev = noise
        return noise
class NormalActionNoise():
    """
    Gaussian exploration noise whose amplitude decays with every sample.

    :param action_dimension: size of the noise vector
    :param mu: mean of the normal distribution
    :param sigma: standard deviation of the normal distribution
    :param scale: initial multiplier applied to each sample
    :param dt: factor the multiplier is scaled by after every sample
        (despite its name it is used as a decay factor, not a time step)
    """

    def __init__(self, action_dimension, mu=0., sigma=0.1, scale=1.0, dt=0.9998):
        self.act_dimension = action_dimension
        self.mu = mu
        self.sigma = sigma
        self.scale = scale
        # Remember the configured amplitude so reset() can restore it instead
        # of assuming the default of 1.0.
        self.initial_scale = scale
        self.dt = dt

    def noise(self):
        """Return one scaled sample and decay the scale by ``dt``."""
        noise = self.scale * np.random.normal(self.mu, self.sigma, self.act_dimension)
        self.scale = self.dt * self.scale
        return noise

    def reset(self):
        """Restore the scale to the value given at construction."""
        self.scale = self.initial_scale
def initializer(func):
    """
    Automatically assigns the parameters.

    Decorator for ``__init__``-like methods: every positional and keyword
    argument passed to the call is stored as an attribute of ``self``;
    parameters that were not passed and have a default get that default,
    unless ``self`` already has an attribute of that name.

    >>> class process:
    ...     @initializer
    ...     def __init__(self, cmd, reachable=False, user='root'):
    ...         pass
    >>> p = process('halt', True)
    >>> p.cmd, p.reachable, p.user
    ('halt', True, 'root')

    :param func: the method to wrap; its first parameter is the instance
    :return: the wrapping function, which returns whatever ``func`` returns
    """
    # getfullargspec replaces inspect.getargspec, which no longer exists in
    # Python 3.11+.
    spec = inspect.getfullargspec(func)
    names = spec.args
    # ``defaults`` is None when the function declares no default values.
    defaults = spec.defaults or ()
    kwonly_defaults = spec.kwonlydefaults or {}

    @wraps(func)
    def wrapper(self, *args, **kargs):
        # names[0] is the instance itself, so positional values start at 1.
        for name, arg in list(zip(names[1:], args)) + list(kargs.items()):
            setattr(self, name, arg)

        # Defaults align with the *last* positional parameters.
        for name, default in zip(reversed(names), reversed(defaults)):
            if not hasattr(self, name):
                setattr(self, name, default)

        for name, default in kwonly_defaults.items():
            if not hasattr(self, name):
                setattr(self, name, default)

        return func(self, *args, **kargs)

    return wrapper
def polyak_update(params, target_params, tau):
    """
    Soft-update ``target_params`` towards ``params`` (Polyak averaging).

    Every target tensor is overwritten in place with
    ``(1 - tau) * target + tau * param``. With ``tau=1`` the targets become
    copies of the parameters; with ``tau=0`` they are left unchanged.

    The update runs under ``no_grad`` and uses in-place operations only: the
    target is first scaled by ``1 - tau`` and the parameters, scaled by
    ``tau``, are then added to it.

    See https://github.com/DLR-RM/stable-baselines3/issues/93

    :param params: parameters to use to update the target params
    :param target_params: parameters to update
    :param tau: the soft update coefficient ("Polyak update", between 0 and 1)
    """
    with th.no_grad():
        # NOTE: zip stops at the shorter iterable, so a length mismatch
        # between the two parameter collections is not reported.
        for source, target in zip(params, target_params):
            weights = target.data
            weights.mul_(1 - tau)
            weights.add_(source.data, alpha=tau)