
Commit

Merge pull request #158 from pescadores/2.1-release
2.1 release
cjacoby authored Aug 29, 2019
2 parents 7d14790 + f78230a commit cb7e8a4
Showing 13 changed files with 128 additions and 67 deletions.
14 changes: 14 additions & 0 deletions docs/changes.rst
@@ -1,6 +1,20 @@
Changes
=======

v2.1.0 (2019-08-27)
-------------------
- `#156`_ Added support for python 3.7
- `#153`_ Added the `cache` map
- `#151`_ Added the decorator interface for defining streamers
- `#150`_ `Streamer` objects now directly support iterator inputs
- `#150`_ Removed deprecated `Mux` class, which has been replaced by
  `StochasticMux`.

.. _#156: https://github.com/pescadores/pescador/pull/156
.. _#153: https://github.com/pescadores/pescador/pull/153
.. _#151: https://github.com/pescadores/pescador/pull/151
.. _#150: https://github.com/pescadores/pescador/pull/150

v2.0.2 (2019-07-10)
-------------------
- `#145`_ Fixed `ShuffledMux` to support integer-typed weights
22 changes: 22 additions & 0 deletions docs/example1.rst
@@ -110,3 +110,25 @@ Iterating over any of these would then look like the following:
    for sample in streamer:
        # do something
        ...
Stream decorators
-----------------
In the example above, we implemented a streamer by first defining a generator function and then wrapping it in a `Streamer` object.
This allows the function to be used either directly as an iterator, or indirectly through pescador.
However, if you are implementing a generator that will only be used via pescador and do not need direct iterator access, you can use a slightly cleaner syntax, as in the following example:

.. code-block:: python
    :linenos:

    import pescador

    @pescador.streamable
    def my_generator(n):
        for i in range(n):
            yield i**2
Using the above syntax, when you call `my_generator(5)`, the result is itself a `Streamer` object, rather than an iterator.
We generally recommend using this syntax unless direct access to the iterator is necessary, as the resulting code tends to be more readable.
Further examples are given elsewhere in the documentation.
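As a quick illustration, here is a minimal sketch of using the decorated generator defined above (the values in the comments follow from its definition):

.. code-block:: python

    stream = my_generator(5)   # a Streamer, not a plain iterator
    for sample in stream:      # iterating invokes the underlying generator
        print(sample)          # prints 0, 1, 4, 9, 16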
9 changes: 8 additions & 1 deletion docs/example2.rst
@@ -12,6 +12,12 @@ The `StochasticMux` streamer provides a powerful interface for randomly interleaving

As a concrete example, we can simulate a mixture of noisy streams with differing variances.

In this example, we use a slightly different notation for defining streamers, introduced in pescador 2.1.
Instead of defining an iterator function `noisy_samples` and then wrapping it with a `Streamer` object,
we can instead decorate the generator function with `@pescador.streamable`.
The result is qualitatively the same in both cases, but the decorator interface produces a slightly
cleaner syntax.
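For reference, here is a minimal sketch contrasting the two constructions before the full example below (the toy generator is purely illustrative):

.. code-block:: python

    import pescador

    def toy(n):                       # a plain generator function
        for i in range(n):
            yield {'X': i}

    s1 = pescador.Streamer(toy, 3)    # explicit wrapper (pre-2.1 style)

    @pescador.streamable
    def toy2(n):                      # decorated: calling it returns a Streamer
        for i in range(n):
            yield {'X': i}

    s2 = toy2(3)                      # s1 and s2 iterate identically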

.. code-block:: python
:linenos:
@@ -26,6 +32,7 @@ As a concrete example, we can simulate a mixture of noisy streams with differing
from pescador import Streamer, StochasticMux

@pescador.streamable
def noisy_samples(X, Y, sigma=1.0):
    '''Copied over from the previous example'''
    n, d = X.shape
@@ -50,7 +57,7 @@ As a concrete example, we can simulate a mixture of noisy streams with differing
estimator = SGDClassifier()

# Build a collection of Streamers with different noise scales
-streams = [Streamer(noisy_samples, X[train], Y[train], sigma=sigma)
+streams = [noisy_samples(X[train], Y[train], sigma=sigma)
           for sigma in [0, 0.5, 1.0, 2.0, 4.0]]

# Build a mux stream, keeping 3 streams alive at once
63 changes: 33 additions & 30 deletions docs/example3.rst
@@ -17,6 +17,7 @@ We will define infinite samplers that pull `n` examples per iterate.
import numpy as np
import pescador

@pescador.streamable
def sample_npz(npz_file, n):
    '''Generate an infinite sequence of contiguous samples
    from the input `npz_file`.
@@ -40,7 +41,7 @@ Applying the `sample_npz` function above to a list of `npz_files`, we can make a
n = 16
npz_files = #LIST OF PRE-COMPUTED NPZ FILES
-streams = [pescador.Streamer(sample_npz, npz_f, n) for npz_f in npz_files]
+streams = [sample_npz(npz_f, n) for npz_f in npz_files]

# Keep 32 streams alive at once
# Draw on average 16 patches from each stream before deactivating
@@ -58,46 +59,48 @@ The `NPZ` file format requires loading the entire contents of each archive into memory

.. code-block:: python
-def sample_npz_copy(npz_file, n):
+@pescador.streamable
+def sample_npz_copy(npz_file, n):
    with np.load(npz_file) as data:
        # How many rows are in the data?
        # We assume that data['Y'] has the same length
        n_total = len(data['X'])

        while True:
            # Compute the index offset
            idx = np.random.randint(n_total - n)
            yield dict(X=data['X'][idx:idx + n].copy(),  # <-- Note the explicit copy
                       Y=data['Y'][idx:idx + n].copy())
The above modification will ensure that memory is freed as quickly as possible.

Alternatively, *memory-mapping* can be used to only load data as needed, but requires that each array is stored in its own `NPY` file:

.. code-block:: python
-def sample_npy_mmap(npy_x, npy_y, n):
+@pescador.streamable
+def sample_npy_mmap(npy_x, npy_y, n):
    # Open each file in "copy-on-write" mode, so that the files are read-only
    X = np.load(npy_x, mmap_mode='c')
    Y = np.load(npy_y, mmap_mode='c')

    n_total = len(X)

    while True:
        # Compute the index offset
        idx = np.random.randint(n_total - n)
        yield dict(X=X[idx:idx + n],
                   Y=Y[idx:idx + n])

# Using this streamer is similar to the first example, but now you need a separate
# NPY file for each X and Y
npy_x_files = #LIST OF PRE-COMPUTED NPY FILES (X)
npy_y_files = #LIST OF PRE-COMPUTED NPY FILES (Y)
-streams = [pescador.Streamer(sample_npy_mmap, npy_x, npy_y, n)
+streams = [sample_npy_mmap(npy_x, npy_y, n)
           for (npy_x, npy_y) in zip(npy_x_files, npy_y_files)]

# Then construct the `StochasticMux` from the streams, as above
mux_streamer = pescador.StochasticMux(streams, n_active=32, rate=16)

...
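Since memory-mapping needs one `NPY` file per array, a one-time conversion step may be required. A minimal sketch, assuming each archive holds arrays `X` and `Y` (the file names here are illustrative):

.. code-block:: python

    import numpy as np

    # Split an NPZ archive into separate NPY files, one per array,
    # so that each can be memory-mapped individually.
    with np.load('data_000.npz') as data:
        np.save('data_000_X.npy', data['X'])
        np.save('data_000_Y.npy', data['Y'])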
3 changes: 2 additions & 1 deletion docs/why.rst
@@ -52,6 +52,7 @@ In this case, you would most likely implement a `generator` for each file as follows:
.. code-block:: python
    :linenos:

    @pescador.streamable
    def sample_file(filename):
        # Load observation data
        X = np.load(filename)
@@ -60,7 +61,7 @@ In this case, you would most likely implement a `generator` for each file as follows:
    # Generate a random row as a dictionary
    yield dict(X=X[np.random.choice(len(X))])

-streamers = [pescador.Streamer(sample_file, fname) for fname in ALL_30K_FILES]
+streamers = [sample_file(fname) for fname in ALL_30K_FILES]

# Keep 100 streamers active at a time
# Replace a streamer after it has generated (on average) 8 samples
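A `StochasticMux` matching this description would be constructed roughly as follows (a sketch; the exact call is elided from this excerpt):

.. code-block:: python

    mux = pescador.StochasticMux(streamers, n_active=100, rate=8)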
6 changes: 4 additions & 2 deletions examples/frameworks/keras_example.py
@@ -121,6 +121,7 @@ def build_model(input_shape):
# Define Data Sampler
##############################################

@pescador.streamable
def sampler(X, y):
    '''A basic generator for sampling data.
@@ -152,6 +153,7 @@ def sampler(X, y):
# Define a Custom Map Function
##############################################

@pescador.streamable
def additive_noise(stream, key='X', scale=1e-1):
    '''Add noise to a data stream.
@@ -187,8 +189,8 @@ def additive_noise(stream, key='X', scale=1e-1):
# Create two streams from the same data, where one of the streams
# adds a small amount of Gaussian noise. You could easily perform
# other data augmentations using the same 'map' strategy.
-stream = pescador.Streamer(sampler, X_train, Y_train)
-noisy_stream = pescador.Streamer(additive_noise, stream, 'X')
+stream = sampler(X_train, Y_train)
+noisy_stream = additive_noise(stream, 'X')
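# As a sketch of the same 'map' pattern, another augmentation could be
# defined in exactly the same way (`scale_data` is illustrative and not
# part of this example):

@pescador.streamable
def scale_data(stream, key='X', factor=2.0):
    '''Scale one key of a data stream by a constant factor.'''
    for data in stream:
        data[key] = data[key] * factor
        yield data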

# Multiplex the two streamers together.
mux = pescador.StochasticMux([stream, noisy_stream],
5 changes: 3 additions & 2 deletions examples/mux/chain_example.py
@@ -43,6 +43,7 @@
# containing inputs and outputs (e.g., images and class labels)
# Once the streamer produces m examples, it exits.

@pescador.streamable
def data_gen(filename, m):

    data = np.load(filename)
@@ -56,9 +57,9 @@ def data_gen(filename, m):
# Constructing the streamers
############################
# First, we'll make a streamer for each validation example.
#

-val_streams = [pescador.Streamer(data_gen, fn, M) for fn in val_files]
+val_streams = [data_gen(fn, M) for fn in val_files]
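# As a sketch of where this is headed: a ChainMux presents each
# validation stream in sequence, exhausting one before starting the
# next. (The exact construction is elided from this excerpt.)
val_stream = pescador.ChainMux(val_streams)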


############################
22 changes: 16 additions & 6 deletions examples/mux/epoch.py
@@ -20,7 +20,6 @@
all data is touched once per epoch, then the `StochasticMux` can be
used in `cycle` mode to restart all streamers once they've been
exhausted.
"""

# Imports
@@ -47,7 +46,12 @@
# The npz file is assumed to store two arrays: X and Y
# containing inputs and outputs.
# Once the streamer produces m examples, it exits.
#
# Here, we'll use the decorator interface to declare this
# generator as a pescador Streamer


@pescador.streamable
def data_gen(filename, m):
    data = np.load(filename)
    X = data['X']
@@ -60,32 +64,37 @@ def data_gen(filename, m):
###############################
# We'll make a streamer for each source file

-streams = [pescador.Streamer(data_gen, fn, M) for fn in files]
+streams = [data_gen(fn, M) for fn in files]

###############################
# Epochs with StochasticMux
###############################
# The `StochasticMux` has three modes of operation, which control
# how its input streams are activated and replaced:
#
# - `mode='with_replacement'` allows each streamer to be activated
#   multiple times, even simultaneously.
#
# - `mode='single_active'` does not allow a streamer to be active
#   more than once at a time, but an inactive streamer can be activated
#   at any time.
#
# - `mode='exhaustive'` is like `single_active`, but does not allow
#   previously used streamers to be re-activated.
#
# For epoch-based sampling, we will use `exhaustive` mode to ensure
# that streamers are not reactivated within the epoch.
#
# Since each data stream produces exactly `M` examples, this would lead
# to a finite sample stream (i.e., only one epoch).
# To prevent the mux from exiting after the first epoch, we'll use `cycle` mode.
#

k = 100 # or however many streamers you want simultaneously active

# We'll use `rate=None` here so that the number of samples per stream is
# determined by the streamer (`M`) and not the mux.

mux = pescador.StochasticMux(streams, k, rate=None, mode='exhaustive')

epoch_stream = mux(cycle=True)
Expand All @@ -100,3 +109,4 @@ def data_gen(filename, m):
# `fit_generator` method, you need to be able to explicitly calculate
# the duration of an epoch, which means that the number of samples
# per streamer (`M` here) must be known in advance.
#
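# Since each of the `len(files)` streamers produces exactly M samples,
# the epoch length is easy to compute (a sketch reusing the variables
# defined above):
samples_per_epoch = len(files) * M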
29 changes: 15 additions & 14 deletions examples/mux/mux_files_example.py
@@ -57,6 +57,7 @@ def split_and_save_datasets(X, Y, paths):
#################################################


@pescador.streamable
def npz_generator(npz_path):
    """Generate data from an npz file."""
    npz_data = np.load(npz_path)
@@ -71,7 +72,7 @@ def npz_generator(npz_path):
yield {'X': X[i], 'Y': y[i]}


-streams = [pescador.Streamer(npz_generator, x) for x in datasets]
+streams = [npz_generator(x) for x in datasets]


##############################################
@@ -80,13 +81,13 @@ def npz_generator(npz_path):
# If you can easily fit all the datasets in memory and you want to
# sample from them equally, you would set up your Mux as follows:

-mux = pescador.Mux(streams,
-                   # Three streams, always active.
-                   k=len(streams),
-                   # We want to sample from each stream infinitely,
-                   # so we turn off the rate parameter, which
-                   # controls how long to sample from each stream.
-                   rate=None)
+mux = pescador.StochasticMux(streams,
+                             # Three streams, always active.
+                             n_active=len(streams),
+                             # We want to sample from each stream infinitely,
+                             # so we turn off the rate parameter, which
+                             # controls how long to sample from each stream.
+                             rate=None)


##############################################
@@ -96,12 +97,12 @@ def npz_generator(npz_path):
# Now, the rate parameter controls (statistically) how long to sample
# from a stream before activating a new stream.

-mux = pescador.Mux(streams,
-                   # Only allow one active stream
-                   k=1,
-                   # Sample on average 1000 samples from a stream before
-                   # moving onto another one.
-                   rate=1000)
+mux = pescador.StochasticMux(streams,
+                             # Only allow one active stream
+                             n_active=1,
+                             # Sample on average 1000 samples from a stream before
+                             # moving onto another one.
+                             rate=1000)
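# Drawing from the mux works like any other streamer; a short sketch:
for sample in mux.iterate(max_iter=5):
    print(sample['X'].shape, sample['Y'])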


##############################################
5 changes: 3 additions & 2 deletions examples/mux/shuffle.py
@@ -40,13 +40,14 @@

# First, let's make a simple generator that makes an infinite
# sequence of a given letter.
@pescador.streamable
def letter(c):
    while True:
        yield c

# Let's make the two populations of streamers
-pop1 = [pescador.Streamer(letter, c) for c in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ']
-pop2 = [pescador.Streamer(letter, c) for c in 'abcdefghijklmnopqrstuvwxyz']
+pop1 = [letter(c) for c in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ']
+pop2 = [letter(c) for c in 'abcdefghijklmnopqrstuvwxyz']

# We'll sample population 1 with 3 streamers active at any time.
# Each streamer will generate, on average, 5 samples before being
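# A sketch matching that description (the exact construction is elided
# from this excerpt):
mux1 = pescador.StochasticMux(pop1, n_active=3, rate=5)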
