Skip to content

Commit

Permalink
run abstract operation per worker (not per column)
Browse files Browse the repository at this point in the history
  • Loading branch information
kondratyevd committed Mar 27, 2024
1 parent e5daa8f commit da42946
Show file tree
Hide file tree
Showing 11 changed files with 62 additions and 50 deletions.
7 changes: 1 addition & 6 deletions af_benchmark/benchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,7 @@ def run(self):
files = get_file_list(self)
self.processor.get_column_list(files[0])

self.col_stats = self.processor.process_columns(
files,
self.executor,
parallelize_over=self.config.get('processor.parallelize_over', 'files'),
load_into_memory=True
)
self.col_stats = self.processor.run_processor(files, self.executor)

def update_report(self):
report = {
Expand Down
3 changes: 2 additions & 1 deletion af_benchmark/example-configs/example-config-dbs-blocks.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ processor:
parallelize_over: files
collections:
- Muon
operation: load_into_memory
load_columns_into_memory: True
worker_operation_time: 0
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ processor:
parallelize_over: files
collections:
- Muon
operation: load_into_memory
load_columns_into_memory: True
worker_operation_time: 0
3 changes: 2 additions & 1 deletion af_benchmark/example-configs/example-config-dbs-files.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,5 @@ processor:
parallelize_over: files
collections:
- Muon
operation: load_into_memory
load_columns_into_memory: True
worker_operation_time: 0
4 changes: 3 additions & 1 deletion af_benchmark/example-configs/example-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,6 @@ processor:
parallelize_over: files
collections:
- Muon
operation: load_into_memory
load_columns_into_memory: True
worker_operation_time: 1

5 changes: 0 additions & 5 deletions af_benchmark/executors/sequential.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,7 @@ def _execute(self, func, args, **kwargs):
:meta public:
"""
first = [func(arg, **kwargs) for arg in args]
return [func(arg, **kwargs) for arg in args]

print(first)
second=2
print(second)

def get_n_workers(self):
return 1
41 changes: 23 additions & 18 deletions af_benchmark/uproot_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import pandas as pd
import numpy as np
import uproot
import time
from operations import operations

class UprootProcessor:
def __init__(self, config):
Expand Down Expand Up @@ -53,8 +55,9 @@ def get_column_list(self, file):


@tp.enable
def process_columns(self, files, executor, **kwargs):
parallelize_over = kwargs.get("parallelize_over", 'files')
def run_processor(self, files, executor, **kwargs):
parallelize_over = self.config.get('processor.parallelize_over', 'files'),

arg_dict = {
"files": files,
"columns": self.columns
Expand All @@ -68,12 +71,12 @@ def process_columns(self, files, executor, **kwargs):
else:
raise ValueError(f"Incorrect parameter: parallelize_over={parallelize_over}")

col_stats = executor.execute(self.process_columns_func, args, **kwargs)
col_stats = executor.execute(self.worker_func, args, **kwargs)

return pd.concat(col_stats).reset_index(drop=True)


def process_columns_func(self, args, **kwargs):
def worker_func(self, args, **kwargs):
column_stats = []
col_stats_df = pd.DataFrame()
files = args["files"]
Expand All @@ -82,6 +85,7 @@ def process_columns_func(self, args, **kwargs):
for column in columns:
col_stats = self.process_column(file, column, **kwargs)
col_stats_df = pd.concat([col_stats_df, col_stats])
self.run_worker_operation()
return col_stats_df


Expand All @@ -95,29 +99,30 @@ def process_column(self, file, column, **kwargs):
"uncompressed_bytes": column_data.uncompressed_bytes,
"nevents": tree.num_entries
}])
self.run_operation(column_data)
if self.config.get('processor.load_columns_into_memory', False):
self.load_columns_into_memory(column_data)
return col_stats


def run_operation(self, column_data, **kwargs):
operation = self.config.get('processor.operation', 'nothing')

if (not operation) or (operation=='nothing'):
return

def load_columns_into_memory(self, column_data):
data_in_memory = np.array([])
if isinstance(column_data, list):
for item in column_data:
data_in_memory = np.concatenate((data_in_memory, item.array()))
else:
data_in_memory = column_data.array()

if operation == 'load_into_memory':
return
elif operation == 'mean':
np.mean(data_in_memory)
elif operation == 'sum':
np.sum(data_in_memory)

def run_worker_operation(self):
timeout = self.config.get('processor.worker_operation_time', 0)
if timeout==0:
return


# compute pi until timeout
start_time = time.time()
pi = k = 0
while True:
pi += (4.0 * (-1)**k) / (2*k + 1)
k += 1
if time.time() - start_time > timeout:
return
21 changes: 11 additions & 10 deletions notebooks/config_2.1.yaml
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
executor:
# backend: dask-gateway
backend: dask-local
# backend: sequential
n_workers: 4
data-access:
mode: local
mode: explicit-files
files:
# - root://eos.cms.rcac.purdue.edu//store/data/Run2016B/SingleMuon/NANOAOD/02Apr2020_ver2-v1/20000/90322FC2-4027-0E47-92E4-22307EC8EAD2.root
- /eos/purdue/store/data/Run2016B/SingleMuon/NANOAOD/02Apr2020_ver2-v1/20000/90322FC2-4027-0E47-92E4-22307EC8EAD2.root
- /depot/cms/users/dkondra/90322FC2-4027-0E47-92E4-22307EC8EAD2.root

executor:
backend: dask-gateway
# backend: dask-local
# backend: sequential
n_workers: 1

processor:
parallelize_over: columns
columns: 10
operation: load_into_memory
collections: [Muon]
load_columns_into_memory: True
worker_operation_time: 0
4 changes: 3 additions & 1 deletion notebooks/config_3.2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,6 @@ data-access:
processor:
parallelize_over: columns
columns: 10
operation: load_into_memory
load_columns_into_memory: True
worker_operation_time: 0

3 changes: 3 additions & 0 deletions tests/config-default.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,6 @@ processor:
parallelize_over: files
columns: 5
operation: sum
load_columns_into_memory: True
worker_operation_time: 0

18 changes: 12 additions & 6 deletions tests/test-processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@ def test_processor_collections(b):
b.run()
print(f"Successfully tested processing a collection of columns")

def test_processor_operation_nothing(b):
b.config["processor"]["operation"] = "nothing"
def test_processor_dont_load(b):
b.config["processor"]["load_into_memory"] = False
b.run()
print(f"Successfully tested doing nothing to specified columns")

def test_processor_operation_load(b):
b.config["processor"]["operation"] = "load_into_memory"
def test_processor_load(b):
b.config["processor"]["load_into_memory"] = True
b.run()
print(f"Successfully tested loading specified columns into memory")

def test_processor_worker_operation(b):
b.config["processor"]["worker_operation_time"] = 10
b.run()
print(f"Successfully tested worker operation (10s)")

def test_processor_parallelize_over_files(b):
b.config["processor"]["parallelize_over"] = "files"
b.config["executor"]["backend"] = "futures"
Expand Down Expand Up @@ -63,8 +68,9 @@ def test_processor_parallelize_over_files_and_columns(b):
test_processor_columns_explicit,
test_processor_columns_number,
test_processor_collections,
test_processor_operation_nothing,
test_processor_operation_load,
test_processor_dont_load,
test_processor_load,
test_processor_worker_operation,
test_processor_parallelize_over_files,
test_processor_parallelize_over_columns,
test_processor_parallelize_over_files_and_columns,
Expand Down

0 comments on commit da42946

Please sign in to comment.