Skip to content

Commit

Permalink
change the way we read list or number of columns
Browse files Browse the repository at this point in the history
  • Loading branch information
kondratyevd committed Mar 27, 2024
1 parent 1974107 commit 4606f2a
Show file tree
Hide file tree
Showing 10 changed files with 79 additions and 57 deletions.
56 changes: 29 additions & 27 deletions af_benchmark/example-configs/example-config-Hmm.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,33 +7,35 @@ executor:
processor:
parallelize_over: files
columns:
- run
- luminosityBlock
- HLT_IsoMu24
- Muon_pt
- Muon_eta
- Muon_phi
- Muon_mass
- Muon_charge
- Muon_pfRelIso04_all
- Muon_mediumId
- Muon_ptErr
- Electron_pt
- Electron_eta
- Electron_mvaFall17V2Iso_WP90
- Jet_pt
- Jet_eta
- Jet_phi
- Jet_mass
- PV_npvsGood
- fixedGridRhoFastjetAll
# - Pileup_nTrueInt # MC only
# - genWeight # MC only
# - GenPart_* # MC only
# - LHEScaleWeight # MC only
# - LHEPdfWeight # MC only
# - HTXS_Higgs_pt # Higgs MC only
# - HTXS_njets30 # Higgs MC only
method: column_list
values:
- run
- luminosityBlock
- HLT_IsoMu24
- Muon_pt
- Muon_eta
- Muon_phi
- Muon_mass
- Muon_charge
- Muon_pfRelIso04_all
- Muon_mediumId
- Muon_ptErr
- Electron_pt
- Electron_eta
- Electron_mvaFall17V2Iso_WP90
- Jet_pt
- Jet_eta
- Jet_phi
- Jet_mass
- PV_npvsGood
- fixedGridRhoFastjetAll
# - Pileup_nTrueInt # MC only
# - genWeight # MC only
# - GenPart_* # MC only
# - LHEScaleWeight # MC only
# - LHEPdfWeight # MC only
# - HTXS_Higgs_pt # Higgs MC only
# - HTXS_njets30 # Higgs MC only
load_columns_into_memory: True
worker_operation_time: 0

6 changes: 4 additions & 2 deletions af_benchmark/example-configs/example-config-dbs-blocks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ executor:
backend: futures
processor:
parallelize_over: files
collections:
- Muon
columns:
method: collections
values:
- Muon
load_columns_into_memory: True
worker_operation_time: 0
6 changes: 4 additions & 2 deletions af_benchmark/example-configs/example-config-dbs-datasets.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,9 @@ executor:
backend: futures
processor:
parallelize_over: files
collections:
- Muon
columns:
method: collections
values:
- Muon
load_columns_into_memory: True
worker_operation_time: 0
6 changes: 4 additions & 2 deletions af_benchmark/example-configs/example-config-dbs-files.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@ executor:
backend: futures
processor:
parallelize_over: files
collections:
- Muon
columns:
method: collections
values:
- Muon
load_columns_into_memory: True
worker_operation_time: 0
6 changes: 4 additions & 2 deletions af_benchmark/example-configs/example-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ executor:
backend: sequential
processor:
parallelize_over: files
collections:
- Muon
columns:
method: collections
values:
- Muon
load_columns_into_memory: True
worker_operation_time: 1

28 changes: 15 additions & 13 deletions af_benchmark/uproot_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,14 @@ def open_nanoaod(self, file_path, **kwargs):
return tree

def get_column_list(self, file):
columns_to_read = self.config.get('processor.columns', [])
collections_to_read = self.config.get('processor.collections', [])

if columns_to_read and collections_to_read:
raise ValueError("Specifying columns and collections at the same time is not allowed!")
column_read_method = self.config.get('processor.columns.method', None)
if not column_read_method:
raise ValueError("processor.columns.method must be specified!")

tree = self.open_nanoaod(file)

if collections_to_read:
if column_read_method=='collections':
collections_to_read = self.config.get('processor.columns.values', [])
# All columns from given collections
missing_collections = [
collection for collection in collections_to_read
Expand All @@ -34,23 +33,26 @@ def get_column_list(self, file):
if any(column.startswith(collection) for collection in collections_to_read)
]

elif isinstance(columns_to_read, list):
elif column_read_method=='column_list':
columns_to_read = self.config.get('processor.columns.values', [])
# Explicitly specified columns
missing_columns = [c for c in columns_to_read if c not in tree.keys()]
if missing_columns:
raise ValueError(f"Error reading columns: {', '.join(missing_columns)}")
self.columns = columns_to_read

