Skip to content

Commit

Permalink
remove NanoEvents processor, leave only uproot
Browse files Browse the repository at this point in the history
  • Loading branch information
kondratyevd committed Feb 28, 2024
1 parent 428b23b commit d3c5191
Show file tree
Hide file tree
Showing 19 changed files with 1,097 additions and 246 deletions.
17 changes: 0 additions & 17 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,6 @@ jobs:
- name: Default test
run: python3 af_benchmark/benchmark.py tests/test-default.yaml

nanoevents-processor:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v2

- name: Setup Python
uses: actions/setup-python@v2
with:
python-version: '3.10'

- name: Install dependencies
run: pip install -r requirements.txt

- name: Test NanoEvents processor
run: python3 af_benchmark/benchmark.py tests/test-nanoevents.yaml

read-from-dir:
runs-on: ubuntu-latest
steps:
Expand Down
5 changes: 1 addition & 4 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@ This benchmark is designed for generic but comprehensive performance tests of th
* Parallelized via ``Dask`` using local cluster
* Parallelized via ``Dask`` using Gateway cluster

* Multiple methods of loading and reading columns from NanoAOD ROOT files:

* ``uproot``
* ``coffea.nanoevents``
* Loading and reading columns from NanoAOD ROOT files is done using ``uproot``.

* Generic operations applied to data in columns:

Expand Down
12 changes: 2 additions & 10 deletions af_benchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from profiling.timing import time_profiler as tp
from data_access.loader import get_file_list
from processor.processor import processors
from processor.uproot_processor import UprootProcessor

from executor.sequential import SequentialExecutor
from executor.futures import FuturesExecutor
Expand Down Expand Up @@ -77,14 +77,7 @@ def reset_executor(self, **kwargs):


def reset_processor(self, **kwargs):
# Select processor method
self.method = self.config.get('processor.method')
if self.method in processors:
self.processor = processors[self.method](self.config)
else:
raise NotImplementedError(
f"Invalid method: {self.method}. Allowed values are: {processors.keys()}"
)
self.processor = UprootProcessor(self.config)

