-
Notifications
You must be signed in to change notification settings - Fork 17
/
extract_metrics.py
211 lines (168 loc) · 7.16 KB
/
extract_metrics.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
import re
import csv
import glob
import os
import argparse
import numpy as np
def parse_folder_name(folder_name):
    """Extract the run settings encoded in a folder name.

    Each setting is searched for as a short tag immediately followed by
    digits (for example ``dp4`` or ``mbs2``); the first match of each tag
    is used.

    Args:
        folder_name: Name of a run directory, e.g.
            ``"dp2_tp4_pp1_mbs8_ga16_sl2048"``.

    Returns:
        A dict with the keys ``dp``, ``tp``, ``pp``, ``micro_batch_size``,
        ``grad_acc`` and ``seq_len``. A value is an ``int`` when its tag is
        found in ``folder_name`` and ``None`` otherwise.
    """
    # (result key, regex capturing the digits that follow the tag)
    tag_patterns = (
        ('dp', r'dp(\d+)'),
        ('tp', r'tp(\d+)'),
        ('pp', r'pp(\d+)'),
        ('micro_batch_size', r'mbs(\d+)'),
        ('grad_acc', r'ga(\d+)'),
        ('seq_len', r'sl(\d+)'),
    )
    parsed = {}
    for key, pattern in tag_patterns:
        found = re.search(pattern, folder_name)
        parsed[key] = None if found is None else int(found.group(1))
    return parsed
def from_readable_format(formatted_str):
    """Convert a human-readable number such as ``"1.5K"`` into a float.

    Args:
        formatted_str: A string holding a plain number or a number followed
            by one of the suffixes ``K``, ``M``, ``B`` or ``T`` (matched
            case-insensitively, surrounding whitespace ignored). Any
            non-string value is returned unchanged.

    Returns:
        The numeric value as a ``float``, or ``formatted_str`` itself when
        it is not a string.

    Raises:
        ValueError: If the part before the last character is not a number,
            or the last character is not a known suffix.
    """
    if not isinstance(formatted_str, str):
        return formatted_str

    # Normalise so that " 2m " and "2M" are treated alike.
    text = formatted_str.strip().upper()

    # A plain number needs no scaling.
    try:
        return float(text)
    except ValueError:
        pass

    scale_by_suffix = {'T': 1e12, 'B': 1e9, 'M': 1e6, 'K': 1e3}

    # Everything but the last character is the magnitude; the last
    # character selects the scale.
    magnitude = float(text[:-1])
    unit = text[-1]

    scale = scale_by_suffix.get(unit)
    if scale is None:
        raise ValueError(f"Unknown suffix: {unit}")
    return magnitude * scale
def parse_log_line(line):
    """Pull the MFU and tokens/s/GPU readings out of one log line.

    Args:
        line: A single line of log text.

    Returns:
        A ``(mfu, tokens_s_gpu)`` tuple. ``mfu`` is the float found after
        ``MFU:`` (a decimal number followed by ``%``) or ``None`` when the
        line has no such field. ``tokens_s_gpu`` is the value found after
        ``Tokens/s/GPU:`` passed through ``from_readable_format``, or
        ``None`` when the field is absent.
    """
    mfu_value = None
    mfu_hit = re.search(r'MFU:\s+(\d+\.\d+)%', line)
    if mfu_hit:
        mfu_value = float(mfu_hit.group(1))

    tokens_text = None
    tokens_hit = re.search(r'Tokens/s/GPU:\s*([\d.]+[KMBT]?)', line)
    if tokens_hit:
        tokens_text = tokens_hit.group(1)

    # from_readable_format passes None through untouched.
    return mfu_value, from_readable_format(tokens_text)
