Skip to content

Commit

Permalink
rename handler -> processor
Browse files Browse the repository at this point in the history
  • Loading branch information
kondratyevd committed Feb 13, 2024
1 parent e185d99 commit 4d75e72
Show file tree
Hide file tree
Showing 12 changed files with 39 additions and 38 deletions.
27 changes: 14 additions & 13 deletions af_benchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from profiling.timing import time_profiler as tp
from data_access.loader import get_file_list
from processing.handler import handlers
from processor.processor import processors

from executor.sequential import SequentialExecutor
from executor.futures import FuturesExecutor
Expand Down Expand Up @@ -40,11 +40,12 @@ def __init__(self, config_path=None):
"dataset",
"n_files",
"n_columns_read",
"bytes_read",
"n_workers",
"total_time",
"operation",
"executor",
"col_handler",
"processor",
]
)
if config_path:
Expand All @@ -64,13 +65,13 @@ def reinitialize(self, config_path):
f"Invalid backend: {self.backend}. Allowed values are: {executors.keys()}"
)

# Select file handler method
self.method = self.config.get('processing.method')
if self.method in handlers:
self.handler = handlers[self.method](self.config)
# Select processor method
self.method = self.config.get('processor.method')
if self.method in processors:
self.processor = processors[self.method](self.config)
else:
raise NotImplementedError(
f"Invalid method: {self.method}. Allowed values are: {handlers.keys()}"
f"Invalid method: {self.method}. Allowed values are: {processors.keys()}"
)


Expand All @@ -81,15 +82,15 @@ def run(self):
self.n_files = len(files)

trees = self.executor.execute(
self.handler.open_nanoaod, files
self.processor.open_nanoaod, files
)

columns_by_file = self.executor.execute(
self.handler.read_columns, trees
self.processor.read_columns, trees
)

outputs = self.executor.execute(
self.handler.run_operation, columns_by_file
self.processor.run_operation, columns_by_file
)

return outputs
Expand All @@ -106,12 +107,12 @@ def update_report(self):
pd.DataFrame([{
"dataset": "",
"n_files": self.n_files,
"n_columns_read": len(self.config.get('processing.columns')),
"n_columns_read": len(self.config.get('processor.columns')),
"n_workers": self.executor.get_n_workers(),
"total_time": run_time,
"operation": self.config.get('processing.operation'),
"operation": self.config.get('processor.operation'),
"executor": self.backend,
"col_handler": self.method,
"processor": self.method,
}])
])

Expand Down
12 changes: 6 additions & 6 deletions af_benchmark/example-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@ executor:
# backend: dask-local
# backend: dask-gateway
data-access:
# mode: local
# files:
# - tests/data/nano_dimuon.root
mode: local_dir
files_dir: /eos/purdue/store/data/Run2016B/SingleMuon/NANOAOD/02Apr2020_ver2-v1/20000/
processing:
mode: local
files:
- tests/data/nano_dimuon.root
# mode: local_dir
# files_dir: /eos/purdue/store/data/Run2016B/SingleMuon/NANOAOD/02Apr2020_ver2-v1/20000/
processor:
method: uproot
# method: nanoevents
columns:
Expand Down
2 changes: 1 addition & 1 deletion af_benchmark/example-configs/example-config-1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ data-access:
# - tests/data/nano_dimuon.root
mode: local_dir
files_dir: /eos/purdue/store/data/Run2016B/SingleMuon/NANOAOD/02Apr2020_ver2-v1/20000/
processing:
processor:
method: uproot
# method: nanoevents
columns:
Expand Down
2 changes: 1 addition & 1 deletion af_benchmark/example-configs/example-config-2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ executor:
data-access:
mode: local_dir
files_dir: /eos/purdue/store/data/Run2016B/SingleMuon/NANOAOD/02Apr2020_ver2-v1/20000/
processing:
processor:
method: uproot
columns:
- event
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from coffea.nanoevents import NanoEventsFactory, NanoAODSchema


class BaseColumnHandler(ABC):
"""A base column handler class
class BaseProcessor(ABC):
"""A base processor class
"""
def __init__(self, config):
self.config=config
Expand All @@ -29,7 +29,7 @@ def run_operation(self, columns, **kwargs):
return


class UprootColumnHandler(BaseColumnHandler):
class UprootProcessor(BaseProcessor):
def __init__(self, config):
self.config = config
self.col_stats = pd.DataFrame(
Expand All @@ -48,7 +48,7 @@ def open_nanoaod(self, file_path, **kwargs):

@tp.enable
def read_columns(self, tree, **kwargs):
columns_to_read = self.config.get('processing.columns')
columns_to_read = self.config.get('processor.columns')
column_data = {}
for column in columns_to_read:
if column in tree.keys():
Expand All @@ -66,7 +66,7 @@ def read_columns(self, tree, **kwargs):
return column_data

def run_operation(self, columns, **kwargs):
operation = self.config.get('processing.operation')
operation = self.config.get('processor.operation')
results = {}
for name, data in columns.items():
if operation == 'array':
Expand All @@ -78,7 +78,7 @@ def run_operation(self, columns, **kwargs):



class NanoEventsColumnHandler(BaseColumnHandler):
class NanoEventsProcessor(BaseProcessor):
def open_nanoaod(self, file_path, **kwargs):
tree = NanoEventsFactory.from_root(
file_path,
Expand All @@ -88,7 +88,7 @@ def open_nanoaod(self, file_path, **kwargs):
return tree

def read_columns(self, tree, **kwargs):
columns_to_read = self.config.get('processing.columns')
columns_to_read = self.config.get('processor.columns')
column_data = {}
for column in columns_to_read:
if column in tree.fields:
Expand All @@ -101,17 +101,17 @@ def read_columns(self, tree, **kwargs):
return column_data

def run_operation(self, columns, **kwargs):
operation = self.config.get('processing.operation')
operation = self.config.get('processor.operation')
results = {}
for name, data in columns.items():
if operation == 'mean':
results[name] = np.mean(data)
return results


handlers = {
'uproot': UprootColumnHandler,
'nanoevents': NanoEventsColumnHandler
processors = {
'uproot': UprootProcessor,
'nanoevents': NanoEventsProcessor
}


2 changes: 1 addition & 1 deletion tests/config_nanoevents_dask_local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ data-access:
mode: local
files:
- tests/data/nano_dimuon.root
processing:
processor:
method: nanoevents
columns:
- event
Expand Down
2 changes: 1 addition & 1 deletion tests/config_nanoevents_futures.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ data-access:
mode: local
files:
- tests/data/nano_dimuon.root
processing:
processor:
method: nanoevents
columns:
- event
Expand Down
2 changes: 1 addition & 1 deletion tests/config_nanoevents_sequential.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ data-access:
mode: local
files:
- tests/data/nano_dimuon.root
processing:
processor:
method: nanoevents
columns:
- event
Expand Down
2 changes: 1 addition & 1 deletion tests/config_uproot_dask_local.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ data-access:
mode: local
files:
- tests/data/nano_dimuon.root
processing:
processor:
method: uproot
columns:
- event
Expand Down
2 changes: 1 addition & 1 deletion tests/config_uproot_futures.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ data-access:
mode: local
files:
- tests/data/nano_dimuon.root
processing:
processor:
method: uproot
columns:
- event
Expand Down
2 changes: 1 addition & 1 deletion tests/config_uproot_sequential.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ data-access:
mode: local
files:
- tests/data/nano_dimuon.root
processing:
processor:
method: uproot
columns:
- event
Expand Down

0 comments on commit 4d75e72

Please sign in to comment.