Merge pull request #882 from CoffeaTeam/local_executors_to_dask

feat!: switch over to dask-based processing idioms, improve dataset handling
lgray authored Dec 12, 2023
2 parents 234c309 + 994ed4a commit 16dd8df
Showing 41 changed files with 3,872 additions and 4,968 deletions.
2,003 changes: 2,003 additions & 0 deletions binder/dataset_discovery.ipynb

(Large notebook diff not rendered.)

6 changes: 5 additions & 1 deletion pyproject.toml
@@ -38,7 +38,7 @@ classifiers = [
 ]
 dependencies = [
     "awkward>=2.5.1rc1",
-    "uproot>=5.2.0rc3",
+    "uproot>=5.2.0rc4",
     "dask[array]>=2023.4.0",
     "dask-awkward>=2023.12.1",
     "dask-histogram>=2023.10.0",
@@ -85,6 +85,10 @@ servicex = [
     "servicex>=2.5.3",
     "func-adl_servicex",
 ]
+rucio = [
+    "rucio-clients>=32;python_version>'3.8'",
+    "rucio-clients<32;python_version<'3.9'",
+]
 dev = [
     "pre-commit",
     "flake8",
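The new extra follows standard pip extras syntax, so Rucio support can be pulled in with pip install 'coffea[rucio]'. The environment markers split the pin: Python 3.9 and newer gets rucio-clients 32 or later, while Python 3.8 stays on the pre-32 client line.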
13 changes: 9 additions & 4 deletions src/coffea/analysis_tools.py
@@ -19,7 +19,7 @@
 import coffea.util


-class WeightStatistics(coffea.processor.AccumulatorABC):
+class WeightStatistics:
     def __init__(self, sumw=0.0, sumw2=0.0, minw=numpy.inf, maxw=-numpy.inf, n=0):
         self.sumw = sumw
         self.sumw2 = sumw2
@@ -40,6 +40,13 @@ def add(self, other):
         self.maxw = max(self.maxw, other.maxw)
         self.n += other.n

+    def __add__(self, other):
+        temp = WeightStatistics(self.sumw, self.sumw2, self.minw, self.maxw, self.n)
+        temp.add(other)
+        return temp
+
+    def __iadd__(self, other):
+        self.add(other)
+        return self


 class Weights:
     """Container for event weights and associated systematic shifts
@@ -62,7 +69,7 @@ def __init__(self, size, storeIndividual=False):
         self._weight = None if size is None else numpy.ones(size)
         self._weights = {}
         self._modifiers = {}
-        self._weightStats = coffea.processor.dict_accumulator()
+        self._weightStats = {}
         self._storeIndividual = storeIndividual
@@ -102,8 +109,6 @@ def __add_delayed(self, name, weight, weightUp, weightDown, shift):
         if self._storeIndividual:
             self._weights[name] = weight
         self.__add_variation(name, weight, weightUp, weightDown, shift)
-        if isinstance(self._weightStats, coffea.processor.dict_accumulator):
-            self._weightStats = {}
         self._weightStats[name] = {
             "sumw": dask_awkward.to_dask_array(weight).sum(),
             "sumw2": dask_awkward.to_dask_array(weight**2).sum(),
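With coffea.processor.AccumulatorABC out of the inheritance chain, WeightStatistics objects now combine through plain Python operators rather than the accumulator machinery. A minimal sketch of the new arithmetic, using made-up numbers:

from coffea.analysis_tools import WeightStatistics

a = WeightStatistics(sumw=10.0, sumw2=12.5, minw=0.2, maxw=3.1, n=100)
b = WeightStatistics(sumw=4.0, sumw2=2.0, minw=0.5, maxw=1.9, n=40)

c = a + b  # __add__: a new WeightStatistics combining both sets of counts
a += b     # __iadd__: accumulates b into a in place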
18 changes: 18 additions & 0 deletions src/coffea/dataset_tools/__init__.py
@@ -0,0 +1,18 @@
from coffea.dataset_tools.apply_processor import apply_to_dataset, apply_to_fileset
from coffea.dataset_tools.manipulations import (
    get_failed_steps_for_dataset,
    get_failed_steps_for_fileset,
    max_chunks,
    slice_chunks,
)
from coffea.dataset_tools.preprocess import preprocess

__all__ = [
    "preprocess",
    "apply_to_dataset",
    "apply_to_fileset",
    "max_chunks",
    "slice_chunks",
    "get_failed_steps_for_dataset",
    "get_failed_steps_for_fileset",
]
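Taken together, these exports sketch the new processing flow: preprocess a fileset into uniform steps, optionally trim it, then lazily apply an analysis. A minimal sketch follows; the fileset contents are placeholders, and the preprocess keyword names (step_size, skip_bad_files) and the max_chunks semantics are assumptions made to illustrate the flow, not a verbatim API reference:

import dask

from coffea.dataset_tools import apply_to_fileset, max_chunks, preprocess

# Hypothetical fileset: dataset name, file path, and tree name are placeholders.
fileset = {
    "my_dataset": {
        "files": {"/path/to/file.root": "Events"},
    },
}

# Inspect the files and split each into uniform steps (assumed keyword names).
dataset_runnable, dataset_updated = preprocess(
    fileset, step_size=100_000, skip_bad_files=True
)

# Keep only the first few steps per file while debugging (assumed semantics).
trimmed = max_chunks(dataset_runnable, 3)

# Build task graphs for a simple analysis over every dataset, then compute.
outputs = apply_to_fileset(lambda events: events.Muon.pt, trimmed)
(result,) = dask.compute(outputs)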
126 changes: 126 additions & 0 deletions src/coffea/dataset_tools/apply_processor.py
@@ -0,0 +1,126 @@
from __future__ import annotations

import copy
from typing import Any, Callable, Dict, Hashable, List, Set, Tuple, Union

import dask.base
import dask_awkward

from coffea.dataset_tools.preprocess import (
    DatasetSpec,
    DatasetSpecOptional,
    FilesetSpec,
    FilesetSpecOptional,
)
from coffea.nanoevents import BaseSchema, NanoAODSchema, NanoEventsFactory
from coffea.processor import ProcessorABC

DaskOutputBaseType = Union[
    dask.base.DaskMethodsMixin,
    Dict[Hashable, dask.base.DaskMethodsMixin],
    Set[dask.base.DaskMethodsMixin],
    List[dask.base.DaskMethodsMixin],
    Tuple[dask.base.DaskMethodsMixin],
]

# NOTE TO USERS: You can use nested python containers as arguments to dask.compute!
DaskOutputType = Union[DaskOutputBaseType, Tuple[DaskOutputBaseType, ...]]

GenericHEPAnalysis = Callable[[dask_awkward.Array], DaskOutputType]


def apply_to_dataset(
    data_manipulation: ProcessorABC | GenericHEPAnalysis,
    dataset: DatasetSpec | DatasetSpecOptional,
    schemaclass: BaseSchema = NanoAODSchema,
    metadata: dict[Hashable, Any] = {},
    uproot_options: dict[str, Any] = {},
) -> DaskOutputType | tuple[DaskOutputType, dask_awkward.Array]:
    """
    Apply the supplied function or processor to the supplied dataset.

    Parameters
    ----------
    data_manipulation : ProcessorABC or GenericHEPAnalysis
        The user analysis code to run on the input dataset.
    dataset : DatasetSpec | DatasetSpecOptional
        The data to be acted upon by the data manipulation passed in.
    schemaclass : BaseSchema, default NanoAODSchema
        The nanoevents schema to interpret the input dataset with.
    metadata : dict[Hashable, Any], default {}
        Metadata for the dataset that is accessible to the input analysis. Should also be dask-serializable.
    uproot_options : dict[str, Any], default {}
        Options to pass to uproot. Pass at least {"allow_read_errors_with_report": True} to turn on file access reports.

    Returns
    -------
    out : DaskOutputType
        The output of the analysis workflow applied to the dataset.
    report : dask_awkward.Array, optional
        The file access report for running the analysis on the input dataset. Needs to be computed simultaneously with the analysis to be accurate.
    """
    files = dataset["files"]
    events = NanoEventsFactory.from_root(
        files,
        metadata=metadata,
        schemaclass=schemaclass,
        uproot_options=uproot_options,
    ).events()

    report = None
    if isinstance(events, tuple):
        events, report = events

    out = None
    if isinstance(data_manipulation, ProcessorABC):
        out = data_manipulation.process(events)
    elif isinstance(data_manipulation, Callable):
        out = data_manipulation(events)
    else:
        raise ValueError("data_manipulation must either be a ProcessorABC or Callable")

    if report is not None:
        return out, report
    return out


def apply_to_fileset(
    data_manipulation: ProcessorABC | GenericHEPAnalysis,
    fileset: FilesetSpec | FilesetSpecOptional,
    schemaclass: BaseSchema = NanoAODSchema,
    uproot_options: dict[str, Any] = {},
) -> dict[str, DaskOutputType] | tuple[dict[str, DaskOutputType], dask_awkward.Array]:
    """
    Apply the supplied function or processor to the supplied fileset (set of datasets).

    Parameters
    ----------
    data_manipulation : ProcessorABC or GenericHEPAnalysis
        The user analysis code to run on the input dataset.
    fileset : FilesetSpec | FilesetSpecOptional
        The data to be acted upon by the data manipulation passed in. Metadata within the fileset should be dask-serializable.
    schemaclass : BaseSchema, default NanoAODSchema
        The nanoevents schema to interpret the input dataset with.
    uproot_options : dict[str, Any], default {}
        Options to pass to uproot. Pass at least {"allow_read_errors_with_report": True} to turn on file access reports.

    Returns
    -------
    out : dict[str, DaskOutputType]
        The output of the analysis workflow applied to the datasets, keyed by dataset name.
    report : dask_awkward.Array, optional
        The file access report for running the analysis on the input dataset. Needs to be computed simultaneously with the analysis to be accurate.
    """
    out = {}
    report = {}
    for name, dataset in fileset.items():
        metadata = copy.deepcopy(dataset.get("metadata", {}))
        metadata.setdefault("dataset", name)
        dataset_out = apply_to_dataset(
            data_manipulation, dataset, schemaclass, metadata, uproot_options
        )
        if isinstance(dataset_out, tuple):
            out[name], report[name] = dataset_out
        else:
            out[name] = dataset_out
    if len(report) > 0:
        return out, report
    return out
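Finally, a sketch combining the file-access reports with the failed-step helpers exported above. The call pattern assumed for get_failed_steps_for_fileset (the original fileset plus the computed per-dataset reports) is inferred from the exported names, and dataset_runnable is the preprocessed fileset from the earlier sketch:

import dask

from coffea.dataset_tools import apply_to_fileset, get_failed_steps_for_fileset


def my_analysis(events):
    # Any function from events to dask collections qualifies as a GenericHEPAnalysis.
    return events.Muon.pt


out, reports = apply_to_fileset(
    my_analysis,
    dataset_runnable,  # preprocessed fileset from the earlier sketch
    uproot_options={"allow_read_errors_with_report": True},
)

# Compute outputs and reports in the same call, as the docstrings require,
# so the reports reflect exactly what was read.
computed_out, computed_reports = dask.compute(out, reports)

# Assumed call pattern: recover a fileset containing only the steps that
# failed, e.g. to retry them after fixing the source of the read errors.
failed_steps = get_failed_steps_for_fileset(dataset_runnable, computed_reports)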
(The remaining changed files are not shown here.)