def process_file(filepath, warmup_steps=3):
    """Average the MFU and tokens/s/GPU readings found in a log file.

    Only lines containing a ``[default<N>]:[rank <N>]`` prefix are parsed.
    The first ``warmup_steps`` readings of each metric are treated as warmup
    and excluded from the average.

    Each metric is averaged independently: a metric with no readings left
    after the warmup is reported as ``None`` instead of averaging an empty
    list (which yields NaN and makes ``int(round(...))`` raise).

    Args:
        filepath: Path of the log file to read.
        warmup_steps: Number of leading readings to skip per metric.
            Expected to be non-negative. Defaults to 3.

    Returns:
        A ``(mfu, tokens_s_gpu)`` tuple of averages rounded to ``int``;
        either element is ``None`` when that metric has no usable readings.
    """
    rank_prefix = re.compile(r'\[default\d+\]:\[rank \d+\]')
    tokens_s_gpu_values = []
    mfu_values = []

    with open(filepath, 'r') as f:
        for line in f:
            if not rank_prefix.search(line):
                continue
            mfu_value, tokens_s_gpu_value = parse_log_line(line)
            if tokens_s_gpu_value is not None:
                tokens_s_gpu_values.append(tokens_s_gpu_value)
            if mfu_value is not None:
                mfu_values.append(mfu_value)

    tokens_s_gpu = _mean_after_warmup(tokens_s_gpu_values, warmup_steps)
    mfu = _mean_after_warmup(mfu_values, warmup_steps)

    if mfu is None and tokens_s_gpu is None:
        print(f"Warning: Not enough data points for {filepath}")

    return mfu, tokens_s_gpu


def _mean_after_warmup(values, warmup_steps):
    """Return the rounded mean of ``values`` past the warmup, or None if empty."""
    steady = values[warmup_steps:]
    if not steady:
        return None
    return int(round(np.mean(steady)))
def write_csv(data, output_filepath):
    """Write a single metrics row, with header, to a CSV file.

    Nothing is written (and no file is created) when ``data`` is empty or
    otherwise falsy. Columns absent from ``data`` are left blank by
    ``csv.DictWriter``.

    Args:
        data: Mapping from column name to value for the one row to write.
        output_filepath: Destination path; an existing file is overwritten.
    """
    if data:
        columns = ['run_name', 'status', 'dp', 'tp', 'pp', 'micro_batch_size', 'grad_acc', 'seq_len', 'avg_tokens_s_gpu', 'avg_mfu']
        with open(output_filepath, 'w', newline='') as handle:
            row_writer = csv.DictWriter(handle, fieldnames=columns)
            row_writer.writeheader()
            row_writer.writerow(data)
def read_status(status_file):
    """Return the stripped contents of a status file, or None if unreadable.

    Reading is best-effort: a missing, inaccessible or undecodable file
    yields ``None``. Only I/O and decoding errors are suppressed, so
    ``KeyboardInterrupt`` and ``SystemExit`` still propagate (a bare
    ``except`` would swallow them).

    Args:
        status_file: Path of the status file.

    Returns:
        The file's text with surrounding whitespace removed, or ``None``
        when the file cannot be read.
    """
    try:
        with open(status_file, 'r') as f:
            return f.read().strip()
    except (OSError, UnicodeDecodeError):
        return None
def create_subdirectory_metrics(input_folder):
    """Create a metrics.csv next to every ``.out`` log under ``input_folder``.

    For each ``.out`` file found recursively, the containing directory's
    name is parsed for run settings, the log is averaged with
    ``process_file``, ``status.txt`` in the same directory is read if
    present, and a one-row ``metrics.csv`` is written in that directory.

    The row is assembled completely and written once per log file, and a
    directory is recorded once per log file (it used to be rewritten up to
    four times and recorded twice when both metrics were available).

    Args:
        input_folder: Root folder searched recursively for ``.out`` files.

    Returns:
        A list with the directory of every log file for which at least one
        averaged metric could be extracted.
    """
    pattern = os.path.join(input_folder, '**/*.out')
    out_files = glob.glob(pattern, recursive=True)
    print(f"Found {len(out_files)} .out files")

    processed_dirs = []
    for file_path in out_files:
        dir_path = os.path.dirname(file_path)
        dir_name = os.path.basename(dir_path)
        output_csv = os.path.join(dir_path, 'metrics.csv')

        print(f"Processing {file_path}...")
        avg_mfu, avg_tokens_s_gpu = process_file(file_path)
        status = read_status(os.path.join(dir_path, 'status.txt'))

        row = parse_folder_name(dir_name)
        row['run_name'] = dir_name
        # Optional columns are only set when known; write_csv leaves the
        # missing ones blank.
        if status is not None:
            row['status'] = status
        if avg_tokens_s_gpu is not None:
            row['avg_tokens_s_gpu'] = avg_tokens_s_gpu
        if avg_mfu is not None:
            row['avg_mfu'] = avg_mfu

        write_csv(row, output_csv)

        if avg_tokens_s_gpu is not None or avg_mfu is not None:
            processed_dirs.append(dir_path)
            print(f"Processed {file_path} -> Created metrics.csv")

    return processed_dirs
