From dae9d9390002feaf72e280d86f3ce80b092f3d11 Mon Sep 17 00:00:00 2001 From: Brian McFee Date: Wed, 31 Jan 2024 22:11:27 -0500 Subject: [PATCH 01/10] implemented #148 --- pescador/mux.py | 41 +++++++++++++++++++++++++++++++++++------ tests/test_mux.py | 13 ++++++++----- 2 files changed, 43 insertions(+), 11 deletions(-) diff --git a/pescador/mux.py b/pescador/mux.py index ec6fd0a..70deb23 100644 --- a/pescador/mux.py +++ b/pescador/mux.py @@ -289,6 +289,7 @@ def __init__( weights=None, mode="with_replacement", prune_empty_streams=True, + dist="binomial", random_state=None, ): """Given an array (pool) of streamer types, do the following: @@ -335,7 +336,12 @@ def __init__( Run every selected stream once to exhaustion. prune_empty_streams : bool - Disable streamers that produce no data. See `BaseMux` + Disable streamers that produce no data. + + dist : ["constant", "binomial", "poisson"] + Distribution governing the (maximum) number of samples taken + from an active streamer. + In each case, the expected number of samples will be `rate`. random_state : None, int, or np.random.RandomState See `BaseMux` @@ -344,6 +350,7 @@ def __init__( self.n_active = n_active self.rate = rate self.prune_empty_streams = prune_empty_streams + self.dist = dist super().__init__(streamers, random_state=random_state) @@ -354,6 +361,10 @@ def __init__( raise PescadorError( f"{self.mode} is not a valid mode for StochasticMux" ) + if self.dist not in ["constant", "binomial", "poisson"]: + raise PescadorError( + f"{self.dist} is not a valid distribution" + ) self.weights = weights if self.weights is None: @@ -436,7 +447,7 @@ def _on_stream_exhausted(self, idx): else: self.distribution_[self.stream_idxs_[idx]] = 1.0 - def _activate_stream(self, idx): + def _activate_stream(self, idx, old_idx): """Randomly select and create a stream. StochasticMux adds mode handling to _activate_stream, making it so that @@ -447,16 +458,34 @@ def _activate_stream(self, idx): Parameters ---------- idx : int, [0:n_streams - 1] - The stream index to replace + The stream index to activate + + old_idx : int + The index of the stream being replaced in the active set. + This is needed for computing binomial probabilities. """ + weight = self.weights[idx] + # Get the number of samples for this streamer. n_samples_to_stream = None if self.rate is not None: - n_samples_to_stream = 1 + self.rng.poisson(lam=self.rate) + if self.dist == "constant": + n_samples_to_stream = self.rate + elif self.dist == "poisson": + n_samples_to_stream = 1 + self.rng.poisson(lam=self.rate - 1) + elif self.dist == "binomial": + # Bin((rate-1) / (1-p), 1-p) where p = prob of selecting the new + # streamer from the active set + p = weight / (np.sum(self.stream_weights_) - self.stream_weights_[old_idx] + weight) + if p > 0.9999: + # If we effectively have only one streamer, use the poisson limit + # theorem + n_samples_to_stream = 1 + self.rng.poisson(lam=self.rate - 1) + else: + n_samples_to_stream = 1 + self.rng.binomial((self.rate-1) / (1-p), 1-p) # instantiate a new streamer streamer = self.streamers[idx].iterate(max_iter=n_samples_to_stream) - weight = self.weights[idx] # If we're sampling without replacement, zero this one out # This effectively disables this stream as soon as it is chosen, @@ -484,7 +513,7 @@ def _new_stream(self, idx): # Activate the Streamer, and get the weights self.streams_[idx], self.stream_weights_[idx] = self._activate_stream( - self.stream_idxs_[idx] + self.stream_idxs_[idx], idx ) # Reset the sample count to zero diff --git a/tests/test_mux.py b/tests/test_mux.py index 2ab0d39..f94b0ad 100644 --- a/tests/test_mux.py +++ b/tests/test_mux.py @@ -544,7 +544,10 @@ def test_mux_inf_loop(self, mux_class): assert len(list(mux(max_iter=100))) == 0 - def test_mux_stacked_uniform_convergence(self, mux_class): + @pytest.mark.parametrize( + 'dist', ['constant', 'binomial', 'poisson', + pytest.param('gaussian', marks=pytest.mark.xfail(raises=pescador.PescadorError))]) + def test_mux_stacked_uniform_convergence(self, mux_class, dist): """This test is designed to check that bootstrapped streams of data (Streamer subsampling, rate limiting) cascaded through multiple multiplexors converges in expectation to a flat, uniform sample of the @@ -553,19 +556,19 @@ def test_mux_stacked_uniform_convergence(self, mux_class): ab = pescador.Streamer(_choice, 'ab') cd = pescador.Streamer(_choice, 'cd') ef = pescador.Streamer(_choice, 'ef') - mux1 = mux_class([ab, cd, ef], 2, rate=2, random_state=1357) + mux1 = mux_class([ab, cd, ef], 2, rate=4, random_state=1357, dist=dist) gh = pescador.Streamer(_choice, 'gh') ij = pescador.Streamer(_choice, 'ij') kl = pescador.Streamer(_choice, 'kl') - mux2 = mux_class([gh, ij, kl], 2, rate=2, random_state=2468) + mux2 = mux_class([gh, ij, kl], 2, rate=4, random_state=2468, dist=dist) stacked_mux = mux_class([mux1, mux2], 2, rate=None, random_state=12345) - max_iter = 1000 chars = 'abcdefghijkl' + max_iter = len(chars) * 500 samples = list(stacked_mux.iterate(max_iter=max_iter)) counter = collections.Counter(samples) assert set(chars) == set(counter.keys()) @@ -574,7 +577,7 @@ def test_mux_stacked_uniform_convergence(self, mux_class): # Check that the pvalue for the chi^2 test is at least 0.95 test = scipy.stats.chisquare(counts) - assert test.pvalue >= 0.95 + assert test.pvalue >= 0.5, counts class TestShuffledMux: From d0b0e3bc19bbe64d6066c9d71a56c2d219e651d0 Mon Sep 17 00:00:00 2001 From: Brian McFee Date: Wed, 31 Jan 2024 22:15:04 -0500 Subject: [PATCH 02/10] linting --- pescador/mux.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pescador/mux.py b/pescador/mux.py index 70deb23..637f347 100644 --- a/pescador/mux.py +++ b/pescador/mux.py @@ -459,7 +459,6 @@ def _activate_stream(self, idx, old_idx): ---------- idx : int, [0:n_streams - 1] The stream index to activate - old_idx : int The index of the stream being replaced in the active set. This is needed for computing binomial probabilities. From 4cfb069739ffba4727263e66a7f3eff9143b5364 Mon Sep 17 00:00:00 2001 From: Brian McFee Date: Thu, 1 Feb 2024 10:47:23 -0500 Subject: [PATCH 03/10] warm-start the active set distribution for binomial mode consistency --- pescador/mux.py | 8 +++++++- tests/test_mux.py | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/pescador/mux.py b/pescador/mux.py index 637f347..9da542a 100644 --- a/pescador/mux.py +++ b/pescador/mux.py @@ -395,7 +395,13 @@ def _activate(self): # Weights of the active streams. # Once a stream is exhausted, it is set to 0 - self.stream_weights_ = np.zeros(self.n_active) + #self.stream_weights_ = np.zeros(self.n_active) + self.stream_weights_ = self.rng.choice(self.weights, + size=self.n_active, + p=(self.weights / np.sum(self.weights)), + replace=(self.mode == "with_replacement")) + + # How many samples have been draw from each (active) stream. self.stream_counts_ = np.zeros(self.n_active, dtype=int) # Array of pointers into `self.streamers` diff --git a/tests/test_mux.py b/tests/test_mux.py index f94b0ad..ce3146c 100644 --- a/tests/test_mux.py +++ b/tests/test_mux.py @@ -568,7 +568,7 @@ def test_mux_stacked_uniform_convergence(self, mux_class, dist): random_state=12345) chars = 'abcdefghijkl' - max_iter = len(chars) * 500 + max_iter = len(chars) * 100 samples = list(stacked_mux.iterate(max_iter=max_iter)) counter = collections.Counter(samples) assert set(chars) == set(counter.keys()) From a0fc5458a4fd98acd7d0bcdae5f50803d010fc9e Mon Sep 17 00:00:00 2001 From: Brian McFee Date: Thu, 1 Feb 2024 16:35:14 -0500 Subject: [PATCH 04/10] smoothing out edge cases in new distribution modes --- pescador/mux.py | 30 ++++++++++++++++++++++-------- tests/test_mux.py | 34 +++++++++++++++++++++++++++------- 2 files changed, 49 insertions(+), 15 deletions(-) diff --git a/pescador/mux.py b/pescador/mux.py index 9da542a..0e17137 100644 --- a/pescador/mux.py +++ b/pescador/mux.py @@ -308,6 +308,7 @@ def __init__( n_active : int > 0 The number of streams to keep active at any time. + rate : float > 0 or None Rate parameter for the distribution governing sample counts for individual streams. @@ -366,6 +367,11 @@ def __init__( f"{self.dist} is not a valid distribution" ) + if self.mode != "with_replacement" and self.n_active > self.n_streams: + raise PescadorError( + f"mode={mode} requires that n_active={n_active} be at most the number of streamers={self.n_streams}" + ) + self.weights = weights if self.weights is None: self.weights = 1.0 / self.n_streams * np.ones(self.n_streams) @@ -395,12 +401,17 @@ def _activate(self): # Weights of the active streams. # Once a stream is exhausted, it is set to 0 - #self.stream_weights_ = np.zeros(self.n_active) - self.stream_weights_ = self.rng.choice(self.weights, - size=self.n_active, - p=(self.weights / np.sum(self.weights)), - replace=(self.mode == "with_replacement")) - + try: + self.stream_weights_ = self.rng.choice(self.weights, + size=self.n_active, + p=(self.weights / np.sum(self.weights)), + replace=(self.mode == "with_replacement")) + except ValueError: + # This situation arises if the only remaining weights are all zero + # Initializing the stream weights to all zeros here will inflate + # the variance of rate parameters for activated streamers in binomial + # mode, but is otherwise harmless. + self.stream_weights_ = np.zeros(self.n_active) # How many samples have been draw from each (active) stream. self.stream_counts_ = np.zeros(self.n_active, dtype=int) @@ -481,10 +492,13 @@ def _activate_stream(self, idx, old_idx): elif self.dist == "binomial": # Bin((rate-1) / (1-p), 1-p) where p = prob of selecting the new # streamer from the active set - p = weight / (np.sum(self.stream_weights_) - self.stream_weights_[old_idx] + weight) - if p > 0.9999: + with np.errstate(invalid="ignore"): + # We'll suppress 0/0 here and catch it below as a special case + p = weight / (np.sum(self.stream_weights_) - self.stream_weights_[old_idx] + weight) + if p > 0.9999 or np.isnan(p): # If we effectively have only one streamer, use the poisson limit # theorem + # nan case occurs when all stream weights are 0 n_samples_to_stream = 1 + self.rng.poisson(lam=self.rate - 1) else: n_samples_to_stream = 1 + self.rng.binomial((self.rate-1) / (1-p), 1-p) diff --git a/tests/test_mux.py b/tests/test_mux.py index ce3146c..16d7bfa 100644 --- a/tests/test_mux.py +++ b/tests/test_mux.py @@ -374,6 +374,18 @@ def __empty(): for b1, b2 in zip(ref, estimate): T._eq_batch(b1, b2) + @pytest.mark.parametrize('mux_class', [ + functools.partial(pescador.mux.StochasticMux, + mode="exhaustive")], + ids=["StochasticMux-exhaustive"]) + @pytest.mark.xfail(raises=pescador.PescadorError) + def test_mux_too_many_active(self, mux_class): + # This fails because in single-active mode, we won't have + # enough streamers to create the active set + streamers = [pescador.Streamer(T.finite_generator, 10) + for _ in range(3)] + mux = mux_class(streamers, 4, rate=1) + @pytest.mark.parametrize('mux_class', [ functools.partial(pescador.mux.StochasticMux, mode="exhaustive")], ids=["StochasticMux"]) @@ -438,26 +450,34 @@ def test_sampled_mux_of_muxes(self, mux_class): functools.partial(pescador.mux.StochasticMux, mode="single_active")], ids=["StochasticMux"]) class TestStochasticMux_SingleActive: - @pytest.mark.parametrize('n_streams', [1, 2, 4]) + @pytest.mark.parametrize('n_streams', [4, 6, 8]) @pytest.mark.parametrize('n_samples', [512]) - @pytest.mark.parametrize('k', [1, 2, 4]) + @pytest.mark.parametrize('n_active', [1, 2, 4]) @pytest.mark.parametrize('rate', [1.0, 2.0, 4.0]) - def test_mux_single_active(self, mux_class, n_streams, n_samples, k, rate): + def test_mux_single_active(self, mux_class, n_streams, n_samples, n_active, rate): streamers = [pescador.Streamer(T.finite_generator, 10) for _ in range(n_streams)] - mux = mux_class(streamers, k, rate=rate) + mux = mux_class(streamers, n_active, rate=rate) estimate = list(mux.iterate(n_samples)) # Make sure we get the right number of samples # This is highly improbable when revive=False assert len(estimate) == n_samples + @pytest.mark.xfail(raises=pescador.PescadorError) + def test_mux_too_many_active(self, mux_class): + # This fails because in single-active mode, we won't have + # enough streamers to create the active set + streamers = [pescador.Streamer(T.finite_generator, 10) + for _ in range(3)] + mux = mux_class(streamers, 4, rate=1) + def test_mux_of_muxes_itered(self, mux_class): # Check on Issue #79 abc = pescador.Streamer('abc') xyz = pescador.Streamer('xyz') - mux1 = mux_class([abc, xyz], 10, rate=None, + mux1 = mux_class([abc, xyz], 2, rate=None, prune_empty_streams=False, random_state=135) samples1 = mux1.iterate(max_iter=1000) count1 = collections.Counter(samples1) @@ -465,7 +485,7 @@ def test_mux_of_muxes_itered(self, mux_class): n123 = pescador.Streamer('123') n456 = pescador.Streamer('456') - mux2 = mux_class([n123, n456], 10, rate=None, + mux2 = mux_class([n123, n456], 2, rate=None, prune_empty_streams=False, random_state=246) samples2 = mux2.iterate(max_iter=1000) @@ -473,7 +493,7 @@ def test_mux_of_muxes_itered(self, mux_class): assert set('123456') == set(count2.keys()) # Note that (random_state=987, n_active=2) fails. - mux3 = mux_class([mux1, mux2], 10, rate=None, + mux3 = mux_class([mux1, mux2], 2, rate=None, prune_empty_streams=False, random_state=987) samples3 = mux3.iterate(max_iter=1000) From 2eb45de3202e6a466b0e23bb4c5f0ff9252704cc Mon Sep 17 00:00:00 2001 From: Brian McFee Date: Fri, 2 Feb 2024 10:50:55 -0500 Subject: [PATCH 05/10] linting --- pescador/mux.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pescador/mux.py b/pescador/mux.py index 0e17137..d16cede 100644 --- a/pescador/mux.py +++ b/pescador/mux.py @@ -308,7 +308,6 @@ def __init__( n_active : int > 0 The number of streams to keep active at any time. - rate : float > 0 or None Rate parameter for the distribution governing sample counts for individual streams. From 839ee2b16454b34a9d5f677c6473b27cbd0e830f Mon Sep 17 00:00:00 2001 From: Brian McFee Date: Fri, 8 Mar 2024 10:12:52 -0500 Subject: [PATCH 06/10] updated codecov action --- .github/workflows/ci.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 80e20bb..0f51b66 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -103,7 +103,7 @@ jobs: run: pytest - name: Upload coverage to Codecov - uses: codecov/codecov-action@v3 + uses: codecov/codecov-action@v4 with: token: ${{ secrets.CODECOV_TOKEN }} files: ./coverage.xml From a1091ef1f581a1f1ad496b077e641b2c5fca241c Mon Sep 17 00:00:00 2001 From: Brian McFee Date: Fri, 8 Mar 2024 10:20:19 -0500 Subject: [PATCH 07/10] trying without directory info --- .github/workflows/ci.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 0f51b66..849aab7 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -107,7 +107,6 @@ jobs: with: token: ${{ secrets.CODECOV_TOKEN }} files: ./coverage.xml - directory: ./coverage/reports/ flags: unittests env_vars: OS,PYTHON name: codecov-umbrella From 1d79e1e937bead9a55a59fa4b74e42c9bd2c81d2 Mon Sep 17 00:00:00 2001 From: Brian McFee Date: Fri, 8 Mar 2024 10:56:22 -0500 Subject: [PATCH 08/10] updating test comments --- tests/test_mux.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_mux.py b/tests/test_mux.py index 16d7bfa..60dd640 100644 --- a/tests/test_mux.py +++ b/tests/test_mux.py @@ -595,7 +595,7 @@ def test_mux_stacked_uniform_convergence(self, mux_class, dist): counts = np.asarray(list(counter.values())) - # Check that the pvalue for the chi^2 test is at least 0.95 + # Check that the pvalue for the chi^2 test is at least 0.5 test = scipy.stats.chisquare(counts) assert test.pvalue >= 0.5, counts From 5c7a4537054af9aed732b81fdfdae4af1d92090d Mon Sep 17 00:00:00 2001 From: Brian McFee Date: Mon, 11 Mar 2024 11:35:21 -0400 Subject: [PATCH 09/10] adding mux analysis doc stub --- docs/index.rst | 8 ++ docs/muxanalysis.rst | 195 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 203 insertions(+) create mode 100644 docs/muxanalysis.rst diff --git a/docs/index.rst b/docs/index.rst index 247402e..c6badbc 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -96,6 +96,14 @@ Advanced examples auto_examples/index +*********************** +Stochastic mux analysis +*********************** +.. toctree:: + :maxdepth: 2 + + muxanalysis + ************* API Reference ************* diff --git a/docs/muxanalysis.rst b/docs/muxanalysis.rst new file mode 100644 index 0000000..4cd4cb9 --- /dev/null +++ b/docs/muxanalysis.rst @@ -0,0 +1,195 @@ +.. _muxanalysis: + +Analysis of Stochastic Mux +========================== + +:ref:`mux` objects (*mux* for short, *muxen* for plural) allow multiple :ref:`Streamer` objects to +be combined into a single stream by selectively sampling from each constituent stream. +The different kinds of mux objects provide different behaviors, but among them, and among +them, ``StochasticMux`` is the most complex. +This section provides an in-depth analysis of ``StochasticMux``'s behavior. + + + +Stream activation and replacement +--------------------------------- + +``StochasticMux`` differs from other muxen (``ShuffledMux``, ``RoundRobinMux``, etc.) by +maintaining an **active set** of streamers from the full collection it is multiplexing. +At any given time, samples are drawn only from the active set, while the remaining streamers are +**inactive**. +Each active streamer is limited to produce a (possibly random) number of samples, after which, it is removed from +the active set and replaced by a new streamer selected at random; hence the name **StochasticMux**. + +A key quantity to understand when using ``StochasticMux`` is the streamer replacement rate: how +often should we expect streamers to be replaced from the active set, as a function of samples +generated by the mux? +This quantity is important for a couple of reasons: + + * If we care about the distribution of samples produced by ``StochasticMux`` being a good + approximation of what you would get if all streamers were active simultaneously (i.e., + ``ShuffledMux`` behavior), then the streamer replacement rate should be small. + * If we have large startup costs involved with activating a streamer (e.g., loading data + from disk), then streamer replacement should be infrequent to ensure high throughput. + What's more, replacement events should be spread out among the active set, to avoid having several replacement events in a short period of time. + +In the following sections, we'll analyze replacement rates for the different choices of rate +distributions (`constant`, `poisson`, and `binomial`). +We'll focus the analysis on a single (active) streamer at a time. +The question we'll analyze is specifically: how many samples :math:`N` must we generate (in +expectation) before a specific streamer is deactivated and replaced? +Understanding the distribution of `N` (its mean and variance) will help us understand how often +we should expect to see streamer replacement events. + + +Notation +-------- + +Let :math:`A` denote the size of the active set, let :math:`r` denote the number of samples +generated by a particular streamer, and let :math:`p` denote the probability of selecting the +active streamer in question. +We'll make the simplifying assumption that the ``weights`` attached to all streamers are +uniform, i.e., :math:`p = 1/A`. + + +Constant distribution +--------------------- + +When using the ``constant`` distribution, the sample limit :math:`r` is fixed in advance. +Our question about the number of samples generated by StochasticMux can then be rephrased +slightly: +how many samples :math:`K` must we draw from *all other active streamers* before drawing the +:math:`r`\ th sample from the streamer under analysis? + +This number :math:`K` is a random variable, modeled by the `negative binomial distribution `_: + +.. math:: + + \text{Pr}[K = k] = {k + r - 1 \choose k} {(1-p)^k p^r} + + +It has expected value + +.. math:: + + \text{E}[K] = r \cdot \frac{1-p}{p}, + +and variance + +.. math:: + + \text{Var}[K] = r \cdot \frac{1-p}{p^2}. + + +The total number of samples produced by the mux before the streamer is replaced is now a random +variable :math:`N = K + r`. +We can use linearity of expectation to compute its expected value as + +.. math:: + + \text{E}[N] = \text{E}[K] + r = r \cdot\frac{1-p}{p} + r = \frac{r}{p}. + + +Since :math:`N` and :math:`K` differ only by a constant (:math:`r`), they have the same +variance: + +.. math:: + + \text{Var}[N] = \text{Var}[K]. + + +If we apply the simplifying assumption that streamers are selected uniformly at random (:math:`p += 1/A`), then we get the following: + + * :math:`\text{E}[N] = r \cdot A`, and + * :math:`\text{Var}[N] = r \cdot A \cdot (A-1)`. + +In plain language, this says that the streamer replacement rate scales like the product of the size of the active set and the number of samples per streamer. +Making either of these values large implies that we should expect to wait longer to replace an active streamer. +However, the variance of replacement event times is approximately **quadratic** in the size of the active set. +This means that making the active set larger will increase the dispersion of replacement events away from the expected value. + + +Poisson distribution +-------------------- + +In pescador version 2 and earlier, the sample limit :math:`r` was not a constant value, but a +random variable :math:`R` drawn from a Poisson distribution with rate parameter :math:`\lambda`. +The analysis above can mostly be carried over to handle this case, though it does not lead to a +closed form expression for :math:`\text{E}[N]` or :math:`\text{Var}[N]` because we must now +marginalize over the variable :math:`R`: + +.. math:: + + \text{Pr}[K=k] &= \sum_{r=0}^{\infty} \text{Pr}[K=k, R = r]\\ + &= \sum_{r=0}^{\infty} \text{Pr}[K=k ~|~ R = r] \times \text{Pr}[R=r]\\ + &= \sum_{r=0}^{\infty} {k + r - 1 \choose k} {(1-p)^k p^r} \times \frac{\lambda^r e^{-\lambda}}{r!} + + +While this distribution is still supported, it has been replaced as the default by a binomial +distribution mode which is more amenable to analysis. + +Binomial distribution +--------------------- + +In the binomial distribution mode, :math:`R` is a random variable governed by a binomial +distribution with parameters :math:`(m, q)`: + +.. math:: + + \text{Pr}[R=r] = {m \choose r} q^r (1-q)^{m-r}. + +(We will come back to determining values for :math:`(m, q)` later.) + +This distribution can be integrated with the negative binomial distribution above to yield a +straightforward computation of :math:`\text{Pr}[N]`. + +.. math:: + + \text{Pr}[N=n] &= \sum_{r=0}^{\infty} \text{Pr}[K=n-r ~|~ R= r] \times \text{Pr}[R=r]\\ + &= \sum_{r=0}^{\infty} {n-1 \choose n-r} {\left(1-p\right)}^{n-r} p^r \cdot {m \choose r} q^r {(1-q)}^{m-r}. + +If we set :math:`q = 1-p`, this simplifies as follows: + +.. math:: + + \text{Pr}[N=n] &= \sum_{r=0}^{\infty} {n-1 \choose n-r} {\left(1-p\right)}^{n-r} p^r \cdot {m \choose r} {(1-p)}^r p^{m-r}\\ + &= \sum_{r=0}^{\infty} {n-1 \choose n-r} {\left(1-p\right)}^n p^m {m \choose r}\\ + &= {\left(1-p\right)}^n p^m {n + m - 1\choose n}. + +This distribution again has the form of a negative binomial with parameters :math:`(m, 1-p)`. +If we further set + +.. math:: + + m = \frac{\lambda}{1-p} + +for an expected rate parameter :math:`\lambda > 0` (as in the Poisson case above), then the +distribution :math:`\text{Pr}[N=n]` is + +.. math:: + + N \sim \text{NB}\left(\frac{\lambda}{1-p}, 1-p\right), + +where NB denotes the probability mass function of the negative binomial distribution. +This yields: + + - :math:`\text{E}[R] = \lambda`, + - :math:`\text{E}[N] = \lambda / p`, and + - :math:`\text{Var}[N] = \lambda \frac{1-p}{p^2}`. + +These match the analysis of the constant-mode case above, except that the number of samples per +streamer is now a random variable with expectation :math:`\lambda`. +Again, in the special case where :math:`p=1/A`, we recover + + - :math:`\text{E}[N] = \lambda A`, and + - :math:`\text{Var}[N] = \lambda A (A-1)`. + + +Limiting case :math:`p=1` +------------------------- + + +Discussion and recommendations +------------------------------ + From 1f480e554d34aa52790ef6b6bbb236b26f341f3e Mon Sep 17 00:00:00 2001 From: Brian McFee Date: Mon, 11 Mar 2024 15:46:45 -0400 Subject: [PATCH 10/10] updated documentation for stochastic mux --- docs/muxanalysis.rst | 105 +++++++++++++++++++++++++++++++++++-------- pescador/mux.py | 4 +- 2 files changed, 89 insertions(+), 20 deletions(-) diff --git a/docs/muxanalysis.rst b/docs/muxanalysis.rst index 4cd4cb9..26e0a03 100644 --- a/docs/muxanalysis.rst +++ b/docs/muxanalysis.rst @@ -72,13 +72,13 @@ It has expected value .. math:: - \text{E}[K] = r \cdot \frac{1-p}{p}, + \text{E}[K] = r \times \frac{1-p}{p}, and variance .. math:: - \text{Var}[K] = r \cdot \frac{1-p}{p^2}. + \text{Var}[K] = r \times \frac{1-p}{p^2}. The total number of samples produced by the mux before the streamer is replaced is now a random @@ -87,7 +87,7 @@ We can use linearity of expectation to compute its expected value as .. math:: - \text{E}[N] = \text{E}[K] + r = r \cdot\frac{1-p}{p} + r = \frac{r}{p}. + \text{E}[N] = \text{E}[K] + r = r \times\frac{1-p}{p} + r = \frac{r}{p}. Since :math:`N` and :math:`K` differ only by a constant (:math:`r`), they have the same @@ -101,8 +101,8 @@ variance: If we apply the simplifying assumption that streamers are selected uniformly at random (:math:`p = 1/A`), then we get the following: - * :math:`\text{E}[N] = r \cdot A`, and - * :math:`\text{Var}[N] = r \cdot A \cdot (A-1)`. + * :math:`\text{E}[N] = r \times A`, and + * :math:`\text{Var}[N] = r \times A \times (A-1)`. In plain language, this says that the streamer replacement rate scales like the product of the size of the active set and the number of samples per streamer. Making either of these values large implies that we should expect to wait longer to replace an active streamer. @@ -115,20 +115,31 @@ Poisson distribution In pescador version 2 and earlier, the sample limit :math:`r` was not a constant value, but a random variable :math:`R` drawn from a Poisson distribution with rate parameter :math:`\lambda`. -The analysis above can mostly be carried over to handle this case, though it does not lead to a -closed form expression for :math:`\text{E}[N]` or :math:`\text{Var}[N]` because we must now -marginalize over the variable :math:`R`: +In this case, the mean and variance of :math:`R` are simply :math:`\text{E}[R] = +\text{Var}[R] = \lambda`. + +However, this does not lead to a closed form expression for :math:`\text{E}[N]` or :math:`\text{Var}[N]` because we must now marginalize over :math:`R`: .. math:: - \text{Pr}[K=k] &= \sum_{r=0}^{\infty} \text{Pr}[K=k, R = r]\\ - &= \sum_{r=0}^{\infty} \text{Pr}[K=k ~|~ R = r] \times \text{Pr}[R=r]\\ - &= \sum_{r=0}^{\infty} {k + r - 1 \choose k} {(1-p)^k p^r} \times \frac{\lambda^r e^{-\lambda}}{r!} + \text{Pr}[N=n] &= \sum_{r=0}^{\infty} \text{Pr}[K=n-r, R = r]\\ + &= \sum_{r=0}^{\infty} \text{Pr}[K=n-r ~|~ R = r] \times \text{Pr}[R=r]\\ + &= \sum_{r=0}^{\infty} {n - 1 \choose n-r} {(1-p)^{n-r} p^r} \times \frac{\lambda^r e^{-\lambda}}{r!} While this distribution is still supported, it has been replaced as the default by a binomial distribution mode which is more amenable to analysis. + +.. note:: + In pescador ≥ 3.0, poisson mode is actually implemented as :math:`R \sim 1 + + \text{Poisson}(\lambda - 1)`. This maintains the expected value of + :math:`\lambda`, with slightly reduced variance (:math:`\lambda - 1`), + but it ensures that at least one sample is produced from active streamers before deactivation. + + This logic is also applied to the binomial mode described below, but omitted + from the analysis here for simplicity. + Binomial distribution --------------------- @@ -147,13 +158,13 @@ straightforward computation of :math:`\text{Pr}[N]`. .. math:: \text{Pr}[N=n] &= \sum_{r=0}^{\infty} \text{Pr}[K=n-r ~|~ R= r] \times \text{Pr}[R=r]\\ - &= \sum_{r=0}^{\infty} {n-1 \choose n-r} {\left(1-p\right)}^{n-r} p^r \cdot {m \choose r} q^r {(1-q)}^{m-r}. + &= \sum_{r=0}^{\infty} {n-1 \choose n-r} {\left(1-p\right)}^{n-r} p^r \times {m \choose r} q^r {(1-q)}^{m-r}. If we set :math:`q = 1-p`, this simplifies as follows: .. math:: - \text{Pr}[N=n] &= \sum_{r=0}^{\infty} {n-1 \choose n-r} {\left(1-p\right)}^{n-r} p^r \cdot {m \choose r} {(1-p)}^r p^{m-r}\\ + \text{Pr}[N=n] &= \sum_{r=0}^{\infty} {n-1 \choose n-r} {\left(1-p\right)}^{n-r} p^r \times {m \choose r} {(1-p)}^r p^{m-r}\\ &= \sum_{r=0}^{\infty} {n-1 \choose n-r} {\left(1-p\right)}^n p^m {m \choose r}\\ &= {\left(1-p\right)}^n p^m {n + m - 1\choose n}. @@ -174,7 +185,9 @@ distribution :math:`\text{Pr}[N=n]` is where NB denotes the probability mass function of the negative binomial distribution. This yields: - - :math:`\text{E}[R] = \lambda`, + - :math:`\text{E}[R] = \lambda`: each streamer generates :math:`\lambda` samples + on average, + - :math:`\text{Var}[R] = \lambda \times p`, - :math:`\text{E}[N] = \lambda / p`, and - :math:`\text{Var}[N] = \lambda \frac{1-p}{p^2}`. @@ -182,14 +195,68 @@ These match the analysis of the constant-mode case above, except that the number streamer is now a random variable with expectation :math:`\lambda`. Again, in the special case where :math:`p=1/A`, we recover - - :math:`\text{E}[N] = \lambda A`, and - - :math:`\text{Var}[N] = \lambda A (A-1)`. + - :math:`\text{E}[N] = \lambda \times A`, and + - :math:`\text{Var}[N] = \lambda \times A \times (A-1)`. + +In short, binomial mode ``StochasticMux`` exhibits the same stream replacement +characteristics as the constant-mode case, but relaxes the need for each streamer to +generate an identical number of samples. Limiting case :math:`p=1` ------------------------- - -Discussion and recommendations ------------------------------- +As defined above, the binomial mode is ill-defined when :math:`p=1` due to a +division-by-zero in the parametrization. +This situation does occur in practice with some configurations of ``StochasticMux``, +e.g. when operating in **exhaustive** mode so that streamers are activated without +replacement and are discarded after deactivation. +In this case, the size of the active set :math:`A` can eventually decay, and the +probability of choosing the last active streamer :math:`p \rightarrow 1`. + +To circumvent this issue, ``StochasticMux`` detects this situation automatically and +falls back on a Poisson distribution for :math:`R`. +This is justified by the `Poisson limit theorem `_ if we take the product :math:`\lambda/(1-p) \times (1-p) = \lambda` as the limit value as :math:`p \rightarrow 1`. + + +Discussion +---------- + +The above analysis tells us, on average, how long we should expect to wait before a +given streamer is exhausted and replaced. +Because this distribution applies equally to all streamers, the variance of this +distribution tells us how dispersed these replacement events are likely to be. + +Qualitatively, there are a few things we can observe from the above analysis. + +First, for large active set sizes :math:`A`, binomial mode will behave similarly to constant mode because :math:`\text{Var}[R]` will be inversely proportional to :math:`A`. +For small active sets, binomial mode will behave more similarly to Poisson mode. + +Second, Poisson mode will exhibit the highest variance of sample limit values +:math:`\text{Var}[R] = \lambda` upper-bounds that of the binomial mode +:math:`\text{Var}[R] = \lambda \times p`. +We can therefore expect that the replacement event distribution :math:`\text{Pr}[N]` +under Poisson mode will also exhibit slightly higher variance in general. + +Third, the binomial mode provides a controlled interaction between the stream +replacement rate and the size of the active set, which is difficult to achieve with +Poisson mode. + +Finally, we should emphasize that the analysis in this section represents a common, +if simplified application of ``StochasticMux``, and there are many other variables +at play that may alter the mux's behavior, including: + + - whether streamers are activated with or without replacement, + - whether streamers are used exhaustively or replaced after deactivation, + - whether streamer weights are uniform or non-uniform, + - whether streamers self-limit instead of relying on the mux for deactivation. + +The final point is subtle: remember that streamers encapsulate arbitrary generator +code, and there's nothing stopping a generator from determining its own maximum +number of samples to produce. +If this number is smaller than the value assigned by the mux, the streamer will act +as if it has been exhausted and the mux will replace it immediately. +This situation would both reduce the replacement time average and variance. +(If a streamer self-limits at a number larger than the mux's limit, the mux will +terminate it first and the analysis above still holds.) diff --git a/pescador/mux.py b/pescador/mux.py index d16cede..9240721 100644 --- a/pescador/mux.py +++ b/pescador/mux.py @@ -295,7 +295,7 @@ def __init__( """Given an array (pool) of streamer types, do the following: 1. Select ``k`` streams at random to iterate from - 2. Assign each activated stream a sample count ~ 1 + Poisson(rate) + 2. Assign each activated stream a sample count with expected value `rate` 3. Yield samples from the streams by randomly multiplexing from the active set. 4. When a stream is exhausted, select a new one from `streamers`. @@ -343,6 +343,8 @@ def __init__( from an active streamer. In each case, the expected number of samples will be `rate`. + See :ref:`muxanalysis` for detailed discussion. + random_state : None, int, or np.random.RandomState See `BaseMux` """