
Commit

optimize parallelization
kondratyevd committed Feb 27, 2024
1 parent 8a156ca commit 428b23b
Showing 2 changed files with 74 additions and 91 deletions.
8 changes: 2 additions & 6 deletions af_benchmark/benchmark.py
@@ -90,14 +90,10 @@ def reset_processor(self, **kwargs):
@tp.enable
def run(self):
files = get_file_list(self)

trees = self.processor.open_nanoaod(
files,
self.executor
)
self.processor.get_column_list(files[0])

column_data = self.processor.read_columns(
trees,
files,
self.executor,
parallelize_over=self.config.get('processor.parallelize_over')
)
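
Both the old and the new driver path hand per-task work to the benchmark's executor. As a rough illustration of the contract the new code relies on (the class and method names below are illustrative stand-ins, not the repository's actual executor implementation), execute(func, args, **kwargs) simply maps a function over a list of work items:

# Minimal stand-in for the assumed executor contract (illustrative only)
class SequentialExecutor:
    def execute(self, func, args, **kwargs):
        # Map func over the list of work items; a parallel executor
        # would distribute these calls over workers instead.
        return [func(arg, **kwargs) for arg in args]
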
157 changes: 72 additions & 85 deletions af_benchmark/processor/processor.py
@@ -21,70 +21,53 @@ def open_nanoaod_(self, file_path, **kwargs):
return

@abstractmethod
def get_column_names(self, tree):
def get_column_list(self, file):
return

@tp.enable
def read_columns(self, trees, executor, parallelize_over):
column_names = self.get_column_names(trees[0])
if not parallelize_over:
parallelize_over = "files"
if parallelize_over=="files":
column_data = executor.execute(
self.read_by_file,
trees,
column_names=column_names
)
elif parallelize_over=="columns":
column_data = executor.execute(
self.read_by_column,
column_names,
trees=trees
)
def read_columns(self, files, executor, parallelize_over):
arg_dict = {
"files": files,
"columns": self.columns
}
if parallelize_over == "files":
args = [{"files": [file], "columns": self.columns} for file in files]
elif parallelize_over == "columns":
args = [{"files": files, "columns": [col]} for col in self.columns]
else:
raise ValueError(f"Can't parallelize over {parallelize_over}")
args = [{"files": [file], "columns": [col]} for file in files for col in self.columns]

return column_data
column_data = executor.execute(self.read_columns_func, args)

def read_by_file(self, tree, **kwargs):
column_names = kwargs.get("column_names", [])
column_data = {}
column_stats = []
for column_name in column_names:
result = self.read_column(tree, column_name)
column_data[column_name] = result["data"]
if "stats" in result.keys():
column_stats.append(result["stats"])
if column_stats:
col_stats_df = pd.concat(column_stats)
else:
col_stats_df = pd.DataFrame()
return column_data, col_stats_df
return column_data

def read_by_column(self, column_name, **kwargs):
trees = kwargs.get("trees", [])
def read_columns_func(self, args):
column_data = []
column_stats = []
for tree in trees:
result = self.read_column(tree, column_name)
column_data.append(result["data"])
if "stats" in result.keys():
column_stats.append(result["stats"])
files = args["files"]
columns = args["columns"]
for file in files:
file_column_data = {}
for column in columns:
result = self.read_column(file, column)
file_column_data[column] = result["data"]
if "stats" in result.keys():
column_stats.append(result["stats"])
column_data.append(file_column_data)
col_stats_df = pd.DataFrame()
if column_stats:
col_stats_df = pd.concat(column_stats)
else:
col_stats_df = pd.DataFrame()
return {column_name: column_data}, col_stats_df
return column_data, col_stats_df
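
For illustration, a self-contained sketch of how the new args list fans work out for each parallelize_over setting (the file and column names below are made up):

# Toy fan-out of work items, mirroring the new read_columns logic
files = ["fileA.root", "fileB.root"]
columns = ["Muon_pt", "Muon_eta", "MET_pt"]

def build_args(parallelize_over):
    if parallelize_over == "files":
        return [{"files": [f], "columns": columns} for f in files]
    elif parallelize_over == "columns":
        return [{"files": files, "columns": [c]} for c in columns]
    else:
        return [{"files": [f], "columns": [c]} for f in files for c in columns]

print(len(build_args("files")))    # 2 tasks: one per file
print(len(build_args("columns")))  # 3 tasks: one per column
print(len(build_args("both")))     # 6 tasks: one per (file, column) pair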

@abstractmethod
def read_column(self, tree, column):
def read_column(self, file, column):
return

@tp.enable
def run_operation(self, columns, executor, **kwargs):
return executor.execute(self.execute_func, columns, **kwargs)
return executor.execute(self.run_operation_func, columns, **kwargs)

def execute_func(self, columns, **kwargs):
def run_operation_func(self, columns, **kwargs):
if isinstance(columns, tuple):
result = self.run_operation_(columns[0], **kwargs)
col_stats = columns[1]
@@ -106,50 +89,52 @@ def open_nanoaod_(self, file_path, **kwargs):
tree = uproot.open(file_path)["Events"]
return tree

def get_column_names(self, tree):
def get_column_list(self, file):
columns_to_read = self.config.get('processor.columns')
tree = self.open_nanoaod_(file)
if isinstance(columns_to_read, list):
if any(c not in tree.keys() for c in columns_to_read):
raise ValueError(f"Error reading column: {column}")
column_names = columns_to_read
self.columns = columns_to_read
elif isinstance(columns_to_read, int):
column_names = list(tree.keys())[:columns_to_read]
if len(column_names)<columns_to_read:
raise ValueError(f"Trying to read {columns_to_read} columns, but only {len(column_names)} present in file.")
self.columns = list(tree.keys())[:columns_to_read]
if len(self.columns)<columns_to_read:
raise ValueError(f"Trying to read {columns_to_read} columns, but only {len(self.columns)} present in file.")
else:
raise ValueError(f"Incorrect type of processor.columns parameter: {type(columns_to_read)}")
return column_names
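
A standalone sketch of the two processor.columns forms this method accepts, an explicit list of branch names or an integer meaning "the first N branches" (the branch names below are made up):

# Toy version of the column-selection logic in get_column_list
def select_columns(columns_to_read, branch_names):
    if isinstance(columns_to_read, list):
        missing = [c for c in columns_to_read if c not in branch_names]
        if missing:
            raise ValueError(f"Columns not found: {missing}")
        return columns_to_read
    elif isinstance(columns_to_read, int):
        if columns_to_read > len(branch_names):
            raise ValueError(f"Only {len(branch_names)} branches present")
        return branch_names[:columns_to_read]
    raise ValueError(f"Incorrect type: {type(columns_to_read)}")

print(select_columns(["Muon_pt"], ["Muon_pt", "Muon_eta", "MET_pt"]))  # ['Muon_pt']
print(select_columns(2, ["Muon_pt", "Muon_eta", "MET_pt"]))            # ['Muon_pt', 'Muon_eta']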

def read_column(self, tree, column_name):
column_data = tree[column_name]
def read_column(self, file, column):
tree = self.open_nanoaod_(file)
column_data = tree[column]
col_stats = pd.DataFrame([{
"file": tree.file.file_path,
"column": column_name,
"column": column,
"compressed_bytes": column_data.compressed_bytes,
"uncompressed_bytes": column_data.uncompressed_bytes
}])
return {"data": column_data, "stats": col_stats}

def run_operation_(self, columns, **kwargs):
def run_operation_(self, column_data, **kwargs):
operation = self.config.get('processor.operation', None)
if not operation:
return
results = {}
for name, data in columns.items():
data_in_memory = np.array([])
if isinstance(data, list):
for item in data:
data_in_memory = np.concatenate((data_in_memory, item.array()))
else:
data_in_memory = data.array()

if operation == 'array':
# just load it in memory
continue
elif operation == 'mean':
results[name] = np.mean(data_in_memory)
elif operation == 'sum':
results[name] = np.sum(data_in_memory)
for file_column_data in column_data:
for data in file_column_data.values():
data_in_memory = np.array([])
if isinstance(data, list):
for item in data:
data_in_memory = np.concatenate((data_in_memory, item.array()))
else:
data_in_memory = data.array()

if operation == 'array':
# just load it in memory
continue
elif operation == 'mean':
np.mean(data_in_memory)
elif operation == 'sum':
np.sum(data_in_memory)
return results


Expand All @@ -164,28 +149,30 @@ def open_nanoaod_(self, file_path, **kwargs):
).events()
return tree

def get_column_names(self, tree):
column_names = self.config.get('processor.columns')
if not isinstance(column_names, list):
def get_column_list(self, file):
self.columns = self.config.get('processor.columns')
tree = self.open_nanoaod_(file)
if not isinstance(self.columns, list):
raise NotImplementedError("For NanoEventsProcessor, only explicit list of columns is currently possible")
return column_names

def read_column(self, tree, column_name):
if column_name in tree.fields:
column_data = tree[column_name]
elif "_" in column_name:
branch, leaf = column_name.split("_")
def read_column(self, file, column):
tree = self.open_nanoaod_(file)
if column in tree.fields:
column_data = tree[column]
elif "_" in column:
branch, leaf = column.split("_")
column_data = tree[branch][leaf]
else:
raise ValueError(f"Error reading column: {column_name}")
raise ValueError(f"Error reading column: {column}")
return {"data": column_data}

def run_operation_(self, columns, **kwargs):
def run_operation_(self, column_data, **kwargs):
operation = self.config.get('processor.operation')
results = {}
for name, data in columns.items():
if operation == 'mean':
results[name] = np.mean(data)
for file_column_data in column_data:
for data in file_column_data.values():
if operation == 'mean':
np.mean(data)
return results