def aggregate_metrics(input_folder):
    """Create a global_metrics.csv in every top-level directory.

    Each directory directly under ``input_folder`` is treated as a group of
    runs. For every run directory inside it, one row is built from the
    folder name, its ``status.txt`` and its ``metrics.csv``; the rows are
    written to ``global_metrics.csv`` inside the group directory.

    Only directories are considered at both levels. Plain files matched by
    the glob (including a ``global_metrics.csv`` left by a previous run)
    are skipped, so they neither crash the write nor appear as bogus runs.
    Entries are processed in sorted order so the output is deterministic.

    A metric that is missing or unparsable is reported as ``-1``; the two
    metrics are read independently, so one bad value does not discard the
    other.

    Args:
        input_folder: Folder whose immediate subdirectories each contain
            run directories.
    """
    fieldnames = ['run_name', 'status', 'dp', 'tp', 'pp', 'micro_batch_size',
                  'grad_acc', 'seq_len', 'avg_tokens_s_gpu', 'avg_mfu']

    for top_dir_path in sorted(glob.glob(input_folder + '/*')):
        if not os.path.isdir(top_dir_path):
            continue

        aggregated_data = []
        for subdir_path in sorted(glob.glob(top_dir_path + '/*')):
            if not os.path.isdir(subdir_path):
                continue

            folder_name = os.path.basename(subdir_path)
            data = {
                'run_name': folder_name,
                'status': read_status(os.path.join(subdir_path, 'status.txt')),
                **parse_folder_name(folder_name),
            }
            data.update(_read_run_metrics(os.path.join(subdir_path, 'metrics.csv')))
            aggregated_data.append(data)

        # Write global metrics file
        output_file = os.path.join(top_dir_path, 'global_metrics.csv')
        with open(output_file, 'w', newline='') as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(aggregated_data)

        print(f"Created global_metrics.csv with {len(aggregated_data)} entries")


def _read_run_metrics(metrics_file):
    """Read both averages from the first row of a metrics.csv, using -1 as fallback."""
    metrics = {'avg_tokens_s_gpu': -1, 'avg_mfu': -1}
    try:
        with open(metrics_file, 'r', newline='') as f:
            # None when the file has no data row.
            first_row = next(csv.DictReader(f), None)
    except (OSError, UnicodeDecodeError, csv.Error):
        return metrics
    if first_row is None:
        return metrics

    for column in ('avg_tokens_s_gpu', 'avg_mfu'):
        try:
            metrics[column] = int(first_row[column])
        except (KeyError, TypeError, ValueError):
            # Column absent, blank, or not an integer: keep the -1 marker.
            pass
    return metrics
def main():
    """Command-line entry point.

    Takes one positional argument, ``input_folder``, then writes the
    per-run metrics.csv files followed by the global_metrics.csv files.
    """
    cli = argparse.ArgumentParser(description='Process log files and create metrics CSVs')
    cli.add_argument('input_folder', help='Path to the top-level folder containing experiment subfolders')
    input_folder = cli.parse_args().input_folder

    # Step 1: one metrics.csv per run directory.
    print("Creating individual metrics.csv files...")
    create_subdirectory_metrics(input_folder)

    # Step 2: combine them into global_metrics.csv.
    print("\nAggregating metrics into global_metrics.csv...")
    aggregate_metrics(input_folder)


if __name__ == "__main__":
    main()