
Commit c0bfdc9

V committed
Script to find airflow jobs that failed to parse rt data
1 parent f2e3ccb commit c0bfdc9

1 file changed: 74 additions, 0 deletions
@@ -0,0 +1,74 @@
"""
Script that checks whether an outcomes file exists for an RT parsing job. If it
exists, the job completed successfully.

If it doesn't exist, the job either failed midway or never ran. This only
properly analyzes results from the parse_and_validate_v2 Airflow job. In the
newer Airflow job, parse_and_validate, each agency is parsed separately, so
it's possible that some agencies were parsed and others were not.
"""
import time

import pandas as pd
from google.cloud import storage

client = storage.Client(project="cal-itp-data-infra")

# Example outcome paths in the bucket:
#   calitp-gtfs-rt-parsed/trip_updates_outcomes/dt=2025-08-25/hour=2025-08-25T00:00:00+00:00/trip_updates.jsonl
#   calitp-gtfs-rt-parsed/service_alerts_outcomes/dt=2022-09-15/hour=2022-09-15T20:00:00+00:00
#   vehicle_positions_outcomes/
# One string per hour in the range, e.g. "2022-09-15T00:00:00+00:00".
hours = [
    f"{dt.strftime('%Y-%m-%dT%H:00:00+00:00')}"
    for dt in pd.date_range("2022-09-15", "2025-09-09", freq="H")[:-1]
]
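
# Sanity check (added here as a sketch, not in the original script):
# pd.date_range is endpoint-inclusive, so the [:-1] above drops the final
# 2025-09-09T00:00:00 stamp, leaving one entry per hour from
# 2022-09-15T00:00 through 2025-09-08T23:00 (1090 days * 24 hours).
assert len(hours) == 26160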

# Build every expected hour-partition directory for each outcome type.
generated_file_paths = []
paths = [
    "trip_updates_outcomes/",
    "service_alerts_outcomes/",
    "vehicle_positions_outcomes/",
]
for path in paths:
    for hour in hours:
        dt_str = hour[0:10]  # the "YYYY-MM-DD" date portion of the hour string
        full_path = f"{path}dt={dt_str}/hour={hour}"
        generated_file_paths.append(full_path)
print(generated_file_paths[20])  # spot-check one generated path


bucket_name = "calitp-gtfs-rt-parsed"
# calitp-gtfs-rt-parsed/trip_updates_outcomes/dt=2025-08-25/hour=2025-08-25T04:00:00+00:00/trip_updates.jsonl
prefixes = [
    "trip_updates_outcomes/",
    "service_alerts_outcomes/",
    "vehicle_positions_outcomes/",
]
bucket = client.get_bucket(bucket_name)
seen_files_in_parsed_bucket = []
start = time.time()
# List every blob under each outcomes prefix; this walks the whole subtree,
# so it can take a while.
for prefix in prefixes:
    blob_iterator = bucket.list_blobs(prefix=prefix)
    seen_files_in_parsed_bucket.extend([blob.name for blob in blob_iterator])
end = time.time()
print(f"Elapsed time: {end - start:.2f} seconds")


# Remove the filename from each path, keeping only the directory portion.
seen_dirs_in_parsed_bucket = [
    fp.rsplit("/", 1)[0] for fp in seen_files_in_parsed_bucket
]
print(seen_dirs_in_parsed_bucket[:5])


# Use sets to find the difference between expected and actual directory paths.
expected_set = set(generated_file_paths)
actual_set = set(seen_dirs_in_parsed_bucket)
missing_files = expected_set - actual_set
extra_files = actual_set - expected_set
print(f"Missing files: {len(missing_files)}")
print(f"Extra files: {len(extra_files)}")
print("Sample missing files:", list(missing_files)[:5])
print("Sample extra files:", list(extra_files)[:5])


# Write missing files to a text file
with open("missing_files_to_process.txt", "w") as f:
    for path in sorted(missing_files):
        f.write(path + "\n")
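
# Quick follow-up (a sketch, not in the original script): count missing hours
# per outcome type to see which feed type has the most gaps.
from collections import Counter

missing_by_type = Counter(p.split("/", 1)[0] for p in missing_files)
print(missing_by_type)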
