Skip to content

Commit

Permalink
Version 1.0.1
Browse files Browse the repository at this point in the history
  • Loading branch information
onnoeberhard committed Dec 2, 2022
1 parent 107a6c5 commit e766240
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 18 deletions.
86 changes: 72 additions & 14 deletions pink/cnrl.py
Original file line number Diff line number Diff line change
@@ -1,42 +1,100 @@
from . import colorednoise as cn

class ColoredNoiseProcess():
def __init__(self, beta, scale=1, chunksize=None, largest_wavelength=None, rng=None):
"""Colored noise implemented as a process that allows subsequent samples.
Implemented as a buffer; every "chunksize[-1]" items, a cut to a new time series starts.
"""Infinite colored noise process.
Implemented as a buffer: every `size[-1]` samples, a cut to a new time series starts. As this cut influences the PSD of the combined signal, the maximum period (1 / low-frequency cutoff) can be specified.
Methods
-------
sample()
Sample a single timestep from the colored nosie process.
"""
def __init__(self, beta, size, scale=1, max_period=None, rng=None):
"""Infinite colored noise process.
Implemented as a buffer: every `size[-1]` samples, a cut to a new time series starts. As this cut influences the PSD of the combined signal, the maximum period (1 / low-frequency cutoff) can be specified.
Parameters
----------
beta : float
Exponent of colored noise power-law spectrum.
size : int or tuple of int
Shape of the sampled colored noise signals. The last dimension (`size[-1]`) specifies the time range, and is thus ths maximum possible correlation length of the combined signal.
scale : int, optional, by default 1
Scale parameter with which samples are multiplied
max_period : float, optional, by default None
Maximum correlation length of sampled colored noise singals (1 / low-frequency cutoff). If None, it is
automatically set to `size[-1]` (the sequence length).
rng : np.random.Generator, optional, by default None
Random number generator (for reproducibility). If None, a new random number generator is created by calling
`np.random.default_rng()`.
"""
self.beta = beta
if largest_wavelength is None:
if max_period is None:
self.minimum_frequency = 0
else:
self.minimum_frequency = 1 / largest_wavelength
self.minimum_frequency = 1 / max_period
self.scale = scale
self.rng = rng

# The last component of chunksize is the time index
# The last component of size is the time index
try:
self.chunksize = list(chunksize)
self.size = list(size)
except TypeError:
self.chunksize = [chunksize]
self.time_steps = self.chunksize[-1]
self.size = [size]
self.time_steps = self.size[-1]

# Set first time-step such that buffer will be initialized
self.idx = self.time_steps

def sample(self):
"""
Sample a single timestep from the colored nosie process.
The buffer is automatically refilled when necessary.
Returns
-------
array_like
Sampled vector of shape `size[:-1]`
"""
self.idx += 1 # Next time step

# Refill buffer if depleted
if self.idx >= self.time_steps:
self.buffer = cn.powerlaw_psd_gaussian(
exponent=self.beta, size=self.chunksize, fmin=self.minimum_frequency, rng=self.rng)
exponent=self.beta, size=self.size, fmin=self.minimum_frequency, rng=self.rng)
self.idx = 0

return self.scale * self.buffer[..., self.idx]

class PinkNoiseProcess(ColoredNoiseProcess):
def __init__(self, scale=1, chunksize=None, largest_wavelength=None, rng=None):
"""Colored noise implemented as a process that allows subsequent samples.
Implemented as a buffer; every "chunksize[-1]" items, a cut to a new time series starts.
"""Infinite pink noise process.
Implemented as a buffer: every `size[-1]` samples, a cut to a new time series starts. As this cut influences the PSD of the combined signal, the maximum period (1 / low-frequency cutoff) can be specified.
Methods
-------
sample()
Sample a single timestep from the pink nosie process.
"""
def __init__(self, size, scale=1, max_period=None, rng=None):
"""Infinite pink noise process.
Implemented as a buffer: every `size[-1]` samples, a cut to a new time series starts. As this cut influences the PSD of the combined signal, the maximum period (1 / low-frequency cutoff) can be specified.
Parameters
----------
size : int or tuple of int
Shape of the sampled pink noise signals. The last dimension (`size[-1]`) specifies the time range, and is thus ths maximum possible correlation length of the combined signal.
scale : int, optional, by default 1
Scale parameter with which samples are multiplied
max_period : float, optional, by default None
Maximum correlation length of sampled pink noise singals (1 / low-frequency cutoff). If None, it is
automatically set to `size[-1]` (the sequence length).
rng : np.random.Generator, optional, by default None
Random number generator (for reproducibility). If None, a new random number generator is created by calling
`np.random.default_rng()`.
"""
super().__init__(1, scale, chunksize, largest_wavelength, rng)
super().__init__(1, size, scale, max_period, rng)
4 changes: 2 additions & 2 deletions pink/sb3.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def __init__(self, beta, sigma, seq_len, rng=None):
super().__init__()
self._beta = beta
self._sigma = sigma
self._gen = [ColoredNoiseProcess(beta=b, scale=s, chunksize=seq_len, largest_wavelength=None, rng=rng)
self._gen = [ColoredNoiseProcess(beta=b, scale=s, size=seq_len, rng=rng)
for b, s in zip(beta, sigma)]

def __call__(self) -> np.ndarray:
Expand Down Expand Up @@ -85,7 +85,7 @@ class (`SquashedDiagGaussianDistribution`).
A small value to avoid NaN due to numerical imprecision.
"""
super().__init__(len(beta), epsilon)
self.cn_processes = [ColoredNoiseProcess(beta=b, chunksize=seq_len, largest_wavelength=None, rng=rng)
self.cn_processes = [ColoredNoiseProcess(beta=b, size=seq_len, rng=rng)
for b in beta]

def sample(self) -> th.Tensor:
Expand Down
2 changes: 1 addition & 1 deletion pink/tonic.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def set_beta(self, beta):
if np.isscalar(beta):
beta = [beta] * self.action_space.shape[0]
self.cn_processes = [
ColoredNoiseProcess(beta=b, chunksize=self.seq_len, largest_wavelength=None, rng=self.rng) for b in beta]
ColoredNoiseProcess(beta=b, size=self.seq_len, rng=self.rng) for b in beta]

def _step(self, observations):
observations = th.as_tensor(observations, dtype=th.float32)
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pink-noise-rl"
version = "1.0.0"
version = "1.0.1"
description = "Pink noise for exploration in reinforcement learning"
authors = ["Onno Eberhard <[email protected]>"]
license = "MIT"
Expand Down

0 comments on commit e766240

Please sign in to comment.