@tp.profile
@tp.enable
Expand Down Expand Up @@ -118,7 +111,6 @@ def update_report(self):
report = {
"n_files": self.n_files,
"n_columns_read": n_cols_read,
"processor": self.method,
"operation": self.config.get('processor.operation'),
"executor": self.backend,
"n_workers": self.executor.get_n_workers(),
Expand Down
85 changes: 0 additions & 85 deletions af_benchmark/dask-gateway-setup.ipynb

This file was deleted.

2 changes: 0 additions & 2 deletions af_benchmark/example-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ data-access:
processor:
parallelize_over: files
# parallelize_over: columns
method: uproot
# method: nanoevents
columns: 5
# - event
# - Muon_pt
Expand Down
1 change: 0 additions & 1 deletion af_benchmark/example-configs/example-config-1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ data-access:
- /eos/purdue/store/data/Run2016B/SingleMuon/NANOAOD/02Apr2020_ver2-v1/20000/
processor:
parallelize_over: files
method: uproot
columns:
- event
operation: sum
1 change: 0 additions & 1 deletion af_benchmark/example-configs/example-config-dbs-block.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,5 @@ data-access:
block: /SingleMuon/Run2017C-02Apr2020-v1/NANOAOD#44236284-ff8d-4b95-b971-dcec15b5130f
processor:
parallelize_over: files
method: uproot
columns: 5
operation: sum
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,5 @@ data-access:
dataset: /SingleMuon/Run2016B-02Apr2020_ver2-v1/NANOAOD
processor:
parallelize_over: files
method: uproot
columns: 5
operation: sum
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,5 @@ data-access:
- /eos/purdue/store/data/Run2016B/SingleMuon/NANOAOD/02Apr2020_ver2-v1/
processor:
parallelize_over: files
method: uproot
columns: 5
operation: sum
Original file line number Diff line number Diff line change
@@ -1,28 +1,29 @@
from abc import ABC, abstractmethod
from profiling.timing import time_profiler as tp
import pandas as pd
import numpy as np
import uproot
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema


class BaseProcessor(ABC):
"""A base processor class
"""
class UprootProcessor:
def __init__(self, config):
self.config=config

@tp.enable
def open_nanoaod(self, files, executor, **kwargs):
return executor.execute(self.open_nanoaod_, files, **kwargs)
self.config = config

@abstractmethod
def open_nanoaod_(self, file_path, **kwargs):
return
def open_nanoaod(self, file_path, **kwargs):
tree = uproot.open(file_path)["Events"]
return tree

@abstractmethod
def get_column_list(self, file):
return
columns_to_read = self.config.get('processor.columns')
tree = self.open_nanoaod(file)
if isinstance(columns_to_read, list):
if any(c not in tree.keys() for c in columns_to_read):
raise ValueError(f"Error reading column: {column}")
self.column = columns_to_read
elif isinstance(columns_to_read, int):
self.columns = list(tree.keys())[:columns_to_read]
if len(self.columns)<columns_to_read:
raise ValueError(f"Trying to read {columns_to_read} columns, but only {len(self.columns)} present in file.")
else:
raise ValueError(f"Incorrect type of processor.columns parameter: {type(columns_to_read)}")

@tp.enable
def read_columns(self, files, executor, parallelize_over):
Expand Down Expand Up @@ -58,10 +59,17 @@ def read_columns_func(self, args):
if column_stats:
col_stats_df = pd.concat(column_stats)
return column_data, col_stats_df

@abstractmethod

def read_column(self, file, column):
return
tree = self.open_nanoaod(file)
column_data = tree[column]
col_stats = pd.DataFrame([{
"file": tree.file.file_path,
"column": column,
"compressed_bytes": column_data.compressed_bytes,
"uncompressed_bytes": column_data.uncompressed_bytes
}])
return {"data": column_data, "stats": col_stats}

@tp.enable
def run_operation(self, columns, executor, **kwargs):
Expand All @@ -76,44 +84,6 @@ def run_operation_func(self, columns, **kwargs):
col_stats = None
return result, col_stats

@abstractmethod
def run_operation_(self, columns, **kwargs):
return


class UprootProcessor(BaseProcessor):
def __init__(self, config):
self.config = config

def open_nanoaod_(self, file_path, **kwargs):
tree = uproot.open(file_path)["Events"]
return tree

def get_column_list(self, file):
columns_to_read = self.config.get('processor.columns')
tree = self.open_nanoaod_(file)
if isinstance(columns_to_read, list):
if any(c not in tree.keys() for c in columns_to_read):
raise ValueError(f"Error reading column: {column}")
self.column = columns_to_read
elif isinstance(columns_to_read, int):
self.columns = list(tree.keys())[:columns_to_read]
if len(self.columns)<columns_to_read:
raise ValueError(f"Trying to read {columns_to_read} columns, but only {len(self.columns)} present in file.")
else:
raise ValueError(f"Incorrect type of processor.columns parameter: {type(columns_to_read)}")

def read_column(self, file, column):
tree = self.open_nanoaod_(file)
column_data = tree[column]
col_stats = pd.DataFrame([{
"file": tree.file.file_path,
"column": column,
"compressed_bytes": column_data.compressed_bytes,
"uncompressed_bytes": column_data.uncompressed_bytes
}])
return {"data": column_data, "stats": col_stats}

def run_operation_(self, column_data, **kwargs):
operation = self.config.get('processor.operation', None)
if not operation:
Expand All @@ -137,48 +107,4 @@ def run_operation_(self, column_data, **kwargs):
np.sum(data_in_memory)
return results



class NanoEventsProcessor(BaseProcessor):

def open_nanoaod_(self, file_path, **kwargs):
tree = NanoEventsFactory.from_root(
file_path,
schemaclass=NanoAODSchema.v6,
uproot_options={"timeout": 120}
).events()
return tree

def get_column_list(self, file):
self.columns = self.config.get('processor.columns')
tree = self.open_nanoaod_(file)
if not isinstance(self.columns, list):
raise NotImplementedError("For NanoEventsProcessor, only explicit list of columns is currently possible")

def read_column(self, file, column):
tree = self.open_nanoaod_(file)
if column in tree.fields:
column_data = tree[column]
elif "_" in column:
branch, leaf = column.split("_")
column_data = tree[branch][leaf]
else:
raise ValueError(f"Error reading column: {column}")
return {"data": column_data}

def run_operation_(self, column_data, **kwargs):
operation = self.config.get('processor.operation')
results = {}
for file_column_data in column_data:
for data in column_data.values():
if operation == 'mean':
np.mean(data)
return results


processors = {
'uproot': UprootProcessor,
'nanoevents': NanoEventsProcessor
}


Loading

0 comments on commit d3c5191

Please sign in to comment.