elif isinstance(columns_to_read, int):
# elif isinstance(columns_to_read, int):
elif column_read_method=='n_columns':
n_columns_to_read = self.config.get('processor.columns.values', 0)
# Number of columns to read
if columns_to_read < 0:
if n_columns_to_read < 0:
raise ValueError("Number of columns can't be negative.")
self.columns = list(tree.keys())[:columns_to_read]
if len(self.columns) < columns_to_read:
raise ValueError(f"Trying to read {columns_to_read} columns, but only {len(self.columns)} present in file.")
self.columns = list(tree.keys())[:n_columns_to_read]
if len(self.columns) < n_columns_to_read:
raise ValueError(f"Trying to read {n_columns_to_read} columns, but only {len(self.columns)} present in file.")

else:
raise ValueError(f"Incorrect type of processor.columns parameter: {type(columns_to_read)}")
raise ValueError(f"Invalid value of processor.columns.method parameter: {column_read_method}")


@tp.enable
Expand Down
4 changes: 3 additions & 1 deletion notebooks/config_2.1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ executor:

processor:
parallelize_over: columns
collections: [Muon]
columns:
method: collections
values: [Muon]
load_columns_into_memory: True
worker_operation_time: 0
4 changes: 3 additions & 1 deletion notebooks/config_3.2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,9 @@ data-access:

processor:
parallelize_over: columns
columns: 10
columns:
method: n_columns
values: 10
load_columns_into_memory: True
worker_operation_time: 0

4 changes: 3 additions & 1 deletion tests/config-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ executor:
backend: sequential
processor:
parallelize_over: files
columns: 5
columns:
method: n_columns
values: 5
load_columns_into_memory: True
worker_operation_time: 0

16 changes: 10 additions & 6 deletions tests/test-processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,20 @@


def test_processor_columns_explicit(b):
    """Run the benchmark with an explicit list of column names.

    Args:
        b: benchmark object exposing a ``config`` mapping and a ``run()``
            method (presumably supplied as a pytest fixture).
    """
    # Assign the whole mapping at once: this works regardless of what
    # ``processor.columns`` held before (list, int, or dict), whereas
    # subscripting a stale list/int with a string key raises TypeError.
    b.config["processor"]["columns"] = {
        "method": "column_list",
        "values": ["Muon_pt", "Muon_eta"],
    }
    b.run()
    print("Successfully tested processing explicit list of columns")

def test_processor_columns_number(b):
    """Run the benchmark reading a fixed number of columns.

    Args:
        b: benchmark object exposing a ``config`` mapping and a ``run()``
            method (presumably supplied as a pytest fixture).
    """
    # Assign the whole mapping at once: subscripting a stale integer value
    # of ``processor.columns`` with a string key would raise TypeError.
    b.config["processor"]["columns"] = {
        "method": "n_columns",
        "values": 5,
    }
    b.run()
    print("Successfully tested processing given number of columns")

def test_processor_collections(b):
    """Run the benchmark reading every column of the given collections.

    Args:
        b: benchmark object exposing a ``config`` mapping and a ``run()``
            method (presumably supplied as a pytest fixture).
    """
    # Both keys live under ``processor.columns``; the collection names must
    # NOT be written to a separate ``processor.collections`` entry, which the
    # processor no longer reads.
    b.config["processor"]["columns"] = {
        "method": "collections",
        "values": ["Muon"],
    }
    b.run()
    print("Successfully tested processing a collection of columns")

Expand Down Expand Up @@ -45,7 +47,8 @@ def test_processor_parallelize_over_files(b):
def test_processor_parallelize_over_columns(b):
    """Run the benchmark parallelized over columns with the futures backend.

    Args:
        b: benchmark object exposing a ``config`` mapping and a ``run()``
            method (presumably supplied as a pytest fixture).
    """
    b.config["processor"]["parallelize_over"] = "columns"
    b.config["executor"]["backend"] = "futures"
    # Assign the whole mapping at once so the test does not depend on the
    # previous type of ``processor.columns``.
    b.config["processor"]["columns"] = {
        "method": "n_columns",
        "values": 2,
    }
    b.run()
    # Message names the mode actually exercised here (columns, not files).
    print("Successfully tested parallelization over columns")

Expand All @@ -56,7 +59,8 @@ def test_processor_parallelize_over_files_and_columns(b):
"tests/data/nano_dimuon.root",
"tests/data/nano_dimuon.root"
]
b.config["processor"]["columns"] = 2
b.config["processor"]["columns"]["method"] = "n_columns"
b.config["processor"]["columns"]["values"] = 2
b.run()
print(f"Successfully tested parallelization over files and columns")

Expand Down

0 comments on commit 4606f2a

Please sign in to comment.