Commit

typing and docs
lgray committed Dec 12, 2023
1 parent 6d0722c commit 994ed4a
Showing 3 changed files with 167 additions and 8 deletions.
46 changes: 44 additions & 2 deletions src/coffea/dataset_tools/apply_processor.py
@@ -35,7 +35,29 @@ def apply_to_dataset(
schemaclass: BaseSchema = NanoAODSchema,
metadata: dict[Hashable, Any] = {},
uproot_options: dict[str, Any] = {},
) -> DaskOutputType:
) -> DaskOutputType | tuple[DaskOutputType, dask_awkward.Array]:
"""
Apply the supplied function or processor to the supplied dataset.
Parameters
----------
data_manipulation : ProcessorABC or GenericHEPAnalysis
The user analysis code to run on the input dataset
dataset: DatasetSpec | DatasetSpecOptional
The data to be acted upon by the data manipulation passed in.
schemaclass: BaseSchema, default NanoAODSchema
The nanoevents schema to interpret the input dataset with.
metadata: dict[Hashable, Any], default {}
Metadata for the dataset that is accessible by the input analysis. Should also be dask-serializable.
uproot_options: dict[str, Any], default {}
Options to pass to uproot. Pass at least {"allow_read_errors_with_report": True} to turn on file access reports.
Returns
-------
out : DaskOutputType
The output of the analysis workflow applied to the dataset
report : dask_awkward.Array, optional
The file access report for running the analysis on the input dataset. Needs to be computed simultaneously with the analysis to be accurate.
"""
files = dataset["files"]
events = NanoEventsFactory.from_root(
files,
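
A minimal usage sketch for apply_to_dataset as documented above; the file path and the toy analysis callable are hypothetical, and the (out, report) unpacking assumes uproot_options turns on the file-access report:

import dask
from coffea.dataset_tools import apply_to_dataset
from coffea.nanoevents import NanoAODSchema


def muon_pt(events):
    # toy GenericHEPAnalysis: a lazy dask-awkward selection
    return events.Muon[events.Muon.pt > 30].pt


dataset = {"files": {"nano_dy.root": "Events"}}  # hypothetical DatasetSpecOptional

out, report = apply_to_dataset(
    muon_pt,
    dataset,
    schemaclass=NanoAODSchema,
    uproot_options={"allow_read_errors_with_report": True},  # enables the report
)
computed_out, computed_report = dask.compute(out, report)  # compute both together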
@@ -66,7 +88,7 @@ def apply_to_fileset(
fileset: FilesetSpec | FilesetSpecOptional,
schemaclass: BaseSchema = NanoAODSchema,
uproot_options: dict[str, Any] = {},
) -> dict[str, DaskOutputType]:
) -> dict[str, DaskOutputType] | tuple[dict[str, DaskOutputType], dask_awkward.Array]:
"""
Apply the supplied function or processor to the supplied fileset (set of datasets).
Parameters
----------
data_manipulation : ProcessorABC or GenericHEPAnalysis
The user analysis code to run on the input dataset
fileset: FilesetSpec | FilesetSpecOptional
The data to be acted upon by the data manipulation passed in. Metadata within the fileset should be dask-serializable.
schemaclass: BaseSchema, default NanoAODSchema
The nanoevents schema to interpret the input dataset with.
uproot_options: dict[str, Any], default {}
Options to pass to uproot. Pass at least {"allow_read_errors_with_report": True} to turn on file access reports.
Returns
-------
out : dict[str, DaskOutputType]
The output of the analysis workflow applied to the datasets, keyed by dataset name.
report : dask_awkward.Array, optional
The file access report for running the analysis on the input datasets. Needs to be computed simultaneously with the analysis to be accurate.
"""
out = {}
report = {}
for name, dataset in fileset.items():
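A matching sketch for apply_to_fileset; the dataset names and file paths are hypothetical, and outputs and reports are computed in one dask.compute call so the reports stay accurate:

import dask
from coffea.dataset_tools import apply_to_fileset
from coffea.nanoevents import NanoAODSchema


def muon_pt(events):
    # toy analysis applied to every dataset in the fileset
    return events.Muon[events.Muon.pt > 30].pt


fileset = {
    "DYJets": {"files": {"nano_dy.root": "Events"}},    # hypothetical
    "TTbar": {"files": {"nano_ttbar.root": "Events"}},  # hypothetical
}

outputs, reports = apply_to_fileset(
    muon_pt,
    fileset,
    schemaclass=NanoAODSchema,
    uproot_options={"allow_read_errors_with_report": True},
)
computed_outputs, computed_reports = dask.compute(outputs, reports)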
73 changes: 69 additions & 4 deletions src/coffea/dataset_tools/manipulations.py
@@ -1,14 +1,47 @@
from __future__ import annotations

import copy
from typing import Any

import awkward
import numpy

from coffea.dataset_tools.preprocess import DatasetSpec, FilesetSpec


def max_chunks(fileset, maxchunks=None):
def max_chunks(fileset: FilesetSpec, maxchunks: int | None = None) -> FilesetSpec:
"""
Modify the input dataset so that only the first "maxchunks" chunks of each file will be processed.
Parameters
----------
fileset: FilesetSpec
The set of datasets to reduce to maxchunks row-ranges.
maxchunks: int | None, default None
How many chunks to keep for each file.
Returns
-------
out : FilesetSpec
The reduced fileset with only the first maxchunks event ranges retained.
"""
return slice_chunks(fileset, slice(maxchunks))
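
A short sketch of max_chunks on an already-preprocessed fileset; the file name and step boundaries are illustrative:

from coffea.dataset_tools import max_chunks

preprocessed = {
    "DYJets": {
        "files": {
            "nano_dy.root": {  # hypothetical preprocessed file entry
                "object_path": "Events",
                "steps": [[0, 50_000], [50_000, 100_000], [100_000, 150_000]],
            }
        }
    }
}

first_chunk_only = max_chunks(preprocessed, 1)  # keep only the first row-range per file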


def slice_chunks(fileset, theslice=slice(None)):
def slice_chunks(fileset: FilesetSpec, theslice: Any = slice(None)) -> FilesetSpec:
"""
Modify the input dataset so that only the chunks of each file specified by the input slice are processed.
Parameters
----------
fileset: FilesetSpec
The set of datasets to be sliced.
theslice: Any, default slice(None)
How to slice the array of row-ranges (steps) in the input fileset.
Returns
-------
out : FilesetSpec
The reduced fileset with only the row-ranges specified by theslice retained.
"""
if not isinstance(theslice, slice):
theslice = slice(theslice)

@@ -20,7 +53,23 @@ def slice_chunks(fileset, theslice=slice(None)):
return out
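
And the more general slice_chunks, here keeping every other row-range of each file (same illustrative preprocessed-fileset layout as above):

from coffea.dataset_tools import slice_chunks

preprocessed = {
    "DYJets": {
        "files": {
            "nano_dy.root": {  # hypothetical preprocessed file entry
                "object_path": "Events",
                "steps": [[0, 50], [50, 100], [100, 150]],
            }
        }
    }
}

every_other = slice_chunks(preprocessed, slice(None, None, 2))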


def get_failed_steps_for_dataset(dataset, report):
def get_failed_steps_for_dataset(
dataset: DatasetSpec, report: awkward.Array
) -> DatasetSpec:
"""
Modify an input dataset to only contain the files and row-ranges for *failed* processing jobs as specified in the supplied report.
Parameters
----------
dataset: DatasetSpec
The dataset to be reduced to only contain files and row-ranges that have previously encountered failed file access.
report: awkward.Array
The computed file-access error report from dask-awkward.
Returns
-------
out : DatasetSpec
The reduced dataset with only the row-ranges and files that failed processing, according to the input report.
"""
failed_dataset = copy.deepcopy(dataset)
failed_dataset["files"] = {}
failures = report[~awkward.is_none(report.exception)]
@@ -60,7 +109,23 @@ def get_failed_steps_for_dataset(dataset, report):
return failed_dataset
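
A sketch of the retry pattern this enables; the dataset is assumed to be a preprocessed DatasetSpec (with steps filled in), and the report passed in must be the computed awkward.Array, not the lazy dask version:

import dask
from coffea.dataset_tools import apply_to_dataset, get_failed_steps_for_dataset


def muon_pt(events):
    return events.Muon.pt  # toy analysis


dataset = {  # hypothetical preprocessed DatasetSpec
    "files": {"nano_dy.root": {"object_path": "Events", "steps": [[0, 50], [50, 100]]}}
}

out, report = apply_to_dataset(
    muon_pt, dataset, uproot_options={"allow_read_errors_with_report": True}
)
computed_out, computed_report = dask.compute(out, report)

# dataset reduced to only the files/row-ranges whose reads failed, ready for a retry pass
retry_dataset = get_failed_steps_for_dataset(dataset, computed_report)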


def get_failed_steps_for_fileset(fileset, report_dict):
def get_failed_steps_for_fileset(
fileset: FilesetSpec, report_dict: dict[str, awkward.Array]
) -> FilesetSpec:
"""
Modify an input fileset to only contain the files and row-ranges for *failed* processing jobs as specified in the supplied reports.
Parameters
----------
fileset: FilesetSpec
The set of datasets to be reduced to only contain files and row-ranges that have previously encountered failed file access.
report_dict: dict[str, awkward.Array]
The computed file-access error reports from dask-awkward, indexed by dataset name.
Returns
-------
out : FilesetSpec
The reduced fileset with only the row-ranges and files that failed processing, according to the input reports.
"""
failed_fileset = {}
for name, dataset in fileset.items():
failed_dataset = get_failed_steps_for_dataset(dataset, report_dict[name])
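The fileset-level counterpart, again assuming a preprocessed fileset and computed per-dataset reports:

import dask
from coffea.dataset_tools import apply_to_fileset, get_failed_steps_for_fileset


def muon_pt(events):
    return events.Muon.pt  # toy analysis


fileset = {  # hypothetical preprocessed FilesetSpec
    "DYJets": {
        "files": {"nano_dy.root": {"object_path": "Events", "steps": [[0, 50], [50, 100]]}}
    }
}

outputs, reports = apply_to_fileset(
    muon_pt, fileset, uproot_options={"allow_read_errors_with_report": True}
)
computed_outputs, computed_reports = dask.compute(outputs, reports)

# per-dataset fileset containing only the files and row-ranges whose reads failed
retry_fileset = get_failed_steps_for_fileset(fileset, computed_reports)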
56 changes: 54 additions & 2 deletions src/coffea/dataset_tools/preprocess.py
@@ -14,12 +14,38 @@

def _get_steps(
normed_files: awkward.Array | dask_awkward.Array,
maybe_step_size: None | int = None,
maybe_step_size: int | None = None,
align_clusters: bool = False,
recalculate_seen_steps: bool = False,
skip_bad_files: bool = False,
file_exceptions: Exception | Warning = (FileNotFoundError, OSError),
file_exceptions: Exception
| Warning
| tuple[Exception | Warning] = (FileNotFoundError, OSError),
) -> awkward.Array | dask_awkward.Array:
"""
Given a list of normalized file and object paths (defined in uproot), determine the steps for each file according to the supplied processing options.
Parameters
----------
normed_files: awkward.Array | dask_awkward.Array
The list of normalized file descriptions to process for steps.
maybe_step_size: int | None, default None
If specified, the size of the steps to make when analyzing the input files.
align_clusters: bool, default False
Round to the cluster size in a root file, when chunks are specified. Reduces data transfer in
analysis.
recalculate_seen_steps: bool, default False
If steps are present in the input normed files, force the recalculation of those steps, instead
of only recalculating the steps if the uuid has changed.
skip_bad_files: bool, default False
Instead of failing, catch exceptions specified by file_exceptions and return null data.
file_exceptions: Exception | Warning | tuple[Exception | Warning], default (FileNotFoundError, OSError)
What exceptions to catch when skipping bad files.
Returns
-------
array : awkward.Array | dask_awkward.Array
The normalized file descriptions, appended with the calculated steps for those files.
"""
nf_backend = awkward.backend(normed_files)
lz_or_nf = awkward.typetracer.length_zero_if_typetracer(normed_files)
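
As a rough illustration of what the calculated "steps" look like, a hedged sketch only (the real implementation also handles cluster alignment, uuids, and bad-file skipping): uniform [start, stop) row-ranges of at most maybe_step_size entries per file.

def uniform_steps(num_entries: int, step_size: int) -> list[list[int]]:
    # split [0, num_entries) into consecutive row-ranges of at most step_size entries
    edges = list(range(0, num_entries, step_size)) + [num_entries]
    return [[start, stop] for start, stop in zip(edges[:-1], edges[1:])]


# uniform_steps(250_000, 100_000) -> [[0, 100000], [100000, 200000], [200000, 250000]]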

@@ -144,6 +170,32 @@ def preprocess(
skip_bad_files: bool = False,
file_exceptions: Exception | Warning = (FileNotFoundError, OSError),
) -> tuple[FilesetSpec, FilesetSpecOptional]:
"""
Given a fileset of datasets (lists of files and object paths), determine the steps for each file according to the supplied processing options.
Parameters
----------
fileset: FilesetSpecOptional
The set of datasets whose files will be preprocessed.
maybe_step_size: int | None, default None
If specified, the size of the steps to make when analyzing the input files.
align_clusters: bool, default False
Round to the cluster size in a root file, when chunks are specified. Reduces data transfer in
analysis.
recalculate_seen_steps: bool, default False
If steps are present in the input normed files, force the recalculation of those steps,
instead of only recalculating the steps if the uuid has changed.
skip_bad_files: bool, default False
Instead of failing, catch exceptions specified by file_exceptions and return null data.
file_exceptions: Exception | Warning | tuple[Exception | Warning], default (FileNotFoundError, OSError)
What exceptions to catch when skipping bad files.
Returns
-------
out_available : FilesetSpec
The subset of files in each dataset that were successfully preprocessed, organized by dataset.
out_updated : FilesetSpecOptional
The original set of datasets including files that were not accessible, updated to include the result of preprocessing where available.
"""
out_updated = copy.deepcopy(fileset)
out_available = copy.deepcopy(fileset)
all_ak_norm_files = {}
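A usage sketch tying preprocess to apply_to_fileset; keyword names follow the signatures and docstrings touched in this commit (treat them as assumptions on other coffea versions), and the file paths are hypothetical:

from coffea.dataset_tools import apply_to_fileset, preprocess

fileset = {"DYJets": {"files": {"nano_dy.root": "Events"}}}  # hypothetical input

available, updated = preprocess(
    fileset,
    maybe_step_size=100_000,  # parameter name as in this commit
    align_clusters=False,
    skip_bad_files=True,
)

# `available` holds only the readable files (now with steps) and feeds apply_to_fileset;
# `updated` also keeps the files that could not be read, for bookkeeping or later retries.
outputs = apply_to_fileset(lambda events: events.Muon.pt, available)  # toy analysis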
