From 428b23bdf53b46cc97a808bccf5a28126efed6d3 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Wed, 28 Feb 2024 00:56:38 +0100 Subject: [PATCH] optimize parallelization --- af_benchmark/benchmark.py | 8 +- af_benchmark/processor/processor.py | 157 +++++++++++++--------------- 2 files changed, 74 insertions(+), 91 deletions(-) diff --git a/af_benchmark/benchmark.py b/af_benchmark/benchmark.py index 989d561..0932b6f 100644 --- a/af_benchmark/benchmark.py +++ b/af_benchmark/benchmark.py @@ -90,14 +90,10 @@ def reset_processor(self, **kwargs): @tp.enable def run(self): files = get_file_list(self) - - trees = self.processor.open_nanoaod( - files, - self.executor - ) + self.processor.get_column_list(files[0]) column_data = self.processor.read_columns( - trees, + files, self.executor, parallelize_over=self.config.get('processor.parallelize_over') ) diff --git a/af_benchmark/processor/processor.py b/af_benchmark/processor/processor.py index 9ac6b66..e71db3e 100644 --- a/af_benchmark/processor/processor.py +++ b/af_benchmark/processor/processor.py @@ -21,70 +21,53 @@ def open_nanoaod_(self, file_path, **kwargs): return @abstractmethod - def get_column_names(self, tree): + def get_column_list(self, file): return @tp.enable - def read_columns(self, trees, executor, parallelize_over): - column_names = self.get_column_names(trees[0]) - if not parallelize_over: - parallelize_over = "files" - if parallelize_over=="files": - column_data = executor.execute( - self.read_by_file, - trees, - column_names=column_names - ) - elif parallelize_over=="columns": - column_data = executor.execute( - self.read_by_column, - column_names, - trees=trees - ) + def read_columns(self, files, executor, parallelize_over): + arg_dict = { + "files": files, + "columns": self.columns + } + if parallelize_over == "files": + args = [{"files": [file], "columns": self.columns} for file in files] + elif parallelize_over == "columns": + args = [{"files": files, "columns": [col]} for col in self.columns] else: - raise ValueError(f"Can't parallelize over {parallelize_over}") + args = [{"files": [file], "columns": [col]} for file in files for col in self.columns] - return column_data + column_data = executor.execute(self.read_columns_func, args) - def read_by_file(self, tree, **kwargs): - column_names = kwargs.get("column_names", []) - column_data = {} - column_stats = [] - for column_name in column_names: - result = self.read_column(tree, column_name) - column_data[column_name] = result["data"] - if "stats" in result.keys(): - column_stats.append(result["stats"]) - if column_stats: - col_stats_df = pd.concat(column_stats) - else: - col_stats_df = pd.DataFrame() - return column_data, col_stats_df + return column_data - def read_by_column(self, column_name, **kwargs): - trees = kwargs.get("trees", []) + def read_columns_func(self, args): column_data = [] column_stats = [] - for tree in trees: - result = self.read_column(tree, column_name) - column_data.append(result["data"]) - if "stats" in result.keys(): - column_stats.append(result["stats"]) + files = args["files"] + columns = args["columns"] + for file in files: + file_column_data = {} + for column in columns: + result = self.read_column(file, column) + file_column_data[column] = result["data"] + if "stats" in result.keys(): + column_stats.append(result["stats"]) + column_data.append(file_column_data) + col_stats_df = pd.DataFrame() if column_stats: col_stats_df = pd.concat(column_stats) - else: - col_stats_df = pd.DataFrame() - return {column_name: column_data}, col_stats_df + return column_data, col_stats_df @abstractmethod - def read_column(self, tree, column): + def read_column(self, file, column): return @tp.enable def run_operation(self, columns, executor, **kwargs): - return executor.execute(self.execute_func, columns, **kwargs) + return executor.execute(self.run_operation_func, columns, **kwargs) - def execute_func(self, columns, **kwargs): + def run_operation_func(self, columns, **kwargs): if isinstance(columns, tuple): result = self.run_operation_(columns[0], **kwargs) col_stats = columns[1] @@ -106,50 +89,52 @@ def open_nanoaod_(self, file_path, **kwargs): tree = uproot.open(file_path)["Events"] return tree - def get_column_names(self, tree): + def get_column_list(self, file): columns_to_read = self.config.get('processor.columns') + tree = self.open_nanoaod_(file) if isinstance(columns_to_read, list): if any(c not in tree.keys() for c in columns_to_read): raise ValueError(f"Error reading column: {column}") - column_names = columns_to_read + self.column = columns_to_read elif isinstance(columns_to_read, int): - column_names = list(tree.keys())[:columns_to_read] - if len(column_names)