Skip to content

Commit

Permalink
modernized example code where appropriate
Browse files Browse the repository at this point in the history
  • Loading branch information
bmcfee committed Aug 28, 2019
1 parent d1e0eb0 commit f78230a
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 25 deletions.
22 changes: 22 additions & 0 deletions docs/example1.rst
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,25 @@ Iterating over any of these would then look like the following:
for sample in streamer:
# do something
...
Stream decorators
-----------------
In the example above, we implemented a streamer by first defining a generator function, and then wrapping it inside a `Streamer` object.
This allows the function to be used either directly as an iterator, or indirectly through pescador.
However, if you are implementing a generator that will only be used via pescador, and do not need direct iterator access, you can accomplish
this with a slightly cleaner syntax, as in the following example:

.. code-block:: python
:linenos:
import pescador
@pescador.streamable
def my_generator(n):
for i in range(n):
yield i**2
Using the above syntax, when you call `my_generator(5)`, the result is itself a `Streamer` object, rather than an iterator.
We generally recommend using this syntax unless direct access to the iterator is necessary, as the resulting code tends to be more readable.
Further examples are given elsewhere in the documentation.
3 changes: 2 additions & 1 deletion docs/why.rst
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ In this case, you would most likely implement a `generator` for each file as fol
.. code-block:: python
:linenos:
@pescador.streamable
def sample_file(filename):
# Load observation data
X = np.load(filename)
Expand All @@ -60,7 +61,7 @@ In this case, you would most likely implement a `generator` for each file as fol
# Generate a random row as a dictionary
yield dict(X=X[np.random.choice(len(X))])
streamers = [pescador.Streamer(sample_file, fname) for fname in ALL_30K_FILES]
streamers = [sample_file(fname) for fname in ALL_30K_FILES]
# Keep 100 streamers active at a time
# Replace a streamer after it has generated (on average) 8 samples
Expand Down
6 changes: 4 additions & 2 deletions examples/frameworks/keras_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ def build_model(input_shape):
# Define Data Sampler
##############################################

@pescador.streamable
def sampler(X, y):
'''A basic generator for sampling data.
Expand Down Expand Up @@ -152,6 +153,7 @@ def sampler(X, y):
# Define a Custom Map Function
##############################################

@pescador.streamable
def additive_noise(stream, key='X', scale=1e-1):
'''Add noise to a data stream.
Expand Down Expand Up @@ -187,8 +189,8 @@ def additive_noise(stream, key='X', scale=1e-1):
# Create two streams from the same data, where one of the streams
# adds a small amount of Gaussian noise. You could easily perform
# other data augmentations using the same 'map' strategy.
stream = pescador.Streamer(sampler, X_train, Y_train)
noisy_stream = pescador.Streamer(additive_noise, stream, 'X')
stream = sampler(X_train, Y_train)
noisy_stream = additive_noise(stream, 'X')

# Multiplex the two streamers together.
mux = pescador.StochasticMux([stream, noisy_stream],
Expand Down
5 changes: 3 additions & 2 deletions examples/mux/chain_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
# containing inputs and outputs (eg, images and class labels)
# Once the streamer produces m examples, it exits.

@pescador.streamable
def data_gen(filename, m):

data = np.load(filename)
Expand All @@ -56,9 +57,9 @@ def data_gen(filename, m):
# Constructing the streamers
############################
# First, we'll make a streamer for each validation example.
#
#

val_streams = [pescador.Streamer(data_gen, fn, M) for fn in val_files]
val_streams = [data_gen(fn, M) for fn in val_files]


############################
Expand Down
29 changes: 15 additions & 14 deletions examples/mux/mux_files_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def split_and_save_datasets(X, Y, paths):
#################################################


@pescador.streamable
def npz_generator(npz_path):
"""Generate data from an npz file."""
npz_data = np.load(npz_path)
Expand All @@ -71,7 +72,7 @@ def npz_generator(npz_path):
yield {'X': X[i], 'Y': y[i]}


streams = [pescador.Streamer(npz_generator, x) for x in datasets]
streams = [npz_generator(x) for x in datasets]


##############################################
Expand All @@ -80,13 +81,13 @@ def npz_generator(npz_path):
# If you can easily fit all the datasets in memory and you want to
# sample from them equally, you would set up your Mux as follows:

mux = pescador.Mux(streams,
# Three streams, always active.
k=len(streams),
# We want to sample from each stream infinitely,
# so we turn off the rate parameter, which
# controls how long to sample from each stream.
rate=None)
mux = pescador.StochasticMux(streams,
# Three streams, always active.
n_active=len(streams),
# We want to sample from each stream infinitely,
# so we turn off the rate parameter, which
# controls how long to sample from each stream.
rate=None)


##############################################
Expand All @@ -96,12 +97,12 @@ def npz_generator(npz_path):
# Now, the rate parameter controls (statistically) how long to sample
# from a stream before activating a new stream.

mux = pescador.Mux(streams,
# Only allow one active stream
k=1,
# Sample on average 1000 samples from a stream before
# moving onto another one.
rate=1000)
mux = pescador.StochasticMux(streams,
# Only allow one active stream
n_active=1,
# Sample on average 1000 samples from a stream before
# moving onto another one.
rate=1000)


##############################################
Expand Down
5 changes: 3 additions & 2 deletions examples/mux/shuffle.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@

# First, let's make a simple generator that makes an infinite
# sequence of a given letter.
@pescador.streamable
def letter(c):
while True:
yield c

# Let's make the two populations of streamers
pop1 = [pescador.Streamer(letter, c) for c in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ']
pop2 = [pescador.Streamer(letter, c) for c in 'abcdefghijklmnopqrstuvwxyz']
pop1 = [letter(c) for c in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ']
pop2 = [letter(c) for c in 'abcdefghijklmnopqrstuvwxyz']

# We'll sample population 1 with 3 streamers active at any time.
# Each streamer will generate, on average, 5 samples before being
Expand Down
8 changes: 4 additions & 4 deletions examples/zmq_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def costly_function(X, n_ops=100):
# some simple data. We'll run some computation on the inside to
# slow things down a bit.

@pescador.streamable
def data_gen(n_ops=100):
"""Yield data, while optionally burning compute cycles.
Expand Down Expand Up @@ -71,8 +72,7 @@ def timed_sampling(stream, n_iter, desc):
max_iter = 1e2

# Construct a streamer
stream = pescador.Streamer(data_gen)
timed_sampling(stream, max_iter, 'Single-threaded')
timed_sampling(data_gen, max_iter, 'Single-threaded')
# Single-threaded :: Average time per iteration: 0.024 sec


Expand All @@ -84,7 +84,7 @@ def timed_sampling(stream, n_iter, desc):
# do with the batches you receive here.

# Wrap the streamer in a ZMQ streamer
zstream = pescador.ZMQStreamer(stream)
zstream = pescador.ZMQStreamer(data_gen)
timed_sampling(zstream, max_iter, 'ZMQ')
# ZMQ :: Average time per iteration: 0.012 sec

Expand All @@ -98,7 +98,7 @@ def timed_sampling(stream, n_iter, desc):
buffer_size = 16

# Get batches from the stream as you would normally.
batches = pescador.Streamer(pescador.buffer_stream, stream, buffer_size)
batches = pescador.Streamer(pescador.buffer_stream, data_gen, buffer_size)
timed_sampling(batches, max_iter, 'Single-threaded Batches')
# Single-threaded Batches :: Average time per iteration: 0.392 sec

Expand Down

0 comments on commit f78230a

Please sign in to comment.