-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathextract.py
executable file
·131 lines (110 loc) · 4.88 KB
/
extract.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/env python
import os
import glob
from pathlib import Path
import shutil
import argparse
from concurrent.futures import ProcessPoolExecutor
import uproot
import numpy as np
import tqdm
# Returns the number of events present in a tree
def get_tree_entries(path: Path, tree: str):
    """Count the entries stored in one tree of a ROOT file.

    Args:
        path: Location of the input file, handed to ``uproot.open``.
        tree: Key of the tree inside the file.

    Returns:
        The ``num_entries`` value reported by the tree.
    """
    with uproot.open(path) as root_file:
        return root_file[tree].num_entries
# Function for reading and extracting data from a chunk of events
def run(kwargs: dict):
    """Extract one chunk of events from a branch and save it as a NumPy file.

    Args:
        kwargs: Chunk configuration with the keys ``"file"`` (input path),
            ``"tree"``, ``"branch"``, ``"slc"`` (a pair holding the first
            entry and the stop entry of the chunk), ``"tmp_dir"`` (directory
            that receives the chunk file) and ``"no"`` (chunk index used in
            the file name).

    Returns:
        A status message naming the output path as passed to ``np.save``
        (``np.save`` itself appends the ``.npy`` suffix).
    """
    slc = kwargs["slc"]
    # Use a context manager so the file handle is released after every chunk;
    # worker processes are reused for many chunks and would otherwise
    # accumulate open files.
    with uproot.open(kwargs["file"]) as root_file:
        # Read only the requested branch instead of materialising every
        # branch of the tree and discarding all but one.
        chunk = root_file[kwargs["tree"]][kwargs["branch"]].array(
            library="ak",
            entry_start=slc[0],
            entry_stop=slc[1],
        )
    img = chunk.to_numpy()
    out_path = f"{kwargs['tmp_dir']}/img{kwargs['no']:04d}"
    np.save(out_path, img)
    return f"Saved file: {out_path}"
# Function that merges all temporary files into a single one
def merge(tmp_dir, fname):
    """Concatenate every chunk file in ``tmp_dir`` into a single archive.

    All files found directly inside ``tmp_dir`` are loaded with ``np.load``,
    concatenated in chunk order and stored under the key ``MM_Image`` via
    ``np.savez``. Afterwards ``tmp_dir`` is removed together with everything
    in it.

    Args:
        tmp_dir: Directory holding the per-chunk files.
        fname: Destination passed to ``np.savez``.

    Raises:
        ValueError: If ``tmp_dir`` contains no files.
    """
    import re  # local import: only needed for the natural-sort key below

    def chunk_order(name):
        # Split into alternating text / digit runs and compare the digit runs
        # numerically. A plain string sort puts "img10000" before "img9999"
        # once the chunk index outgrows its 4-digit zero padding, which would
        # scramble the event order of the merged array.
        parts = re.split(r"(\d+)", name)
        return [int(p) if i % 2 else p for i, p in enumerate(parts)]

    files = sorted(glob.glob(str(tmp_dir) + "/*"), key=chunk_order)
    if not files:
        raise ValueError(f"no chunk files found in {tmp_dir}")
    data = np.concatenate([np.load(f) for f in files])
    np.savez(fname, MM_Image=data)
    shutil.rmtree(tmp_dir)
# Function that creates configuration dictionaries
def create_configs(path: Path, tree: str, branch: str,
                   chunk_size: int, tmp_dir: Path):
    """Build one configuration dictionary per chunk of events.

    Args:
        path: Input file; also stored under ``"file"`` in every config.
        tree: Name of the tree whose entries are split into chunks.
        branch: Branch name stored under ``"branch"`` in every config.
        chunk_size: Maximum number of events per chunk; must be positive.
        tmp_dir: Directory stored under ``"tmp_dir"`` in every config.

    Returns:
        A list of dictionaries, one per chunk, each holding the shared keys
        ``"tree"``, ``"file"``, ``"branch"`` and ``"tmp_dir"`` plus ``"slc"``
        (a ``(start, stop)`` pair of entry indices) and ``"no"`` (the chunk
        index). The list is empty when the tree has no entries.

    Raises:
        ValueError: If ``chunk_size`` is not positive.
    """
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    # Get the number of entries
    n = get_tree_entries(path, tree)

    # Slice all events into (start, stop) chunks; the last chunk is clipped to
    # the number of entries. An empty tree simply yields no chunks instead of
    # failing on an out-of-range index.
    slices = [(start, min(start + chunk_size, n))
              for start in range(0, n, chunk_size)]

    # Common configuration options to all parallel processes
    base_config = {"tree": tree,
                   "file": path,
                   "branch": branch,
                   "tmp_dir": tmp_dir}

    # Create a configuration dictionary for every chunk by adding the slice
    # and its running index
    return [{**base_config, "slc": s, "no": no}
            for no, s in enumerate(slices)]
