diff --git a/af_benchmark/benchmark.py b/af_benchmark/benchmark.py index e6ae084..6aaf593 100644 --- a/af_benchmark/benchmark.py +++ b/af_benchmark/benchmark.py @@ -84,12 +84,7 @@ def run(self): files = get_file_list(self) self.processor.get_column_list(files[0]) - self.col_stats = self.processor.process_columns( - files, - self.executor, - parallelize_over=self.config.get('processor.parallelize_over', 'files'), - load_into_memory=True - ) + self.col_stats = self.processor.run_processor(files, self.executor) def update_report(self): report = { diff --git a/af_benchmark/example-configs/example-config-dbs-blocks.yaml b/af_benchmark/example-configs/example-config-dbs-blocks.yaml index 09b3ebd..f6866fb 100644 --- a/af_benchmark/example-configs/example-config-dbs-blocks.yaml +++ b/af_benchmark/example-configs/example-config-dbs-blocks.yaml @@ -11,4 +11,5 @@ processor: parallelize_over: files collections: - Muon - operation: load_into_memory + load_columns_into_memory: True + worker_operation_time: 0 diff --git a/af_benchmark/example-configs/example-config-dbs-datasets.yaml b/af_benchmark/example-configs/example-config-dbs-datasets.yaml index db02cef..b8a8e05 100644 --- a/af_benchmark/example-configs/example-config-dbs-datasets.yaml +++ b/af_benchmark/example-configs/example-config-dbs-datasets.yaml @@ -10,4 +10,5 @@ processor: parallelize_over: files collections: - Muon - operation: load_into_memory + load_columns_into_memory: True + worker_operation_time: 0 diff --git a/af_benchmark/example-configs/example-config-dbs-files.yaml b/af_benchmark/example-configs/example-config-dbs-files.yaml index 120b543..123cdf4 100644 --- a/af_benchmark/example-configs/example-config-dbs-files.yaml +++ b/af_benchmark/example-configs/example-config-dbs-files.yaml @@ -11,4 +11,5 @@ processor: parallelize_over: files collections: - Muon - operation: load_into_memory + load_columns_into_memory: True + worker_operation_time: 0 diff --git a/af_benchmark/example-configs/example-config.yaml 
b/af_benchmark/example-configs/example-config.yaml index 28d9fd5..79e6a8d 100644 --- a/af_benchmark/example-configs/example-config.yaml +++ b/af_benchmark/example-configs/example-config.yaml @@ -8,4 +8,6 @@ processor: parallelize_over: files collections: - Muon - operation: load_into_memory + load_columns_into_memory: True + worker_operation_time: 1 + diff --git a/af_benchmark/executors/sequential.py b/af_benchmark/executors/sequential.py index 826d74b..539fd02 100644 --- a/af_benchmark/executors/sequential.py +++ b/af_benchmark/executors/sequential.py @@ -12,12 +12,7 @@ def _execute(self, func, args, **kwargs): :meta public: """ - first = [func(arg, **kwargs) for arg in args] return [func(arg, **kwargs) for arg in args] - print(first) - second=2 - print(second) - def get_n_workers(self): return 1 diff --git a/af_benchmark/uproot_processor.py b/af_benchmark/uproot_processor.py index 52795ab..8a5279f 100644 --- a/af_benchmark/uproot_processor.py +++ b/af_benchmark/uproot_processor.py @@ -2,6 +2,8 @@ import pandas as pd import numpy as np import uproot +import time +from operations import operations class UprootProcessor: def __init__(self, config): @@ -53,8 +55,9 @@ def get_column_list(self, file): @tp.enable - def process_columns(self, files, executor, **kwargs): - parallelize_over = kwargs.get("parallelize_over", 'files') + def run_processor(self, files, executor, **kwargs): + parallelize_over = self.config.get('processor.parallelize_over', 'files') + arg_dict = { "files": files, "columns": self.columns @@ -68,12 +71,12 @@ def run_processor(self, files, executor, **kwargs): else: raise ValueError(f"Incorrect parameter: parallelize_over={parallelize_over}") - col_stats = executor.execute(self.process_columns_func, args, **kwargs) + col_stats = executor.execute(self.worker_func, args, **kwargs) return pd.concat(col_stats).reset_index(drop=True) - def process_columns_func(self, args, **kwargs): + def worker_func(self, args, **kwargs): column_stats = []
col_stats_df = pd.DataFrame() files = args["files"] @@ -82,6 +85,7 @@ def process_columns_func(self, args, **kwargs): for column in columns: col_stats = self.process_column(file, column, **kwargs) col_stats_df = pd.concat([col_stats_df, col_stats]) + self.run_worker_operation() return col_stats_df @@ -95,16 +99,12 @@ def process_column(self, file, column, **kwargs): "uncompressed_bytes": column_data.uncompressed_bytes, "nevents": tree.num_entries }]) - self.run_operation(column_data) + if self.config.get('processor.load_columns_into_memory', False): + self.load_columns_into_memory(column_data) return col_stats - def run_operation(self, column_data, **kwargs): - operation = self.config.get('processor.operation', 'nothing') - - if (not operation) or (operation=='nothing'): - return - + def load_columns_into_memory(self, column_data): data_in_memory = np.array([]) if isinstance(column_data, list): for item in column_data: @@ -112,12 +112,17 @@ def run_operation(self, column_data, **kwargs): else: data_in_memory = column_data.array() - if operation == 'load_into_memory': - return - elif operation == 'mean': - np.mean(data_in_memory) - elif operation == 'sum': - np.sum(data_in_memory) + def run_worker_operation(self): + timeout = self.config.get('processor.worker_operation_time', 0) + if timeout==0: + return - + # compute pi until timeout + start_time = time.time() + pi = k = 0 + while True: + pi += (4.0 * (-1)**k) / (2*k + 1) + k += 1 + if time.time() - start_time > timeout: + return diff --git a/notebooks/config_2.1.yaml b/notebooks/config_2.1.yaml index b9b085d..4e6b379 100644 --- a/notebooks/config_2.1.yaml +++ b/notebooks/config_2.1.yaml @@ -1,15 +1,16 @@ -executor: - # backend: dask-gateway - backend: dask-local - # backend: sequential - n_workers: 4 data-access: - mode: local + mode: explicit-files files: - # - root://eos.cms.rcac.purdue.edu//store/data/Run2016B/SingleMuon/NANOAOD/02Apr2020_ver2-v1/20000/90322FC2-4027-0E47-92E4-22307EC8EAD2.root - - 
/eos/purdue/store/data/Run2016B/SingleMuon/NANOAOD/02Apr2020_ver2-v1/20000/90322FC2-4027-0E47-92E4-22307EC8EAD2.root + - /depot/cms/users/dkondra/90322FC2-4027-0E47-92E4-22307EC8EAD2.root + +executor: + backend: dask-gateway + # backend: dask-local + # backend: sequential + n_workers: 1 processor: parallelize_over: columns - columns: 10 - operation: load_into_memory + collections: [Muon] + load_columns_into_memory: True + worker_operation_time: 0 diff --git a/notebooks/config_3.2.yaml b/notebooks/config_3.2.yaml index 0e861f6..9991870 100644 --- a/notebooks/config_3.2.yaml +++ b/notebooks/config_3.2.yaml @@ -13,4 +13,6 @@ data-access: processor: parallelize_over: columns columns: 10 - operation: load_into_memory + load_columns_into_memory: True + worker_operation_time: 0 + diff --git a/tests/config-default.yaml b/tests/config-default.yaml index 9e36b99..158a0df 100644 --- a/tests/config-default.yaml +++ b/tests/config-default.yaml @@ -8,3 +8,6 @@ processor: parallelize_over: files columns: 5 operation: sum + load_columns_into_memory: True + worker_operation_time: 0 + diff --git a/tests/test-processing.py b/tests/test-processing.py index 8a5bd63..9975558 100644 --- a/tests/test-processing.py +++ b/tests/test-processing.py @@ -17,16 +17,21 @@ def test_processor_collections(b): b.run() print(f"Successfully tested processing a collection of columns") -def test_processor_operation_nothing(b): - b.config["processor"]["operation"] = "nothing" +def test_processor_dont_load(b): + b.config["processor"]["load_columns_into_memory"] = False b.run() print(f"Successfully tested doing nothing to specified columns") -def test_processor_operation_load(b): - b.config["processor"]["operation"] = "load_into_memory" +def test_processor_load(b): + b.config["processor"]["load_columns_into_memory"] = True b.run() print(f"Successfully tested loading specified columns into memory") +def test_processor_worker_operation(b): + b.config["processor"]["worker_operation_time"] = 10 + b.run() + print(f"Successfully
tested worker operation (10s)") + def test_processor_parallelize_over_files(b): b.config["processor"]["parallelize_over"] = "files" b.config["executor"]["backend"] = "futures" @@ -63,8 +68,9 @@ def test_processor_parallelize_over_files_and_columns(b): test_processor_columns_explicit, test_processor_columns_number, test_processor_collections, - test_processor_operation_nothing, - test_processor_operation_load, + test_processor_dont_load, + test_processor_load, + test_processor_worker_operation, test_processor_parallelize_over_files, test_processor_parallelize_over_columns, test_processor_parallelize_over_files_and_columns,