Use t-digest to parallelize point to point metrics (#216)
* WIP - Initial commit for parallel metrics.

* Adding a simple notebook for testing parallelized PointSigmaIQR.

* Example of metric running in parallel in notebook.

* Parallelizing PointBias. Expanding the demo notebook.

* First attempt at parallelizing PointSigmaMAD.

* Clean up PointSigmaMAD and introduce a method parameter for num_bins.

* Parallelizing PointOutlierRate and updating the notebook with an example.

* Moved PointToPointMetricDigester. Consolidated code in that class. Added **kwargs so that we can pass a config dict to the concrete metric classes.

* Adding unit tests for parallelized point to point metrics.

* Adding another test case to chase test coverage.

* Using bin centers for `bin_dist` calculation.

* Added a point_sigma_iqr.eval_from_iterator call and a few pragma statements to reach full coverage.

---------

Co-authored-by: Eric Charles <[email protected]>
drewoldag and eacharles committed Feb 26, 2024
1 parent c968675 commit 248b3ee
Showing 6 changed files with 537 additions and 35 deletions.
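
The commit's core idea: t-digests built on separate chunks of data can be merged into a single digest whose quantiles approximate those of the full dataset, which is what lets quantile-based point-to-point metrics (sigma IQR, sigma MAD, outlier rate) accumulate chunk by chunk. A minimal sketch of that merge pattern using the pytdigest package added in this diff; the data and variable names are illustrative, not taken from the commit:

from functools import reduce
from operator import add

import numpy as np
from pytdigest import TDigest

rng = np.random.default_rng(42)
chunks = [rng.normal(size=10_000) for _ in range(10)]

# summarize each chunk independently as t-digest centroids
centroids = [
    TDigest.compute(chunk, compression=1000).get_centroids()
    for chunk in chunks
]

# rebuild one digest per chunk, merge them, and query quantiles of the whole
merged = reduce(add, (TDigest.of_centroids(c, compression=1000) for c in centroids))
q25, q75 = merged.inverse_cdf([0.25, 0.75])
print("IQR estimate:", q75 - q25)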
2 changes: 2 additions & 0 deletions pyproject.toml
@@ -21,6 +21,7 @@ dependencies = [
    "scipy",
    "tables-io",
    "deprecated",
    "pytdigest",
]

# On a mac, install optional dependencies with `pip install '.[dev]'` (include the single quotes)
@@ -43,6 +44,7 @@ dev = [
    "pylint",
    "mpi4py",
    "coverage",
    "ipyparallel",
]
full = [
    "tables-io[full]",
15 changes: 15 additions & 0 deletions src/qp/metrics/base_metric_classes.py
@@ -147,9 +147,24 @@ class PointToPointMetric(BaseMetric):

    metric_input_type = MetricInputType.point_to_point

    def eval_from_iterator(self, estimate, reference):
        self.initialize()
        # collect one set of t-digest centroids per chunk, then merge them
        centroids = [
            self.accumulate(estimate_chunk, reference_chunk)
            for estimate_chunk, reference_chunk in zip(estimate, reference)
        ]
        return self.finalize(centroids)

    def evaluate(self, estimate, reference):
        raise NotImplementedError()

    def initialize(self):  # pragma: no cover
        pass

    def accumulate(self, estimate, reference):  # pragma: no cover
        raise NotImplementedError()

    def finalize(self, centroids=None):  # pragma: no cover
        raise NotImplementedError()


class PointToDistMetric(BaseMetric):
"""A base class for metrics that require a point estimate as the estimated
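The concrete classes that implement these hooks live in src/qp/metrics/point_estimate_metric_classes.py, which is not shown in this diff. As a hedged sketch of the intended pattern, reconstructed from the notebook usage below rather than copied from the commit, a PointSigmaIQR-style metric might look like this (the scaled-residual formula and the 1.349 IQR-to-sigma factor are standard conventions, assumed here):

from functools import reduce
from operator import add

import numpy as np
from pytdigest import TDigest

class PointSigmaIQRSketch:  # hypothetical stand-in for qp's PointSigmaIQR
    def __init__(self, tdigest_compression=1000, **kwargs):
        self._compression = tdigest_compression

    def initialize(self):
        pass

    def accumulate(self, estimate, reference):
        # summarize this chunk's scaled residuals as t-digest centroids
        ez = (estimate - reference) / (1.0 + reference)
        return TDigest.compute(ez, compression=self._compression).get_centroids()

    def finalize(self, centroids=None):
        # rebuild a digest per chunk, merge, and convert the IQR to a sigma
        digests = (
            TDigest.of_centroids(np.array(c), compression=self._compression)
            for c in centroids
        )
        merged = reduce(add, digests)
        q25, q75 = merged.inverse_cdf([0.25, 0.75])
        return (q75 - q25) / 1.349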
324 changes: 324 additions & 0 deletions src/qp/metrics/parallel_metrics.ipynb
@@ -0,0 +1,324 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import ipyparallel as ipp\n",
"from qp.metrics.point_estimate_metric_classes import (\n",
" PointSigmaIQR,\n",
" PointBias,\n",
" PointSigmaMAD,\n",
" PointOutlierRate,\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Generate the random numbers \n",
"SEED = 1002330\n",
"rng = np.random.default_rng(SEED)\n",
"\n",
"chunk_size = 10_000\n",
"n_chunk = 10\n",
"total_size = n_chunk*chunk_size\n",
"\n",
"estimate = rng.lognormal(mean=1.0, sigma=2, size=total_size)\n",
"reference = rng.lognormal(mean=1.3, sigma=1.9, size=total_size)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"#generator that yields chunks from estimate and reference\n",
"def chunker(seq, size):\n",
" return (seq[pos:pos + size] for pos in range(0, len(seq), size))\n",
"\n",
"# create an iterator that yields chunks of chunk_size elements\n",
"estimate_chunks = chunker(estimate, chunk_size)\n",
"reference_chunks = chunker(reference, chunk_size)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# A function to pass to MPI\n",
"def mpi_example(chunk):\n",
" centroids = chunk[0].accumulate(chunk[1], chunk[2])\n",
" return centroids"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Here is a function that will configure a local cluster of 4 nodes using MPI as the engine.\n",
"\n",
"A metric estimator class is passed in as well as list of 3-tuple \"data chunks\".\n",
"\n",
"The 3-tuple is (metric class, chunk_of_estimated_values, chunk_of_reference_values)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def run_parallel_metric(data_chunks):\n",
" with ipp.Cluster(controller_ip=\"*\", engines=\"mpi\", n=4) as rc:\n",
" # get a broadcast_view on the cluster which is best\n",
" # suited for MPI style computation\n",
" view = rc.load_balanced_view()\n",
" # run the mpi_example function on all engines in parallel\n",
" asyncresult = view.map_async(mpi_example, data_chunks)\n",
" # Retrieve and print the result from the engines\n",
" asyncresult.wait_interactive()\n",
" # retrieve actual results\n",
" result = asyncresult.get()\n",
" # get and print the results\n",
" for i, res in enumerate(result):\n",
" np.array(res)\n",
" print(f\"{i} : {res.shape}\")\n",
" metric_estimator = data_chunks[0][0]\n",
" final = metric_estimator.finalize(centroids=result)\n",
" print(final)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### An example running the PointSigmaIQR metric directly and in parallel"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Set up for ipyparallel\n",
"config = {'tdigest_compression': 1000}\n",
"\n",
"sigma_iqr_estimator = PointSigmaIQR(**config)\n",
"sigma_iqr_estimator_list = [sigma_iqr_estimator]*n_chunk\n",
"iqr_data_chunks = [chunk for chunk in zip(sigma_iqr_estimator_list, chunker(estimate, chunk_size), chunker(reference, chunk_size))]\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"PointSigmaIQR().evaluate(estimate, reference)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"run_parallel_metric(iqr_data_chunks)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### An example running the PointBias metric directly and in parallel"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Set up for ipyparallel\n",
"config = {'tdigest_compression': 1000}\n",
"\n",
"point_bias_estimator = PointBias(**config)\n",
"point_bias_estimator_list = [point_bias_estimator]*n_chunk\n",
"point_bias_data_chunks = [chunk for chunk in zip(point_bias_estimator_list, chunker(estimate, chunk_size), chunker(reference, chunk_size))]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"PointBias().evaluate(estimate, reference)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"run_parallel_metric(point_bias_data_chunks)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### An example running PointSigmaMAD directly and in parallel"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# An example with PointSigmaMAD\n",
"config = {'num_bins': 1_000_000, 'tdigest_compression': 1000}\n",
"point_sigma_mad_estimator = PointSigmaMAD(**config)\n",
"point_sigma_mad_estimator_list = [point_sigma_mad_estimator]*n_chunk\n",
"point_sigma_mad_data_chunks = [chunk for chunk in zip(point_sigma_mad_estimator_list, chunker(estimate, chunk_size), chunker(reference, chunk_size))]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"PointSigmaMAD().evaluate(estimate, reference)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"This cell allows for adjustment of the `num_bins` parameter.\n",
"\n",
"Larger values trend closer to the analytic result from the cell above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"config = {'num_bins': 1_000_000, 'tdigest_compression': 1000}\n",
"psmad = PointSigmaMAD(**config)\n",
"centroids = psmad.accumulate(estimate, reference)\n",
"\n",
"#default value for `num_bins` is 1_000_000\n",
"psmad.finalize(centroids=[centroids])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"run_parallel_metric(point_sigma_mad_data_chunks)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### An example running PointOutlierRate metric directly and in parallel"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# An example with PointOutlierRate\n",
"config = {'tdigest_compression': 1000}\n",
"point_outlier_estimator = PointOutlierRate(**config)\n",
"point_outlier_estimator_list = [point_outlier_estimator]*n_chunk\n",
"point_outlier_data_chunks = [chunk for chunk in zip(point_outlier_estimator_list, chunker(estimate, chunk_size), chunker(reference, chunk_size))]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"PointOutlierRate().evaluate(estimate, reference)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The parallel estimation of the metric trends closer to the analytic as the value of `compression` is increased.\n",
"\n",
"The default value for compression is 1000. If set to 10_000, the estimate becomes 0.13663.\n",
"\n",
"Note that, of course, setting compression = 10_000 increases memory usage with minimal affect on runtime."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"config = {'tdigest_compression': 1000}\n",
"por = PointOutlierRate(**config)\n",
"centroids = por.accumulate(estimate, reference)\n",
"\n",
"por.finalize(centroids=[centroids])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"run_parallel_metric(point_outlier_data_chunks)"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
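Outside the notebook, the same chunked computation can run serially through the new eval_from_iterator hook. A small usage sketch, assuming PointSigmaIQR accepts tdigest_compression through the **kwargs mechanism described in the commit message:

import numpy as np
from qp.metrics.point_estimate_metric_classes import PointSigmaIQR

rng = np.random.default_rng(1002330)
estimate = rng.lognormal(mean=1.0, sigma=2, size=100_000)
reference = rng.lognormal(mean=1.3, sigma=1.9, size=100_000)

# generator that yields successive fixed-size chunks of a sequence
def chunker(seq, size):
    return (seq[pos:pos + size] for pos in range(0, len(seq), size))

metric = PointSigmaIQR(tdigest_compression=1000)
result = metric.eval_from_iterator(
    chunker(estimate, 10_000), chunker(reference, 10_000)
)
print(result)  # should approximate PointSigmaIQR().evaluate(estimate, reference)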
2 changes: 1 addition & 1 deletion src/qp/metrics/pit.py
@@ -57,7 +57,7 @@ def __init__(self, qp_ens, true_vals, eval_grid=DEFAULT_QUANTS):
        # efficiently on line 61 with `data_quants = np.nanquantile(...)`.
        samp_mask = np.isfinite(self._pit_samps)
        self._pit_samps[~samp_mask] = 0
        if not np.all(samp_mask):
        if not np.all(samp_mask):  # pragma: no cover
            logging.warning(
                "Some PIT samples were `NaN`. They have been replaced with 0."
            )
