Version bump to 0.11.0.dev0, including deprecations (#837)
Saaketh Narayan authored Dec 3, 2024
1 parent fefab08 commit 93af28a
Showing 19 changed files with 89 additions and 189 deletions.
12 changes: 4 additions & 8 deletions docs/source/dataset_configuration/shuffling.md
@@ -7,7 +7,7 @@ Shuffling is important for model convergence during training, but can be computa
| `shuffle` | `bool` | `False` | turn shuffling on or off |
| `shuffle_algo` | `str` | `'py1e'` | which shuffling algorithm to use |
| `shuffle_seed` | `int` | `9176` | all randomness in StreamingDataset is derived from this seed |
| `shuffle_block_size` | `int` | `max(4000000/num_canonical_nodes, 1<<18)` | Number of samples to shuffle at a time, only used by py1b, py1br, and py1e algorithms |
| `shuffle_block_size` | `int` | `max(4000000/num_canonical_nodes, 1<<18)` | Number of samples to shuffle at a time, only used by py1br and py1e algorithms |
| `num_canonical_nodes` | `int` | # of physical nodes | Number of sample buckets. Controls shuffling in py1s and py2s algorithms |

## How Shuffling Works
@@ -34,27 +34,23 @@ The `shuffle_algo` can be set to one of five choices, each with different tradeo

### Shuffle-block-based algorithms

If your dataset has not been pre-shuffled, or you are using multiple streams, you should use a shuffle-block-based algorithm. The `py1e`, `py1br`, and `py1b` shuffles use the `shuffle_block_size` parameter, which determines how many samples within each canonical node are shuffled at once. You should set `shuffle_block_size` to be larger than the number of samples in a single shard (usually, at least 10x) for a high quality shuffle.
If your dataset has not been pre-shuffled, or you are using multiple streams, you should use a shuffle-block-based algorithm. The `py1e` and `py1br` shuffles use the `shuffle_block_size` parameter, which determines how many samples within each canonical node are shuffled at once. You should set `shuffle_block_size` to be larger than the number of samples in a single shard (usually, at least 10x) for a high quality shuffle.
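
For reference, a minimal configuration sketch showing these knobs together (the remote/local paths and the block size are placeholders, not values from this commit):

```python
from streaming import StreamingDataset

dataset = StreamingDataset(
    remote='s3://my-bucket/my-dataset',  # placeholder remote path
    local='/tmp/my-dataset',             # placeholder local cache directory
    batch_size=16,
    shuffle=True,
    shuffle_algo='py1e',           # default; 'py1br' is the other block-based choice
    shuffle_seed=9176,
    shuffle_block_size=1_000_000,  # aim for at least ~10x the samples per shard
)
```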

#### `'py1e'` (default)

Samples from each shard are spread out across a range of maximum size `shuffle_block_size`. The diagram below shows how the samples from each shard are spread out over a specified range, shuffling them.

<img src="../_static/images/py1e.png" alt="py1e shuffle" width="800"/>

This algorithm provides great shuffle quality, just like `py1br` and `py1b`, while also reducing the maximum needed cache limit and better balancing shard downloads. StreamingDataset defaults to using this shuffling algorithm.
This algorithm provides great shuffle quality, just like `py1br`, while also reducing the maximum needed cache limit and better balancing shard downloads. StreamingDataset defaults to using this shuffling algorithm.

#### `'py1br'`

Samples within each canonical node are shuffled in blocks of size `shuffle_block_size`. The block sizes are slightly randomized. The diagram below shows the boundaries of each shuffle block as a dashed line.

<img src="../_static/images/py1b_py1br.png" alt="py1br shuffle" width="800"/>

This algorithm is a more download-optimal version of `py1b`, which is being deprecated.

#### `'py1b'`

This algorithm is very similar to `py1br`, without randomizing shuffle block sizes, resulting in suboptimal download performance. It will soon be deprecated -- please use `py1e` or `py1br` instead.
This algorithm is a more download-optimal version of the now-deprecated `py1b`.
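
As a rough illustration of the block idea only (this is *not* the implementation in `streaming.base.shuffle`, which exposes `get_shuffle_py1e` and `get_shuffle_py1br`), a toy sketch might look like:

```python
import numpy as np

def toy_block_shuffle(num_samples: int, block_size: int, seed: int = 9176) -> np.ndarray:
    """Toy py1br-style shuffle: shuffle within blocks of slightly randomized size."""
    rng = np.random.default_rng(seed)
    order = np.arange(num_samples)
    start = 0
    while start < num_samples:
        # Jitter each block's size around `block_size`, then shuffle within that block.
        size = int(rng.integers(int(0.75 * block_size), int(1.25 * block_size) + 1))
        end = min(start + size, num_samples)
        rng.shuffle(order[start:end])  # in-place shuffle of this block
        start = end
    return order

print(toy_block_shuffle(num_samples=20, block_size=5))
```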

### Intra-shard shuffle algorithms

4 changes: 2 additions & 2 deletions docs/source/distributed_training/performance_tuning.md
@@ -23,11 +23,11 @@ $$L = 2 \cdot S \cdot \lceil\frac{C}{P}\rceil $$

Where $L$ is the required minimum cache limit per node, in MB, $S$ is the average shard size, in MB, $C$ is the number of canonical nodes (see [here](../dataset_configuration/shuffling.md#how-shuffling-works) and [here](../distributed_training/elastic_determinism.md#requirements)), and $P$ is the number of physical nodes. This is because only a single shard, plus a potentially predownloaded subsequent shard, needs to be resident per canonical node to make progress during training.
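
For instance, plugging in assumed values of $S = 150$ MB, $C = 64$, and $P = 8$ (hypothetical numbers, not from this commit) gives $L = 2 \cdot 150 \cdot \lceil 64/8 \rceil = 2400$ MB per node.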

If using a shuffle-block-based algorithm such as [`'py1e'`](../dataset_configuration/shuffling.md#py1e-default), [`'py1br'`](../dataset_configuration/shuffling.md#py1br), or [`'py1b'`](../dataset_configuration/shuffling.md#py1b), the required minimum cache limit per node will be approximately:
If using a shuffle-block-based algorithm such as [`'py1e'`](../dataset_configuration/shuffling.md#py1e-default) or [`'py1br'`](../dataset_configuration/shuffling.md#py1br), the required minimum cache limit per node will be approximately:

$$L = k \cdot S \lceil \frac{B}{Q} \rceil \cdot \lceil\frac{C}{P}\rceil $$

Where $L$ is the required minimum cache limit per node, in MB, $k$ is a constant that depends on the shuffle algorithm used, $S$ is the average shard size, in MB, $B$ is the shuffle block size (see [here](../dataset_configuration/shuffling.md#shuffle-block-based-algorithms)) as a number of samples, $Q$ is the average number of samples per shard, $C$ is the number of canonical nodes (sample buckets), and $P$ is the number of physical nodes. This is because each shuffle block consists of $\lceil \frac{B}{Q}\rceil$ shards, and the subsequent shuffle block's shards may have to be predownloaded. The constant $k$ is $1$ for the [`'py1e'`](../dataset_configuration/shuffling.md#py1e-default) algorithm, whereas it is $2$ for both [`'py1br'`](../dataset_configuration/shuffling.md#py1br) and [`'py1b'`](../dataset_configuration/shuffling.md#py1b), meaning that `'py1e'` gives better cache limit performance, while retaining shuffle quality.
Where $L$ is the required minimum cache limit per node, in MB, $k$ is a constant that depends on the shuffle algorithm used, $S$ is the average shard size, in MB, $B$ is the shuffle block size (see [here](../dataset_configuration/shuffling.md#shuffle-block-based-algorithms)) as a number of samples, $Q$ is the average number of samples per shard, $C$ is the number of canonical nodes (sample buckets), and $P$ is the number of physical nodes. This is because each shuffle block consists of $\lceil \frac{B}{Q}\rceil$ shards, and the subsequent shuffle block's shards may have to be predownloaded. The constant $k$ is $1$ for the [`'py1e'`](../dataset_configuration/shuffling.md#py1e-default) algorithm, whereas it is $2$ for [`'py1br'`](../dataset_configuration/shuffling.md#py1br), meaning that `'py1e'` gives better cache limit performance, while retaining shuffle quality.
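
As a quick sanity check of the block-based formula, here is a minimal sketch using hypothetical values (none of these numbers come from the docs or this commit):

```python
import math

# Hypothetical inputs, for illustration only.
S = 150        # average shard size (MB)
C = 64         # num_canonical_nodes
P = 8          # physical nodes
B = 1_000_000  # shuffle_block_size (samples)
Q = 100_000    # average samples per shard

k = 1  # 'py1e'; use k = 2 for 'py1br'
L = k * S * math.ceil(B / Q) * math.ceil(C / P)
print(f'~{L} MB minimum cache limit per node')  # ~12000 MB with these numbers
```

Doubling the shuffle block size roughly doubles this estimate, since $L$ scales with $\lceil \frac{B}{Q} \rceil$.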

## Streaming Simulator

2 changes: 1 addition & 1 deletion docs/source/getting_started/faqs_and_tips.md
@@ -29,7 +29,7 @@ StreamingDataset is a subclass of PyTorch's IterableDataset, so applying transfo
You can set the per-node cache limit using StreamingDataset's `cache_limit` argument, detailed [here](../dataset_configuration/shard_retrieval.md#cache-limit). When shard usage hits the `cache_limit` Streaming will begin evicting shards.
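
A short sketch of how that looks in practice (the paths and the limit value are placeholders; this assumes `cache_limit` accepts either a byte count or a human-readable size string):

```python
from streaming import StreamingDataset

dataset = StreamingDataset(
    remote='s3://my-bucket/my-dataset',  # placeholder remote path
    local='/tmp/my-dataset',             # placeholder local cache directory
    batch_size=16,
    cache_limit='25gb',  # shards are evicted once local usage exceeds this limit
)
```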

### I'm seeing loss spikes and divergence on my training runs. How do I fix this?
Training loss may suffer from loss spikes or divergence for a variety of reasons. Higher quality shuffling and dataset mixing can help mitigate loss variance, divergence, and spikes. First, make sure that `shuffle` is set to `True` in your dataset. If you're already shuffling, you should make your shuffle strength higher. If using a shuffle-block-based shuffling algorithm like [`'py1e'`](../dataset_configuration/shuffling.md#py1e-default), [`'py1br'`](../dataset_configuration/shuffling.md#py1br), or [`'py1b'`](../dataset_configuration/shuffling.md#py1b), increase the `shuffle_block_size` parameter. If using an intra-shard shuffle such as [`'py1s'`](../dataset_configuration/shuffling.md#py1s) or [`'py2s'`](../dataset_configuration/shuffling.md#py2s), increase the `num_canonical_nodes` parameter. Read more about shuffling [here](../dataset_configuration/shuffling.md).
Training loss may suffer from loss spikes or divergence for a variety of reasons. Higher quality shuffling and dataset mixing can help mitigate loss variance, divergence, and spikes. First, make sure that `shuffle` is set to `True` in your dataset. If you're already shuffling, you should make your shuffle strength higher. If using a shuffle-block-based shuffling algorithm like [`'py1e'`](../dataset_configuration/shuffling.md#py1e-default) or [`'py1br'`](../dataset_configuration/shuffling.md#py1br), increase the `shuffle_block_size` parameter. If using an intra-shard shuffle such as [`'py1s'`](../dataset_configuration/shuffling.md#py1s) or [`'py2s'`](../dataset_configuration/shuffling.md#py2s), increase the `num_canonical_nodes` parameter. Read more about shuffling [here](../dataset_configuration/shuffling.md).

Changing how datasets are mixed can also help with training stability. Specifically, setting `batching_method` to `stratified` when mixing datasets provides consistent dataset mixing in every batch. Read more about dataset mixing [here](../dataset_configuration/mixing_data_sources.md).
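
A hedged sketch combining the two suggestions above (stream paths, proportions, and the block size are placeholders chosen for illustration):

```python
from streaming import Stream, StreamingDataset

streams = [
    Stream(remote='s3://my-bucket/dataset-a', local='/tmp/dataset-a', proportion=0.7),
    Stream(remote='s3://my-bucket/dataset-b', local='/tmp/dataset-b', proportion=0.3),
]

dataset = StreamingDataset(
    streams=streams,
    batch_size=16,
    shuffle=True,
    shuffle_algo='py1e',
    shuffle_block_size=2_000_000,   # raise this for a stronger block-based shuffle
    batching_method='stratified',   # consistent mixing of streams in every batch
)
```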

2 changes: 1 addition & 1 deletion regression/iterate_data.yaml
@@ -46,7 +46,7 @@ command: |-
rm -rf /tmp/local_dataset/
composer -n 8 regression/iterate_data.py --remote /tmp/streaming_dataset_gz/ --local /tmp/local_dataset/ \
--shuffle_algo py1b --shuffle_seed 12 --shuffle_block_size 10000
--shuffle_algo py1e --shuffle_seed 12 --shuffle_block_size 10000
rm -rf /tmp/local_dataset/
image: mosaicml/composer:latest
4 changes: 2 additions & 2 deletions scripts/epoch/bench.py
@@ -33,8 +33,8 @@ def parse_args() -> Namespace:
args.add_argument('-s',
'--shuffle_algo',
type=str,
default='py1b',
help='Shuffling algorithm (naive, py1b, py1s, py2s)')
default='py1e',
help='Shuffling algorithm (naive, py1e, py1br, py1s, py2s)')
args.add_argument('-z',
'--shuffle_block_size',
type=int,
8 changes: 4 additions & 4 deletions scripts/shuffle/bench.py
@@ -11,8 +11,8 @@
import numpy as np
from numpy.typing import NDArray

from streaming.base.shuffle import (get_shuffle_naive, get_shuffle_py1b, get_shuffle_py1s,
get_shuffle_py2s)
from streaming.base.shuffle import (get_shuffle_naive, get_shuffle_py1br, get_shuffle_py1e,
get_shuffle_py1s, get_shuffle_py2s)


def parse_args() -> Namespace:
@@ -140,8 +140,8 @@ def main(args: Namespace) -> None:
Args:
args (Namespace): Command-line arguments.
"""
names = 'naive', 'py2s', 'py1s', 'py1b'
get_shuffles = get_shuffle_naive, get_shuffle_py2s, get_shuffle_py1s, get_shuffle_py1b
names = 'naive', 'py1e', 'py1br', 'py2s', 'py1s'
get_shuffles = get_shuffle_naive, get_shuffle_py1e, get_shuffle_py1br, get_shuffle_py2s, get_shuffle_py1s

if args.algos:
algos = args.algos.split(',') if args.algos else []
3 changes: 2 additions & 1 deletion scripts/shuffle/plot.py
@@ -22,9 +22,10 @@

algo_colors = {
'naive': 'black',
'py1e': 'green',
'py1br': 'orange',
'py2s': 'purple',
'py1s': 'red',
'py1b': 'orange',
}


20 changes: 11 additions & 9 deletions simulation/interfaces/sim_script.py
@@ -6,8 +6,6 @@
import os.path
import sys

sys.path.append(os.path.join(os.path.dirname(__file__), '..'))

import humanize
from core.create_index import create_stream_index
from core.main import simulate
@@ -18,6 +16,8 @@

from streaming.base import Stream

sys.path.append(os.path.join(os.path.dirname(__file__), '..'))

# Input Parameters

# dataset
@@ -27,7 +27,8 @@
avg_zip_shard_size = 15000000 # average compressed shard size (bytes)

# training
max_duration = '1000ba' # max duration of training (batches: "ba", epochs: "ep")
# max duration of training (batches: "ba", epochs: "ep")
max_duration = '1000ba'
epoch_size = None # epoch size (samples)
device_batch_size = 16 # device batch size (samples)

@@ -37,7 +38,7 @@
predownload = 32 # number of samples to predownload per worker (samples)
cache_limit = None # cache limit (bytes)
shuffle = True # whether to shuffle dataset
shuffle_algo = 'py1b' # shuffling algorithm
shuffle_algo = 'py1e' # shuffling algorithm
shuffle_block_size = 16000000 # shuffling block size (samples)
seed = 17 # random seed

@@ -87,7 +88,8 @@
# Display simulation stats
total_batches = len(step_times)
all_throughput_drops, warmup_time, warmup_step, post_warmup_throughput_drops = \
get_simulation_stats(step_times, time_per_sample, global_batch_size//(physical_nodes*devices))
get_simulation_stats(step_times, time_per_sample,
global_batch_size//(physical_nodes*devices))
print('\nSimulation Stats:')
print(f'Minimum cache limit needed: {humanize.naturalsize(min_cache_limit)}')
if cache_limit is not None and cache_limit < min_cache_limit:
@@ -102,12 +104,12 @@
# display warning if post-warmup throughput drops are more than 10% of the run.
print(
'⚠️ This configuration experiences some downloading-related slowdowns even after warmup.')
print('{0} steps, or {1:.1f}% of all steps, waited for shard downloads.'\
.format(all_throughput_drops, 100 * all_throughput_drops / (total_batches)))
print('{0} steps, or {1:.1f}% of all steps, waited for shard downloads.'.format(
all_throughput_drops, 100 * all_throughput_drops / (total_batches)))
if warmup_step != total_batches:
# only display post-warmup throughput drop info if we actually ended the warmup period (i.e. we hit peak throughput at some point)
print('There were {} steps that waited for shard downloads after the warmup period.'\
.format(post_warmup_throughput_drops))
print('There were {} steps that waited for shard downloads after the warmup period.'.format(
post_warmup_throughput_drops))
print('Estimated time to first batch: {0:.2f} s'.format(startup_time))
print('Estimated warmup time: {0:.2f} s'.format(warmup_time))

21 changes: 11 additions & 10 deletions simulation/interfaces/sim_ui.py
@@ -3,12 +3,9 @@

"""Simulator web UI using streamlit."""

import math
import os.path
import sys

sys.path.append(os.path.join(os.path.dirname(__file__), '..'))

import math
from concurrent.futures import ProcessPoolExecutor
from io import StringIO
from typing import Union
@@ -30,6 +27,8 @@

from streaming.base.util import bytes_to_int, number_abbrev_to_int

sys.path.append(os.path.join(os.path.dirname(__file__), '..'))

# set up page
st.set_page_config(layout='wide')
col1, space, col2 = st.columns((10, 1, 6))
@@ -45,7 +44,7 @@
col2.text('')
shuffle_quality_plot = col2.empty()
throughput_window = 10
shuffle_quality_algos = ['naive', 'py1b', 'py1br', 'py1e', 'py1s', 'py2s', 'none']
shuffle_quality_algos = ['naive', 'py1e', 'py1br', 'py1s', 'py2s', 'none']


def submit_jobs(shuffle_quality: bool, dataset: SimulationDataset, time_per_sample: float,
@@ -94,7 +93,7 @@ def submit_jobs(shuffle_quality: bool, dataset: SimulationDataset, time_per_samp
device_batch_size = input_params['device_batch_size']
shuffle_block_size = number_abbrev_to_int(input_params['shuffle_block_size']) \
if input_params['shuffle_block_size'] is not None \
else dataset.get_shuffle_block_size()
else dataset.get_shuffle_block_size()
samples_per_shard = dataset.get_avg_samples_per_shard()
epoch_size = dataset.get_epoch_size()
if epoch_size > 100_000_000:
@@ -162,7 +161,7 @@ def submit_jobs(shuffle_quality: bool, dataset: SimulationDataset, time_per_samp

# If applicable, check if the shuffle quality tasks are finished, and graph.
if shuffle_quality and all(f.done() for f in futures) \
and not shuffle_quality_graphed:
and not shuffle_quality_graphed:
display_shuffle_quality_graph(futures, shuffle_quality_plot)
shuffle_quality_graphed = True

@@ -246,7 +245,7 @@ def get_input_params_initial(physical_nodes: int, devices: int, workers: int,
string_yaml = StringIO(uploaded_yaml.getvalue().decode('utf-8')).read()
dict_yaml = yaml.safe_load(string_yaml)
total_devices, workers, max_duration, global_batch_size, train_dataset = \
ingest_yaml(yaml_dict=dict_yaml)
ingest_yaml(yaml_dict=dict_yaml)
# Check which parameters we still need to ask for.
col1.write('The parameters below were not found in your yaml file. Enter them here:')
physical_nodes = col1.number_input(
@@ -337,7 +336,8 @@ def get_input_params_initial(physical_nodes: int, devices: int, workers: int,
# Get the rest of the needed params from the new inputs
physical_nodes = input_params['physical_nodes']
devices = input_params['devices']
global_batch_size = input_params['device_batch_size'] * devices * physical_nodes
global_batch_size = input_params['device_batch_size'] * \
devices * physical_nodes
workers = input_params['workers']
max_duration = input_params['max_duration']
time_per_sample = input_params['time_per_sample']
@@ -398,7 +398,8 @@ def get_input_params_initial(physical_nodes: int, devices: int, workers: int,
# Get the rest of the needed params from the new inputs
physical_nodes = input_params['physical_nodes']
devices = input_params['devices']
global_batch_size = input_params['device_batch_size'] * devices * physical_nodes
global_batch_size = input_params['device_batch_size'] * \
devices * physical_nodes
workers = input_params['workers']
max_duration = input_params['max_duration']
time_per_sample = input_params['time_per_sample']
13 changes: 6 additions & 7 deletions simulation/interfaces/widgets.py
@@ -5,9 +5,6 @@

import os.path
import sys

sys.path.append(os.path.join(os.path.dirname(__file__), '..'))

from concurrent.futures import Future
from typing import Optional

@@ -22,6 +19,8 @@

from streaming.base.util import bytes_to_int

sys.path.append(os.path.join(os.path.dirname(__file__), '..'))


def get_line_chart(data: pd.DataFrame,
throughput_window: int,
@@ -293,13 +292,13 @@ def param_inputs(component: DeltaGenerator, input_params: dict, defaults: dict =
shuffle = col_r.checkbox(label='shuffle',
value=True if 'shuffle' not in defaults else defaults['shuffle'],
help='whether or not to shuffle the samples for this run.')
shuffle_algo='py1e' if len(defaults) == 0 or 'shuffle_algo' not in defaults \
shuffle_algo = 'py1e' if len(defaults) == 0 or 'shuffle_algo' not in defaults \
else defaults['shuffle_algo']
shuffle_block_size=None if len(defaults) == 0 or 'shuffle_block_size' not in defaults \
shuffle_block_size = None if len(defaults) == 0 or 'shuffle_block_size' not in defaults \
else defaults['shuffle_block_size']
seed = 42 if len(defaults) == 0 or 'seed' not in defaults else defaults['seed']
if shuffle:
algos = ['py1e', 'py1br', 'py1b', 'py1s', 'py2s', 'naive']
algos = ['py1e', 'py1br', 'py1s', 'py2s', 'naive']
default_index = 0
if 'shuffle_algo' in defaults:
default_index = algos.index(defaults['shuffle_algo'])
@@ -313,7 +312,7 @@
'shuffle block size (samples)',
value='None'
if 'shuffle_block_size' not in defaults else defaults['shuffle_block_size'],
help='shuffle block size for this run. used in the `py1b`, `py1br`, and `py1e` \
help='shuffle block size for this run. used in the `py1e` and `py1br` \
shuffling algorithms, samples in blocks of `shuffle_block_size` are randomly \
shuffled inside each bucket of shards (aka canonical node).')
seed = col_r.number_input('shuffle seed',
2 changes: 1 addition & 1 deletion streaming/_version.py
@@ -3,4 +3,4 @@

"""The Streaming Version."""

__version__ = '0.10.0.dev0'
__version__ = '0.11.0.dev0'
8 changes: 2 additions & 6 deletions streaming/base/converters/__init__.py
@@ -4,10 +4,6 @@
"""Utility function for converting spark dataframe to MDS dataset."""

from streaming.base.converters.dataframe_to_mds import (SPARK_TO_MDS, dataframe_to_mds,
dataframeToMDS, infer_dataframe_schema,
is_json_compatible)
infer_dataframe_schema, is_json_compatible)

__all__ = [
'dataframeToMDS', 'dataframe_to_mds', 'SPARK_TO_MDS', 'infer_dataframe_schema',
'is_json_compatible'
]
__all__ = ['dataframe_to_mds', 'SPARK_TO_MDS', 'infer_dataframe_schema', 'is_json_compatible']