# Execute the full workflow
def extract_parallel(path, tree, branch, chunk_size=50,
                     j=None, tmp_dir=Path("./tmp"),
                     output=Path("images.npz"),
                     **kwargs):
    """Extract one branch in parallel chunks and merge them into one file.

    The tree is split into chunks by ``create_configs``, every chunk is
    processed by ``run`` in a process pool, and the per-chunk files are
    combined by ``merge``.

    NOTE: ``merge`` loads every file found in ``tmp_dir`` and deletes the
    directory afterwards, so ``tmp_dir`` should be a dedicated scratch
    location.

    Args:
        path: Input file.
        tree: Name of the tree to read.
        branch: Name of the branch to extract.
        chunk_size: Number of events per chunk.
        j: ``max_workers`` for the ``ProcessPoolExecutor``.
        tmp_dir: Directory for the temporary per-chunk files.
        output: Destination of the merged result.
        **kwargs: Accepted and ignored.
    """
    # Accept plain strings as well as Path objects from API callers
    tmp_dir = Path(tmp_dir)

    # Create the configuration for each process
    configs = create_configs(path, tree, branch,
                             chunk_size, tmp_dir)

    # Make sure the temporary directory exists; parents are created too and an
    # already existing directory is not an error.
    tmp_dir.mkdir(parents=True, exist_ok=True)

    # Start a parallel process for each configuration
    # (Up to the maximum allowed simultaneously)
    # Use `tqdm` for tracking progress
    with ProcessPoolExecutor(max_workers=j) as executor:
        # `executor.map` is lazy: consuming it through `list` drives the
        # iteration so that `tqdm` advances as results arrive
        list(tqdm.tqdm(executor.map(run, configs), total=len(configs)))

    # Finally merge all the temporary files into a single result
    merge(tmp_dir, output)
# Function to extract multiple uncompressed branches
def extract_uncompressed(path, tree, branches, j=20,
                         output=Path("scalars.npz"),
                         **kwargs):
    """Read several branches as NumPy arrays and store them in one archive.

    Args:
        path: Input file.
        tree: Name of the tree to read.
        branches: Branch selection handed to the tree's ``arrays`` call.
        j: Value passed as ``num_workers`` to ``uproot.open``.
        output: Destination passed to ``np.savez``.
        **kwargs: Accepted and ignored.
    """
    with uproot.open(path, num_workers=j) as root_file:
        columns = root_file[tree].arrays(branches, library="np")
    # Each returned entry becomes one named array inside the archive
    np.savez(output, **columns)
if __name__ == "__main__":
    # Command-line entry point: parse the options and run the parallel
    # extraction with them.

    # Define command line arguments
    parser = argparse.ArgumentParser(
        description="Micromegas Image extraction from NSWL1 TTree")
    # nargs="?" makes the positional optional; without it argparse always
    # requires the argument and the declared default is never used.
    parser.add_argument("path", action="store", type=Path, nargs="?",
                        default="MMImages.root", help="input Tree data file, "
                        "generated by the wrangling script")
    parser.add_argument("--tmp-dir", action="store", default="./images/",
                        type=Path, help="path to temporary storage folder")
    parser.add_argument("-j", action="store", default=None, type=int,
                        help="number of parallel processes (unses all available "
                        "unless specified otherwise)")
    parser.add_argument("-B", "--branch", action="store", default="MM_Image",
                        type=str, help="name of the branch to be extracted")
    parser.add_argument("-T", "--tree", action="store",
                        default="MMTriggerData", type=str,
                        help="name of the desired Tree in the input file")
    parser.add_argument("-O", "--output", action="store",
                        default="./images.npz", type=Path,
                        help="path of the output file")
    parser.add_argument("-C", "--chunk-size", action="store",
                        default=50, type=int,
                        help="number of events to be processed for "
                        "each chunk")

    # Parse command-line arguments
    args = parser.parse_args()

    # Run the script; the parsed attribute names (path, tmp_dir, j, branch,
    # tree, output, chunk_size) match the keyword parameters of
    # `extract_parallel`.
    extract_parallel(**vars(args))