diff --git a/scripts/process-latest-products.py b/scripts/process-latest-products.py index 8f5ddeff..9263cbbb 100755 --- a/scripts/process-latest-products.py +++ b/scripts/process-latest-products.py @@ -1,34 +1,73 @@ #!/usr/bin/env python3 +import argparse +import datetime import subprocess -from datetime import datetime, timedelta, timezone from data_processing import utils +from data_processing.metadata_api import MetadataApi -sites = utils.get_all_but_hidden_sites() -sites = [site for site in sites if not site.startswith("arm-")] - -interpreter = "python3" -script = "scripts/cloudnet.py" -subcommand = "process" -wrapper = "scripts/wrapper.py" -products = ( - "categorize,classification,iwc,lwc,drizzle,ier,der,mwr-l1c,mwr-single,mwr-multi" -) - -for site in sites: - date = datetime.now(timezone.utc) - timedelta(3) - subprocess.check_call( - [ - interpreter, - wrapper, - interpreter, - script, - "-s", - site, - "--p", - products, - "--start", - date.strftime("%Y-%m-%d"), - subcommand, - ] + +def main(args_in: argparse.Namespace): + config = utils.read_main_conf() + md_api = MetadataApi(config) + + products = ( + "categorize,classification,iwc,lwc,drizzle,ier,der,mwr-l1c,mwr-single,mwr-multi" ) + + now = datetime.datetime.now(tz=datetime.timezone.utc) + + # timedelta should match the interval of the cron job! + updated_at_from = (now - datetime.timedelta(hours=args_in.hours)).isoformat() + + # Find updated Level 1b files + l1b_metadata = _get_l1b_metadata(md_api, updated_at_from) + model_metadata = _get_model_metadata(md_api, updated_at_from) + metadata = l1b_metadata + model_metadata + + unique_sites = list(set([m["site"]["id"] for m in metadata])) + for site in unique_sites: + dates = [m["measurementDate"] for m in metadata if m["site"]["id"] == site] + unique_dates = list(set(dates)) + for date in unique_dates: + subprocess.check_call( + [ + "python3", + "scripts/wrapper.py", + "python3", + "scripts/cloudnet.py", + "-s", + site, + "-p", + products, + "-d", + date, + "process", + "-r", # Note the reprocess flag + ] + ) + + +def _get_l1b_metadata(md_api: MetadataApi, time: str) -> list[dict]: + categorize_input = ["radar", "lidar", "mwr"] + payload = {"updatedAtFrom": time, "product": categorize_input} + metadata = md_api.get("api/files", payload) + return metadata + + +def _get_model_metadata(md_api: MetadataApi, time: str) -> list[dict]: + payload = {"updatedAtFrom": time} + metadata = md_api.get("api/model-files", payload) + screened_metadata = [ + m + for m in metadata + if any(k in m["site"]["type"] for k in ["cloudnet", "campaign"]) + ] + return screened_metadata + + +if __name__ == "__main__": + parser = argparse.ArgumentParser() + parser.add_argument("--hours", type=int, default=2) + args = parser.parse_args() + main(args)