From 0bdc8e1e793b0d1042e3a57a44bbdb2e15428989 Mon Sep 17 00:00:00 2001 From: Paul Gierz Date: Mon, 3 Feb 2025 13:54:18 +0100 Subject: [PATCH] wip --- src/esm_runscripts/old_workflow.py | 782 +++++++++++++++++ src/esm_runscripts/workflow-scratch.py | 26 + src/esm_runscripts/workflow.py | 972 +++++---------------- tests/test_esm_runscripts/test_workflow.py | 642 ++++++++++++++ 4 files changed, 1660 insertions(+), 762 deletions(-) create mode 100644 src/esm_runscripts/old_workflow.py create mode 100644 src/esm_runscripts/workflow-scratch.py create mode 100644 tests/test_esm_runscripts/test_workflow.py diff --git a/src/esm_runscripts/old_workflow.py b/src/esm_runscripts/old_workflow.py new file mode 100644 index 000000000..e99352b9f --- /dev/null +++ b/src/esm_runscripts/old_workflow.py @@ -0,0 +1,782 @@ +def should_skip_cluster(cluster, config): + """ + Determine whether a specific cluster should be skipped based on the provided configuration. + + Parameters + ---------- + cluster : str + The name of the cluster to check. + config : dict + A dictionary containing various configuration settings. + + Returns + ------- + bool + True if the cluster should be skipped, False otherwise. + + Notes + ----- + The function evaluates several conditions to decide if the cluster should be skipped: + 1. If `run_only` in the cluster configuration is set to "last_run_in_chunk" and `last_run_in_chunk` in the general configuration is not True. + 2. If `run_only` in the cluster configuration is set to "first_run_in_chunk" and `first_run_in_chunk` in the general configuration is not True. + 3. If `skip_chunk_number` in the cluster configuration matches `chunk_number` in the general configuration. + 4. If `skip_run_number` in the cluster configuration matches `run_number` in the general configuration. + + If none of these conditions are met, the function returns False, indicating that the cluster should not be skipped. + """ + general_config = config["general"] + workflow_config = general_config["workflow"] + cluster_config = workflow_config["clusters"][cluster] + + run_only_on = cluster_config.get("run_only") + + if run_only_on == "last_run_in_chunk" and not general_config.get( + "last_run_in_chunk", False + ): + return True + + if run_only_on == "first_run_in_chunk" and not general_config.get( + "first_run_in_chunk", False + ): + return True + + skip_chunk_number = cluster_config.get("skip_chunk_number") + if skip_chunk_number == general_config.get("chunk_number"): + return True + + skip_run_number = cluster_config.get("skip_run_number") + if skip_run_number == general_config.get("run_number"): + return True + + return False + + +def assemble_workflow(config): + config = init_total_workflow(config) + config = collect_all_workflow_information(config) + config = complete_clusters(config) + config = order_clusters(config) + config = prepend_newrun_job(config) + config = handle_unknown_jobtype(config) + return config + + +def handle_unknown_jobtype(config): + """ + Update the jobtype in the configuration if it is set to 'unknown'. + + Args: + config (dict): The configuration dictionary. + + Returns: + dict: The updated configuration dictionary. 
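+
+    Example:
+        A minimal sketch with a hypothetical config; only the keys this
+        function touches are shown:
+
+        >>> config = {
+        ...     "general": {
+        ...         "jobtype": "unknown",
+        ...         "command_line_config": {},
+        ...         "workflow": {"first_task_in_queue": "prepcompute"},
+        ...     }
+        ... }
+        >>> handle_unknown_jobtype(config)["general"]["jobtype"]
+        'prepcompute'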
+ """ + if config["general"]["jobtype"] == "unknown": + first_task = config["general"]["workflow"]["first_task_in_queue"] + config["general"]["command_line_config"]["jobtype"] = first_task + config["general"]["jobtype"] = first_task + return config + + +def display_nicely(config): + esm_parser.pprint_config(config["general"]["workflow"]) + return config + + +def prepend_newrun_job(config): + gw_config = config["general"]["workflow"] + first_cluster_name = gw_config["first_task_in_queue"] + first_cluster = gw_config["clusters"][first_cluster_name] + + if not first_cluster.get("submission_type", "Error") == "sim_object": + last_cluster_name = gw_config["last_task_in_queue"] + last_cluster = gw_config["clusters"][last_cluster_name] + + new_first_cluster_name = "newrun" + new_first_cluster = { + "newrun": { + "called_from": last_cluster_name, + "run_before": first_cluster_name, + "next_submit": [first_cluster_name], + "jobs": ["newrun_general"], + "submission_type": "sim_object", + } + } + + last_cluster["next_submit"].append("newrun") + last_cluster["next_submit"].remove(first_cluster_name) + + first_cluster["called_from"] = "newrun" + + gw_config["first_task_in_queue"] = "newrun" + + new_job = { + "newrun_general": { + "nproc": 1, + "called_from": last_cluster_name, + "run_before": first_cluster_name, + "next_submit": [first_cluster_name], + "job_cluster": "newrun", + } + } + + gw_config["clusters"].update(new_first_cluster) + gw_config["jobs"].update(new_job) + + return config + + +def order_clusters(config): + gw_config = config["general"]["workflow"] + + initialize_next_submit(gw_config) + validate_and_set_dependencies(gw_config) + handle_next_run_triggered_by_and_last_task_in_queue(gw_config) + ensure_first_and_last_clusters_linked(gw_config) + + return config + + +def initialize_next_submit(gw_config): + for job_cluster in gw_config["clusters"]: + if "next_submit" not in gw_config["clusters"][job_cluster]: + gw_config["clusters"][job_cluster]["next_submit"] = [] + + +def validate_and_set_dependencies(gw_config): + for job_cluster in gw_config["clusters"]: + cluster_config = gw_config["clusters"][job_cluster] + if "run_after" not in cluster_config and "run_before" not in cluster_config: + raise WorkflowValidateError( + f"Don't know when to execute cluster {job_cluster}.", gw_config + ) + + if "run_after" in cluster_config: + validate_run_after(cluster_config, job_cluster, gw_config) + if "run_before" in cluster_config: + validate_run_before(cluster_config, job_cluster, gw_config) + + +def validate_run_after(cluster_config, job_cluster, gw_config): + if "run_before" in cluster_config: + raise WorkflowValidateError( + f"Specifying both run_after and run_before for cluster {job_cluster} may lead to problems. 
Please choose.", + gw_config, + ) + + calling_cluster = cluster_config["run_after"] + if calling_cluster not in gw_config["clusters"]: + raise WorkflowValidateError( + f"Validate run after -- Unknown cluster {calling_cluster}.", gw_config + ) + + append_to_next_submit(gw_config, calling_cluster, job_cluster) + cluster_config["called_from"] = calling_cluster + + if calling_cluster == gw_config["last_task_in_queue"]: + gw_config["last_task_in_queue"] = job_cluster + + +def validate_run_before(cluster_config, job_cluster, gw_config): + called_cluster = cluster_config["run_before"] + if called_cluster not in gw_config["clusters"]: + raise WorkflowValidateError( + f"Validate run before -- Unknown cluster {called_cluster}.", gw_config + ) + + append_to_next_submit(gw_config, job_cluster, called_cluster) + gw_config["clusters"][called_cluster]["called_from"] = job_cluster + + if called_cluster == gw_config["first_task_in_queue"]: + gw_config["first_task_in_queue"] = job_cluster + + +def append_to_next_submit(gw_config, source_cluster, target_cluster): + if target_cluster not in gw_config["clusters"][source_cluster]["next_submit"]: + gw_config["clusters"][source_cluster]["next_submit"].append(target_cluster) + + +def ensure_first_and_last_clusters_linked(gw_config): + first_cluster_name = gw_config["first_task_in_queue"] + first_cluster = gw_config["clusters"][first_cluster_name] + last_cluster_name = gw_config["last_task_in_queue"] + last_cluster = gw_config["clusters"][last_cluster_name] + + if first_cluster_name not in last_cluster.get("next_submit", ["Error"]): + last_cluster["next_submit"].append(first_cluster_name) + if last_cluster_name not in first_cluster.get("called_from", ["Error"]): + first_cluster["called_from"] = last_cluster_name + + +def complete_clusters(config): + gw_config = config["general"]["workflow"] + + complete_job_cluster_assignments(gw_config) + complete_resource_information(gw_config) + + return config + + +def complete_job_cluster_assignments(gw_config): + """ + Assigns jobs to their respective job clusters in the workflow configuration. + + Parameters + ---------- + gw_config : dict + The general workflow configuration dictionary containing jobs and job clusters. + + Notes + ----- + This function iterates over all jobs in the workflow configuration and assigns each job + to its specified job cluster. If the job cluster does not already exist in the configuration, + it is created. Each job is then appended to the list of jobs within its respective job cluster. + """ + for job in gw_config["jobs"]: + job_cluster = gw_config["jobs"][job]["job_cluster"] + if job_cluster not in gw_config["clusters"]: + gw_config["clusters"][job_cluster] = {} + + if "jobs" not in gw_config["clusters"][job_cluster]: + gw_config["clusters"][job_cluster]["jobs"] = [] + + gw_config["clusters"][job_cluster]["jobs"].append(job) + + +def complete_resource_information(gw_config): + """ + Completes the resource information for each job cluster in the workflow configuration. + + Parameters + ---------- + gw_config : dict + The general workflow configuration dictionary containing jobs and job clusters. + + Notes + ----- + This function iterates over all job clusters in the workflow configuration and performs the following tasks: + - Merges individual configuration entries from jobs into their respective job clusters. + - Determines whether each job cluster should be submitted to a batch system or run as a shell script. 
+ - Ensures that necessary information such as target queue and order in cluster is present. + - Calculates the total number of processors required for each job cluster based on the order in cluster. + - Sets default values for missing configuration entries. + """ + for job_cluster in gw_config["clusters"]: + clusterconf = gw_config["clusters"][job_cluster] + nproc_sum, nproc_max = calculate_nproc(clusterconf, gw_config) + set_default_job_values(clusterconf) + clusterconf["nproc"] = ( + nproc_sum if clusterconf["order_in_cluster"] == "concurrent" else nproc_max + ) + + +def calculate_nproc(clusterconf, gw_config): + """ + Calculates the total and maximum number of processors required for a job cluster. + + Parameters + ---------- + clusterconf : dict + The configuration dictionary for a specific job cluster. + gw_config : dict + The general workflow configuration dictionary containing jobs and job clusters. + + Returns + ------- + nproc_sum : int + The sum of processors required for all jobs in the cluster. + nproc_max : int + The maximum number of processors required for any single job in the cluster. + """ + nproc_sum = nproc_max = 0 + for job in clusterconf["jobs"]: + jobconf = gw_config["jobs"][job] + merge_job_entries(clusterconf, jobconf) + nproc_sum += jobconf.get("nproc", 1) + nproc_max = max(jobconf.get("nproc", 1), nproc_max) + return nproc_sum, nproc_max + + +def merge_job_entries(clusterconf, jobconf): + """ + Merges individual configuration entries from a job into its respective job cluster. + + Parameters + ---------- + clusterconf : dict + The configuration dictionary for a specific job cluster. + jobconf : dict + The configuration dictionary for a specific job. + """ + keys_to_merge = [ + "submit_to_batch_system", + "order_in_cluster", + "run_on_queue", + "run_after", + "run_before", + "run_only", + "skip_run_number", + "skip_chunk_number", + ] + for key in keys_to_merge: + clusterconf = merge_single_entry_if_possible(key, jobconf, clusterconf) + + if jobconf.get("submit_to_batch_system", False): + clusterconf["submission_type"] = "batch" + elif jobconf.get("script", False): + clusterconf["submission_type"] = "shell" + + +def set_default_job_values(clusterconf): + """ + Sets default values for missing configuration entries in a job cluster. + + Parameters + ---------- + clusterconf : dict + The configuration dictionary for a specific job cluster. + """ + if "submit_to_batch_system" not in clusterconf: + clusterconf["submit_to_batch_system"] = False + else: + if "run_on_queue" not in clusterconf: + raise WorkflowUnknownQueueError( + f"Information on target queue is missing in cluster {clusterconf}." + ) + + if not clusterconf.get("submission_type", False): + clusterconf["submission_type"] = "sim_object" + + if "order_in_cluster" not in clusterconf: + clusterconf["order_in_cluster"] = "sequential" + + +def merge_single_entry_if_possible(entry, sourceconf, targetconf): + if entry in sourceconf: + if entry in targetconf and not sourceconf[entry] == targetconf[entry]: + raise WorkflowMismatchError( + f"Mismatch found in {entry} for cluster {targetconf}" + ) + targetconf[entry] = sourceconf[entry] + return targetconf + + +def calculate_tasks(config): + """ + Calculate the total number of tasks based on the configuration. + + Parameters + ---------- + config : dict + The configuration dictionary containing model information. + + Returns + ------- + int + The total number of tasks calculated from the model configurations. 
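+
+    Example
+    -------
+    A sketch with hypothetical model entries, one using ``nproc`` and one
+    using the ``nproca``/``nprocb`` decomposition:
+
+    >>> config = {
+    ...     "general": {"valid_model_names": ["ocean", "atmo"]},
+    ...     "ocean": {"nproc": 128},
+    ...     "atmo": {"nproca": 24, "nprocb": 4},
+    ... }
+    >>> calculate_tasks(config)
+    224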
+ """ + tasks = 0 + for model in config["general"]["valid_model_names"]: + if "nproc" in config[model]: + tasks += config[model]["nproc"] + elif "nproca" in config[model] and "nprocb" in config[model]: + tasks += config[model]["nproca"] * config[model]["nprocb"] + if "nprocar" in config[model] and "nprocbr" in config[model]: + if ( + config[model]["nprocar"] != "remove_from_namelist" + and config[model]["nprocbr"] != "remove_from_namelist" + ): + tasks += config[model]["nprocar"] * config[model]["nprocbr"] + return tasks + + +def create_prepcompute_stage(): + """ + Create the prepcompute stage configuration for the workflow manager. + + Returns + ------- + dict + The configuration dictionary for the prepcompute stage. + """ + return { + "prepcompute": { + "nproc": 1, + "run_before": "compute", + } + } + + +def create_compute_stage(tasks, config): + """ + Create the compute stage configuration for the workflow manager + + Parameters + ---------- + tasks : int + The total number of tasks to be used in the compute stage. + config : dict + The configuration dictionary containing general settings. + + Returns + ------- + dict + The configuration dictionary for the compute stage. + """ + return { + "compute": { + "nproc": tasks, + "run_before": "tidy", + "submit_to_batch_system": config["general"].get( + "submit_to_batch_system", True + ), + "run_on_queue": config["computer"]["partitions"]["compute"]["name"], + } + } + + +def create_tidy_stage(): + """ + Create the tidy stage configuration for the workflow manager + + Returns + ------- + dict + The configuration dictionary for the tidy stage. + """ + return { + "tidy": { + "nproc": 1, + "run_after": "compute", + } + } + + +def init_total_workflow(config): + """ + Initialize and configure the total workflow based on the given configuration. + + Parameters + ---------- + config : dict + The configuration dictionary containing workflow settings. + + Returns + ------- + dict + The updated configuration dictionary with the initialized workflow. + + Example + ------- + >>> config = { + ... "general": { + ... "workflow": { + ... "clusters": {}, + ... "jobs": {} + ... } + ... }, + ... } + >>> updated_config = init_total_workflow(config) + >>> print(updated_config) + { + "general": { + "workflow": { + "clusters": {}, + "jobs": { + "prepcompute": {...}, + "compute": {...}, + "tidy": {...} + }, + "last_task_in_queue": "tidy", + "first_task_in_queue": "prepcompute", + "next_run_triggered_by": "tidy" + } + }, + } + """ + tasks = calculate_tasks(config) + + prepcompute = create_prepcompute_stage() + compute = create_compute_stage(tasks, config) + tidy = create_tidy_stage() + + workflow = config["general"].setdefault("workflow", {}) + workflow.setdefault("clusters", {}) + jobs = workflow.setdefault("jobs", {}) + + jobs.update(prepcompute) + jobs.update(compute) + jobs.update(tidy) + + workflow.setdefault("last_task_in_queue", "tidy") + workflow.setdefault("first_task_in_queue", "prepcompute") + workflow.setdefault("next_run_triggered_by", "tidy") + + return config + + +def merge_jobs(w_config, gw_config, model): + """ + Merge jobs from model-specific workflow configuration into the general workflow configuration. + + Parameters + ---------- + w_config : dict + The model-specific workflow configuration. + gw_config : dict + The general workflow configuration. + model : str + The name of the model. 
+ """ + logger.critical(f"{model=}") + if "jobs" in w_config: + for job in list(copy.deepcopy(w_config["jobs"])): + logger.critical(job) + gw_config["jobs"][job + "_" + model] = copy.deepcopy(w_config["jobs"][job]) + if job in gw_config["jobs"]: + del gw_config["jobs"][job] + update_run_references(gw_config, job, model) + assign_job_cluster(gw_config, job, model) + + +def update_run_references(gw_config, job, model): + """ + Update run_after and run_before references to be model-specific. + + Parameters + ---------- + gw_config : dict + The general workflow configuration. + job : str + The name of the job. + model : str + The name of the model. + """ + for other_job in gw_config["jobs"]: + if "run_after" in gw_config["jobs"][other_job]: + if gw_config["jobs"][other_job]["run_after"] == job: + logger.critical("Updating run_after 001") + logger.critical("Old value: ") + logger.critical(gw_config["jobs"][other_job]["run_after"]) + gw_config["jobs"][other_job]["run_after"] = f"{job}_{model}" + logger.critical( + f"gw_config['jobs']['{other_job}']['run_after'] = {job}_{model}" + ) + if "run_before" in gw_config["jobs"][other_job]: + if gw_config["jobs"][other_job]["run_before"] == job: + logger.critical("Updating run_before 001") + logger.critical("Old value: ") + logger.critical(gw_config["jobs"][other_job]["run_before"]) + gw_config["jobs"][other_job]["run_before"] = f"{job}_{model}" + logger.critical( + f"gw_config['jobs']['{other_job}']['run_before'] = {job}_{model}" + ) + + +def assign_job_cluster(gw_config, job, model): + """ + Assign each job to a job cluster if not already assigned. + + Parameters + ---------- + gw_config : dict + The general workflow configuration. + job : str + The name of the job. + model : str + The name of the model. + """ + if "job_cluster" not in gw_config["jobs"][f"{job}_{model}"]: + gw_config["jobs"][f"{job}_{model}"]["job_cluster"] = job + + +def handle_next_run_triggered_by_and_last_task_in_queue(gw_config): + """ + Handle the next_run_triggered_by key in the workflow configuration. + + Parameters + ---------- + gw_config : dict + The general workflow configuration. + """ + if "next_run_triggered_by" in gw_config: + gw_config["last_task_in_queue"] = gw_config["next_run_triggered_by"] + + +def collect_all_workflow_information(config): + """ + Aggregates workflow configurations from all models into the general workflow config, + handling job renaming and reference updates. 
+ """ + for model_name in config: + if "workflow" not in config[model_name]: + continue + + model_wf = config[model_name]["workflow"] + general_wf = config["general"]["workflow"] + + # Merge clusters first as jobs might depend on them + merge_clusters(model_wf, general_wf) + process_model_jobs(model_wf, general_wf, model_name) + handle_next_run_trigger(model_wf, general_wf) + + return config + + +def merge_clusters(source_wf, target_wf): + """Merge job clusters from model workflow into general workflow""" + for cluster_name, cluster_config in source_wf.get("clusters", {}).items(): + if cluster_name in target_wf["clusters"]: + target_wf["clusters"][cluster_name] = safe_merge( + cluster_config, target_wf["clusters"][cluster_name] + ) + else: + target_wf["clusters"][cluster_name] = copy.deepcopy(cluster_config) + + +def process_model_jobs(source_wf, target_wf, model_name): + """Process and merge jobs with model-specific naming and references""" + if "jobs" not in source_wf: + return + + rename_map = create_rename_mapping(source_wf["jobs"], model_name) + create_model_specific_jobs(source_wf, target_wf, model_name, rename_map) + update_workflow_references(target_wf, rename_map) + resolve_references_to_clusters(target_wf) + + +def resolve_references_to_clusters(workflow_config): + """Convert job references in dependencies to their parent clusters""" + job_to_cluster = { + job: conf["job_cluster"] for job, conf in workflow_config["jobs"].items() + } + + # Update references in ALL jobs + for job_conf in workflow_config["jobs"].values(): + for ref_type in ["run_after", "run_before"]: + if ref_type in job_conf: + job_conf[ref_type] = job_to_cluster.get( + job_conf[ref_type], job_conf[ref_type] + ) + + # Update references in CLUSTERS + for cluster_conf in workflow_config["clusters"].values(): + for ref_type in ["run_after", "run_before", "next_submit"]: + if ref_type in cluster_conf: + if isinstance(cluster_conf[ref_type], list): + cluster_conf[ref_type] = [ + job_to_cluster.get(name, name) + for name in cluster_conf[ref_type] + ] + else: + cluster_conf[ref_type] = job_to_cluster.get( + cluster_conf[ref_type], cluster_conf[ref_type] + ) + + +def create_rename_mapping(jobs, model_name): + """Create mapping from original job names to model-specific names""" + return {orig: f"{orig}_{model_name}" for orig in jobs} + + +def create_model_specific_jobs(source_wf, target_wf, model_name, rename_map): + """Create renamed job entries in general workflow""" + for orig_name, new_name in rename_map.items(): + target_wf["jobs"][new_name] = copy.deepcopy(source_wf["jobs"][orig_name]) + + # Remove original entry if present in general workflow + if orig_name in target_wf["jobs"]: + del target_wf["jobs"][orig_name] + + # Ensure cluster assignment + if "job_cluster" not in target_wf["jobs"][new_name]: + target_wf["jobs"][new_name]["job_cluster"] = orig_name + + +def update_workflow_references(target_wf, rename_map): + """Update references throughout workflow to use renamed jobs""" + # Update references in all jobs + for job_config in target_wf["jobs"].values(): + update_references_in_config(job_config, rename_map) + + # Update references in clusters + for cluster_config in target_wf["clusters"].values(): + update_references_in_config(cluster_config, rename_map) + + +def update_references_in_config(config, rename_map): + """Update references in a single configuration block""" + for ref_type in ["run_after", "run_before", "called_from"]: + if ref_type in config: + config[ref_type] = rename_map.get(config[ref_type], 
config[ref_type]) + + +def handle_next_run_trigger(source_wf, target_wf): + """Handle next_run_triggered_by inheritance with validation""" + if "next_run_triggered_by" in source_wf: + new_trigger = source_wf["next_run_triggered_by"] + current_trigger = target_wf.get("next_run_triggered_by", "tidy") + + if new_trigger != current_trigger and current_trigger != "tidy": + raise WorkflowMergeError( + f"Conflicting next_run_triggered_by: {current_trigger} vs {new_trigger}" + ) + + target_wf["next_run_triggered_by"] = new_trigger + + +def safe_merge(source, target): + """Safely merge two configurations with conflict checking""" + merged = copy.deepcopy(target) + for key, value in source.items(): + if key in merged and merged[key] != value: + raise WorkflowMergeError( + f"Conflict in key '{key}': {merged[key]} vs {value}" + ) + merged[key] = copy.deepcopy(value) + return merged + + +def merge_if_possible(source, target): + """ + Merge source dictionary into target dictionary, ensuring no conflicts. + + Parameters + ---------- + source : dict + The source dictionary to merge from. + target : dict + The target dictionary to merge into. + + Returns + ------- + dict + The updated target dictionary with merged entries from the source. + """ + for key, value in source.items(): + if key in target and target[key] != value: + raise WorkflowMismatchError( + f"Mismatch while trying to merge clusters {source} into {target}" + ) + target[key] = value + return target + + +class WorkflowError(Exception): + """Base exception for workflow configuration errors""" + + +class WorkflowMergeError(WorkflowError): + """Exception for workflow configuration merge conflicts""" + + +class WorkflowMismatchError(WorkflowError): + """Exception for workflow configuration mismatch errors""" + + +class WorkflowUnknownQueueError(WorkflowError): + """Exception for unknown target queue in workflow configuration""" + + +class WorkflowValidateError(WorkflowError): + """Exception for workflow configuration validation errors""" diff --git a/src/esm_runscripts/workflow-scratch.py b/src/esm_runscripts/workflow-scratch.py new file mode 100644 index 000000000..2d315e070 --- /dev/null +++ b/src/esm_runscripts/workflow-scratch.py @@ -0,0 +1,26 @@ +import copy +import os +import sys + +from loguru import logger + +import esm_parser + + +def assemble_workflow(config): + # + config = init_total_workflow(config) + config = collect_all_workflow_information(config) + config = complete_clusters(config) + config = order_clusters(config) + config = prepend_newrun_job(config) + + if config["general"]["jobtype"] == "unknown": + config["general"]["command_line_config"]["jobtype"] = config["general"][ + "workflow" + ]["first_task_in_queue"] + config["general"]["jobtype"] = config["general"]["workflow"][ + "first_task_in_queue" + ] + + return config diff --git a/src/esm_runscripts/workflow.py b/src/esm_runscripts/workflow.py index 4931367d4..8840b8f80 100644 --- a/src/esm_runscripts/workflow.py +++ b/src/esm_runscripts/workflow.py @@ -1,774 +1,222 @@ -import copy -import sys - +""" +The ESM-Tools Workflow Manager + +Terminology +----------- +job: An encapsulated esm-tools workflow such as tidy, prepexp, prepcompute, + designated by a Prefect Flow (using the @flow decorator) +cluster: A collection of jobs that are grouped together (flow of flows) --> all + written down in 1 HPC Job Scheduler script, better name soon... 
+task: A single unit of work within a job +""" + +from enum import Enum + +import matplotlib.pyplot as plt +import networkx as nx +import randomname from loguru import logger - -import esm_parser - - -def should_skip_cluster(cluster, config): - """ - Determine whether a specific cluster should be skipped based on the provided configuration. - - Parameters - ---------- - cluster : str - The name of the cluster to check. - config : dict - A dictionary containing various configuration settings. - - Returns - ------- - bool - True if the cluster should be skipped, False otherwise. - - Notes - ----- - The function evaluates several conditions to decide if the cluster should be skipped: - 1. If `run_only` in the cluster configuration is set to "last_run_in_chunk" and `last_run_in_chunk` in the general configuration is not True. - 2. If `run_only` in the cluster configuration is set to "first_run_in_chunk" and `first_run_in_chunk` in the general configuration is not True. - 3. If `skip_chunk_number` in the cluster configuration matches `chunk_number` in the general configuration. - 4. If `skip_run_number` in the cluster configuration matches `run_number` in the general configuration. - - If none of these conditions are met, the function returns False, indicating that the cluster should not be skipped. - """ - general_config = config["general"] - workflow_config = general_config["workflow"] - cluster_config = workflow_config["subjob_clusters"][cluster] - - run_only_on = cluster_config.get("run_only") - - if run_only_on == "last_run_in_chunk" and not general_config.get( - "last_run_in_chunk", False - ): - return True - - if run_only_on == "first_run_in_chunk" and not general_config.get( - "first_run_in_chunk", False - ): - return True - - skip_chunk_number = cluster_config.get("skip_chunk_number") - if skip_chunk_number == general_config.get("chunk_number"): - return True - - skip_run_number = cluster_config.get("skip_run_number") - if skip_run_number == general_config.get("run_number"): - return True - - return False - - -def assemble_workflow(config): - config = init_total_workflow(config) - config = collect_all_workflow_information(config) - config = complete_clusters(config) - config = order_clusters(config) - config = prepend_newrun_job(config) - config = handle_unknown_jobtype(config) - return config +from prefect import flow +from rich.columns import Columns +from rich.console import Console, ConsoleOptions, RenderResult +from rich.panel import Panel +from rich.text import Text -def handle_unknown_jobtype(config): - """ - Update the jobtype in the configuration if it is set to 'unknown'. +class SubmissionType(Enum): + """Enum for the different submission types""" - Args: - config (dict): The configuration dictionary. + BATCH = "batch" + SHELL = "shell" + SIM_OBJECT = "sim_object" - Returns: - dict: The updated configuration dictionary. 
- """ - if config["general"]["jobtype"] == "unknown": - first_task = config["general"]["workflow"]["first_task_in_queue"] - config["general"]["command_line_config"]["jobtype"] = first_task - config["general"]["jobtype"] = first_task - return config - - -def display_nicely(config): - esm_parser.pprint_config(config["general"]["workflow"]) - return config - - -def prepend_newrun_job(config): - gw_config = config["general"]["workflow"] - first_cluster_name = gw_config["first_task_in_queue"] - first_cluster = gw_config["subjob_clusters"][first_cluster_name] - - if not first_cluster.get("batch_or_shell", "Error") == "Simulation": - last_cluster_name = gw_config["last_task_in_queue"] - last_cluster = gw_config["subjob_clusters"][last_cluster_name] - - new_first_cluster_name = "newrun" - new_first_cluster = { - "newrun": { - "called_from": last_cluster_name, - "run_before": first_cluster_name, - "next_submit": [first_cluster_name], - "subjobs": ["newrun_general"], - "batch_or_shell": "Simulation", - } + def __rich__(self) -> str: + colors = { + SubmissionType.BATCH: "blue", + SubmissionType.SHELL: "green", + SubmissionType.SIM_OBJECT: "magenta", } - - last_cluster["next_submit"].append("newrun") - last_cluster["next_submit"].remove(first_cluster_name) - - first_cluster["called_from"] = "newrun" - - gw_config["first_task_in_queue"] = "newrun" - - new_subjob = { - "newrun_general": { - "nproc": 1, - "called_from": last_cluster_name, - "run_before": first_cluster_name, - "next_submit": [first_cluster_name], - "subjob_cluster": "newrun", - } + return Text(self.value, style=colors[self]) + + +class Workflow: + """A cyclable collection of clusters""" + + +class Cluster: + """A collection of jobs""" + + def __init__(self, jobs=None, name=None): + self.name = name or randomname.get_name() + self.jobs = jobs or [] + + self._call_order = [] + self.G = nx.DiGraph() + self.order_jobs() + + def order_jobs(self): + """Order jobs using topological sorting and detect execution levels.""" + self.G.clear() + job_lookup = {job.name: job for job in self.jobs} + + # Add nodes + for job in self.jobs: + self.G.add_node(job.name, submission_type=job.submission_type) + + # Add edges based on dependencies + for job in self.jobs: + if job.run_after: + for dep in job.run_after: + if dep not in job_lookup: + raise ValueError(f"Dependency '{dep}' not found in jobs!") + self.G.add_edge(job_lookup[dep].name, job.name) + + if job.run_before: + for dep in job.run_before: + if dep not in job_lookup: + raise ValueError(f"Dependency '{dep}' not found in jobs!") + self.G.add_edge(job.name, job_lookup[dep].name) + + # Detect cycles + if not nx.is_directed_acyclic_graph(self.G): + raise ValueError("Circular dependency detected in job ordering!") + + # Perform topological sort + sorted_job_names = list(nx.topological_sort(self.G)) + self._call_order = [job_lookup[name] for name in sorted_job_names] + + # Compute job levels (execution layers) + levels = {job_name: 0 for job_name in sorted_job_names} + for job in sorted_job_names: + for dep in self.G.predecessors(job): + levels[job] = max(levels[job], levels[dep] + 1) + + # Store levels as node attributes + nx.set_node_attributes(self.G, levels, "subset") + + # Create a dictionary of levels and their jobs + self._call_order_by_level = {} + for job_name, level in levels.items(): + if level not in self._call_order_by_level: + self._call_order_by_level[level] = [] + self._call_order_by_level[level].append(job_lookup[job_name]) + + logger.warning(f"Job execution order: {sorted_job_names}") + 
+ def draw_graph(self): + """Draw the job dependency graph with execution levels and colored nodes.""" + plt.figure(figsize=(10, 6)) + + # Define colors for different SubmissionTypes + type_colors = { + SubmissionType.BATCH: "blue", + SubmissionType.SHELL: "green", + SubmissionType.SIM_OBJECT: "magenta", } - - gw_config["subjob_clusters"].update(new_first_cluster) - gw_config["subjobs"].update(new_subjob) - - return config - - # - - -def order_clusters(config): - gw_config = config["general"]["workflow"] - - initialize_next_submit(gw_config) - validate_and_set_dependencies(gw_config) - handle_next_run_triggered_by_and_last_task_in_queue(gw_config) - ensure_first_and_last_clusters_linked(gw_config) - - return config - - -def initialize_next_submit(gw_config): - for subjob_cluster in gw_config["subjob_clusters"]: - if "next_submit" not in gw_config["subjob_clusters"][subjob_cluster]: - gw_config["subjob_clusters"][subjob_cluster]["next_submit"] = [] - - -def validate_and_set_dependencies(gw_config): - for subjob_cluster in gw_config["subjob_clusters"]: - cluster_config = gw_config["subjob_clusters"][subjob_cluster] - if "run_after" not in cluster_config and "run_before" not in cluster_config: - log_and_exit( - f"Don't know when to execute cluster {subjob_cluster}.", gw_config - ) - - if "run_after" in cluster_config: - validate_run_after(cluster_config, subjob_cluster, gw_config) - if "run_before" in cluster_config: - validate_run_before(cluster_config, subjob_cluster, gw_config) - - -def validate_run_after(cluster_config, subjob_cluster, gw_config): - if "run_before" in cluster_config: - log_and_exit( - f"Specifying both run_after and run_before for cluster {subjob_cluster} may lead to problems. Please choose.", - gw_config, - ) - - calling_cluster = cluster_config["run_after"] - if calling_cluster not in gw_config["subjob_clusters"]: - log_and_exit( - f"Validate run after -- Unknown cluster {calling_cluster}.", gw_config + node_colors = [ + type_colors[self.G.nodes[node]["submission_type"]] for node in self.G.nodes + ] + + # Position nodes based on execution level stored in "subset" + pos = nx.multipartite_layout(self.G, subset_key="subset") + + # Draw the graph + nx.draw( + self.G, + pos, + with_labels=True, + node_color=node_colors, + edge_color="gray", + arrows=True, + node_size=2000, + font_size=10, ) - append_to_next_submit(gw_config, calling_cluster, subjob_cluster) - cluster_config["called_from"] = calling_cluster - - if calling_cluster == gw_config["last_task_in_queue"]: - gw_config["last_task_in_queue"] = subjob_cluster - - -def validate_run_before(cluster_config, subjob_cluster, gw_config): - called_cluster = cluster_config["run_before"] - if called_cluster not in gw_config["subjob_clusters"]: - log_and_exit( - f"Validate run before -- Unknown cluster {called_cluster}.", gw_config + plt.title(f"Dependency Graph for {self.name}") + plt.show() + + def add_job(self, job): + self.jobs.append(job) + self.order_jobs() + + def __rich_console__( + self, console: Console, options: ConsoleOptions + ) -> RenderResult: + job_panels = [] + for job in self.jobs: + job_panels.append(job) + job_columns = Columns(job_panels) + panel = Panel( + job_columns, + title="Cluster", + title_align="left", + subtitle=self.name, + border_style="blue", ) - - append_to_next_submit(gw_config, subjob_cluster, called_cluster) - gw_config["subjob_clusters"][called_cluster]["called_from"] = subjob_cluster - - if called_cluster == gw_config["first_task_in_queue"]: - gw_config["first_task_in_queue"] = subjob_cluster 
- - -def append_to_next_submit(gw_config, source_cluster, target_cluster): - if ( - target_cluster - not in gw_config["subjob_clusters"][source_cluster]["next_submit"] + yield panel + + +class Job: + """One phase of esm-tools""" + + def __init__( + self, + name=None, + steps=None, + submission_type=None, + nproc=None, + run_after=None, + run_before=None, + script=None, + order_in_cluster="sequential", ): - gw_config["subjob_clusters"][source_cluster]["next_submit"].append( - target_cluster - ) - - -def ensure_first_and_last_clusters_linked(gw_config): - first_cluster_name = gw_config["first_task_in_queue"] - first_cluster = gw_config["subjob_clusters"][first_cluster_name] - last_cluster_name = gw_config["last_task_in_queue"] - last_cluster = gw_config["subjob_clusters"][last_cluster_name] - - if first_cluster_name not in last_cluster.get("next_submit", ["Error"]): - last_cluster["next_submit"].append(first_cluster_name) - if last_cluster_name not in first_cluster.get("called_from", ["Error"]): - first_cluster["called_from"] = last_cluster_name - - -def log_and_exit(message, gw_config): - logger.error(message) - esm_parser.pprint_config(gw_config) - sys.exit(-1) - - -# ----------------------------------------------------------------------------- -def complete_clusters(config): - gw_config = config["general"]["workflow"] - - complete_subjob_cluster_assignments(gw_config) - complete_resource_information(gw_config) - - return config - - -def complete_subjob_cluster_assignments(gw_config): - """ - Assigns subjobs to their respective subjob clusters in the workflow configuration. - - Parameters - ---------- - gw_config : dict - The general workflow configuration dictionary containing subjobs and subjob clusters. - - Notes - ----- - This function iterates over all subjobs in the workflow configuration and assigns each subjob - to its specified subjob cluster. If the subjob cluster does not already exist in the configuration, - it is created. Each subjob is then appended to the list of subjobs within its respective subjob cluster. - """ - for subjob in gw_config["subjobs"]: - subjob_cluster = gw_config["subjobs"][subjob]["subjob_cluster"] - if subjob_cluster not in gw_config["subjob_clusters"]: - gw_config["subjob_clusters"][subjob_cluster] = {} - - if "subjobs" not in gw_config["subjob_clusters"][subjob_cluster]: - gw_config["subjob_clusters"][subjob_cluster]["subjobs"] = [] - - gw_config["subjob_clusters"][subjob_cluster]["subjobs"].append(subjob) - - -def complete_resource_information(gw_config): - """ - Completes the resource information for each subjob cluster in the workflow configuration. - - Parameters - ---------- - gw_config : dict - The general workflow configuration dictionary containing subjobs and subjob clusters. - - Notes - ----- - This function iterates over all subjob clusters in the workflow configuration and performs the following tasks: - - Merges individual configuration entries from subjobs into their respective subjob clusters. - - Determines whether each subjob cluster should be submitted to a batch system or run as a shell script. - - Ensures that necessary information such as target queue and order in cluster is present. - - Calculates the total number of processors required for each subjob cluster based on the order in cluster. - - Sets default values for missing configuration entries. 
- """ - for subjob_cluster in gw_config["subjob_clusters"]: - clusterconf = gw_config["subjob_clusters"][subjob_cluster] - nproc_sum, nproc_max = calculate_nproc(clusterconf, gw_config) - set_default_subjob_values(clusterconf) - clusterconf["nproc"] = ( - nproc_sum if clusterconf["order_in_cluster"] == "concurrent" else nproc_max - ) - - -def calculate_nproc(clusterconf, gw_config): - """ - Calculates the total and maximum number of processors required for a subjob cluster. - - Parameters - ---------- - clusterconf : dict - The configuration dictionary for a specific subjob cluster. - gw_config : dict - The general workflow configuration dictionary containing subjobs and subjob clusters. - - Returns - ------- - nproc_sum : int - The sum of processors required for all subjobs in the cluster. - nproc_max : int - The maximum number of processors required for any single subjob in the cluster. - """ - nproc_sum = nproc_max = 0 - for subjob in clusterconf["subjobs"]: - subjobconf = gw_config["subjobs"][subjob] - merge_subjob_entries(clusterconf, subjobconf) - nproc_sum += subjobconf.get("nproc", 1) - nproc_max = max(subjobconf.get("nproc", 1), nproc_max) - return nproc_sum, nproc_max - - -def merge_subjob_entries(clusterconf, subjobconf): - """ - Merges individual configuration entries from a subjob into its respective subjob cluster. - - Parameters - ---------- - clusterconf : dict - The configuration dictionary for a specific subjob cluster. - subjobconf : dict - The configuration dictionary for a specific subjob. - """ - keys_to_merge = [ - "submit_to_batch_system", - "order_in_cluster", - "run_on_queue", - "run_after", - "run_before", - "run_only", - "skip_run_number", - "skip_chunk_number", - ] - for key in keys_to_merge: - clusterconf = merge_single_entry_if_possible(key, subjobconf, clusterconf) - - if subjobconf.get("submit_to_batch_system", False): - clusterconf["batch_or_shell"] = "batch" - elif subjobconf.get("script", False): - clusterconf["batch_or_shell"] = "shell" - - -def set_default_subjob_values(clusterconf): - """ - Sets default values for missing configuration entries in a subjob cluster. - - Parameters - ---------- - clusterconf : dict - The configuration dictionary for a specific subjob cluster. - """ - if "submit_to_batch_system" not in clusterconf: - clusterconf["submit_to_batch_system"] = False - else: - if "run_on_queue" not in clusterconf: - logger.error( - f"Information on target queue is missing in cluster {clusterconf}." - ) - sys.exit(-1) - - if not clusterconf.get("batch_or_shell", False): - clusterconf["batch_or_shell"] = "Simulation" - - if "order_in_cluster" not in clusterconf: - clusterconf["order_in_cluster"] = "sequential" - - -# ----------------------------------------------------------------------------- - - -def merge_single_entry_if_possible(entry, sourceconf, targetconf): - if entry in sourceconf: - if entry in targetconf and not sourceconf[entry] == targetconf[entry]: - logger.error(f"Mismatch found in {entry} for cluster {targetconf}") - sys.exit(-1) - targetconf[entry] = sourceconf[entry] - return targetconf - - -def calculate_tasks(config): - """ - Calculate the total number of tasks based on the configuration. - - Parameters - ---------- - config : dict - The configuration dictionary containing model information. - - Returns - ------- - int - The total number of tasks calculated from the model configurations. 
- """ - tasks = 0 - for model in config["general"]["valid_model_names"]: - if "nproc" in config[model]: - tasks += config[model]["nproc"] - elif "nproca" in config[model] and "nprocb" in config[model]: - tasks += config[model]["nproca"] * config[model]["nprocb"] - if "nprocar" in config[model] and "nprocbr" in config[model]: - if ( - config[model]["nprocar"] != "remove_from_namelist" - and config[model]["nprocbr"] != "remove_from_namelist" - ): - tasks += config[model]["nprocar"] * config[model]["nprocbr"] - return tasks - - -def create_prepcompute_stage(): - """ - Create the prepcompute stage configuration for the workflow manager. - - Returns - ------- - dict - The configuration dictionary for the prepcompute stage. - """ - return { - "prepcompute": { - "nproc": 1, - "run_before": "compute", - } - } - - -def create_compute_stage(tasks, config): - """ - Create the compute stage configuration for the workflow manager - - Parameters - ---------- - tasks : int - The total number of tasks to be used in the compute stage. - config : dict - The configuration dictionary containing general settings. - - Returns - ------- - dict - The configuration dictionary for the compute stage. - """ - return { - "compute": { - "nproc": tasks, - "run_before": "tidy", - "submit_to_batch_system": config["general"].get( - "submit_to_batch_system", True - ), - "run_on_queue": config["computer"]["partitions"]["compute"]["name"], - } - } - - -def create_tidy_stage(): - """ - Create the tidy stage configuration for the workflow manager - - Returns - ------- - dict - The configuration dictionary for the tidy stage. - """ - return { - "tidy": { - "nproc": 1, - "run_after": "compute", - } - } - - -def init_total_workflow(config): - """ - Initialize and configure the total workflow based on the given configuration. - - Parameters - ---------- - config : dict - The configuration dictionary containing workflow settings. - - Returns - ------- - dict - The updated configuration dictionary with the initialized workflow. - """ - tasks = calculate_tasks(config) - - prepcompute = create_prepcompute_stage() - compute = create_compute_stage(tasks, config) - tidy = create_tidy_stage() - - workflow = config["general"].setdefault("workflow", {}) - workflow.setdefault("subjob_clusters", {}) - subjobs = workflow.setdefault("subjobs", {}) - - subjobs.update(prepcompute) - subjobs.update(compute) - subjobs.update(tidy) - - workflow.setdefault("last_task_in_queue", "tidy") - workflow.setdefault("first_task_in_queue", "prepcompute") - workflow.setdefault("next_run_triggered_by", "tidy") - - return config - - -def merge_subjobs(w_config, gw_config, model): - """ - Merge subjobs from model-specific workflow configuration into the general workflow configuration. - - Parameters - ---------- - w_config : dict - The model-specific workflow configuration. - gw_config : dict - The general workflow configuration. - model : str - The name of the model. - """ - logger.critical(f"{model=}") - if "subjobs" in w_config: - for subjob in list(copy.deepcopy(w_config["subjobs"])): - logger.critical(subjob) - gw_config["subjobs"][subjob + "_" + model] = copy.deepcopy( - w_config["subjobs"][subjob] - ) - if subjob in gw_config["subjobs"]: - del gw_config["subjobs"][subjob] - update_run_references(gw_config, subjob, model) - assign_subjob_cluster(gw_config, subjob, model) - - -def update_run_references(gw_config, subjob, model): - """ - Update run_after and run_before references to be model-specific. 
- - Parameters - ---------- - gw_config : dict - The general workflow configuration. - subjob : str - The name of the subjob. - model : str - The name of the model. - """ - for other_subjob in gw_config["subjobs"]: - if "run_after" in gw_config["subjobs"][other_subjob]: - if gw_config["subjobs"][other_subjob]["run_after"] == subjob: - logger.critical("Updating run_after 001") - logger.critical("Old value: ") - logger.critical(gw_config["subjobs"][other_subjob]["run_after"]) - gw_config["subjobs"][other_subjob]["run_after"] = f"{subjob}_{model}" - logger.critical( - f"gw_config['subjobs']['{other_subjob}']['run_after'] = {subjob}_{model}" - ) - if "run_before" in gw_config["subjobs"][other_subjob]: - if gw_config["subjobs"][other_subjob]["run_before"] == subjob: - logger.critical("Updating run_before 001") - logger.critical("Old value: ") - logger.critical(gw_config["subjobs"][other_subjob]["run_before"]) - gw_config["subjobs"][other_subjob]["run_before"] = f"{subjob}_{model}" - logger.critical( - f"gw_config['subjobs']['{other_subjob}']['run_before'] = {subjob}_{model}" - ) - - -def assign_subjob_cluster(gw_config, subjob, model): - """ - Assign each subjob to a subjob cluster if not already assigned. - - Parameters - ---------- - gw_config : dict - The general workflow configuration. - subjob : str - The name of the subjob. - model : str - The name of the model. - """ - if "subjob_cluster" not in gw_config["subjobs"][f"{subjob}_{model}"]: - gw_config["subjobs"][f"{subjob}_{model}"]["subjob_cluster"] = subjob - - -def handle_next_run_triggered_by_and_last_task_in_queue(gw_config): - """ - Handle the next_run_triggered_by key in the workflow configuration. - - Parameters - ---------- - gw_config : dict - The general workflow configuration. - """ - if "next_run_triggered_by" in gw_config: - gw_config["last_task_in_queue"] = gw_config["next_run_triggered_by"] - - -# -------------------------------------------------------------------------------- -def collect_all_workflow_information(config): - """ - Aggregates workflow configurations from all models into the general workflow config, - handling subjob renaming and reference updates. 
- """ - for model_name in config: - if "workflow" not in config[model_name]: - continue - - model_wf = config[model_name]["workflow"] - general_wf = config["general"]["workflow"] - - # Merge clusters first as subjobs might depend on them - merge_subjob_clusters(model_wf, general_wf) - process_model_subjobs(model_wf, general_wf, model_name) - handle_next_run_trigger(model_wf, general_wf) - - return config - - -def merge_subjob_clusters(source_wf, target_wf): - """Merge subjob clusters from model workflow into general workflow""" - for cluster_name, cluster_config in source_wf.get("subjob_clusters", {}).items(): - if cluster_name in target_wf["subjob_clusters"]: - target_wf["subjob_clusters"][cluster_name] = safe_merge( - cluster_config, target_wf["subjob_clusters"][cluster_name] + self.name = name or randomname.get_name() + self.steps = steps or [] + self.submission_type = submission_type or SubmissionType.SIM_OBJECT + self.nproc = nproc or 1 + self.run_after = run_after + self.run_before = run_before + self.script = script + self.order_in_cluster = order_in_cluster + + # Sanity checks: run_after and run_before must be lists + if self.run_after and not isinstance(self.run_after, list): + self.run_after = [self.run_after] + if self.run_before and not isinstance(self.run_before, list): + self.run_before = [self.run_before] + + # Sanity checks: steps must be callable + for step in self.steps: + if not callable(step): + raise ValueError(f"Step '{step}' is not callable!") + + # Sanity check: if submission type is script, script must be provided + if self.submission_type == SubmissionType.SHELL and not self.script: + raise ValueError("Submission type is shell but no script provided!") + + def __call__(self, config): + @flow(name=self.name) + def job_task(config): + for step in self.steps: + config = step(config) + return config + + return job_task(config) + + def __rich__(self) -> RenderResult: + step_list = Text() + for index, step in enumerate(self.steps): + step_list.append( + f"* [{index+1}/{len(self.steps)}] {step.__name__}\n", style="bold" ) - else: - target_wf["subjob_clusters"][cluster_name] = copy.deepcopy(cluster_config) - - -def process_model_subjobs(source_wf, target_wf, model_name): - """Process and merge subjobs with model-specific naming and references""" - if "subjobs" not in source_wf: - return - - rename_map = create_rename_mapping(source_wf["subjobs"], model_name) - create_model_specific_subjobs(source_wf, target_wf, model_name, rename_map) - update_workflow_references(target_wf, rename_map) - resolve_references_to_clusters(target_wf) - - -def resolve_references_to_clusters(workflow_config): - """Convert subjob references in dependencies to their parent clusters""" - subjob_to_cluster = { - subjob: conf["subjob_cluster"] - for subjob, conf in workflow_config["subjobs"].items() - } - - # Update references in ALL SUBJOBS - for subjob_conf in workflow_config["subjobs"].values(): - for ref_type in ["run_after", "run_before"]: - if ref_type in subjob_conf: - subjob_conf[ref_type] = subjob_to_cluster.get( - subjob_conf[ref_type], subjob_conf[ref_type] - ) - - # Update references in CLUSTERS - for cluster_conf in workflow_config["subjob_clusters"].values(): - for ref_type in ["run_after", "run_before", "next_submit"]: - if ref_type in cluster_conf: - if isinstance(cluster_conf[ref_type], list): - cluster_conf[ref_type] = [ - subjob_to_cluster.get(name, name) - for name in cluster_conf[ref_type] - ] - else: - cluster_conf[ref_type] = subjob_to_cluster.get( - cluster_conf[ref_type], 
cluster_conf[ref_type] - ) - - -def create_rename_mapping(subjobs, model_name): - """Create mapping from original subjob names to model-specific names""" - return {orig: f"{orig}_{model_name}" for orig in subjobs} - - -def create_model_specific_subjobs(source_wf, target_wf, model_name, rename_map): - """Create renamed subjob entries in general workflow""" - for orig_name, new_name in rename_map.items(): - target_wf["subjobs"][new_name] = copy.deepcopy(source_wf["subjobs"][orig_name]) - - # Remove original entry if present in general workflow - if orig_name in target_wf["subjobs"]: - del target_wf["subjobs"][orig_name] - - # Ensure cluster assignment - if "subjob_cluster" not in target_wf["subjobs"][new_name]: - target_wf["subjobs"][new_name]["subjob_cluster"] = orig_name - - -def update_workflow_references(target_wf, rename_map): - """Update references throughout workflow to use renamed subjobs""" - # Update references in all subjobs - for subjob_config in target_wf["subjobs"].values(): - update_references_in_config(subjob_config, rename_map) - - # Update references in clusters - for cluster_config in target_wf["subjob_clusters"].values(): - update_references_in_config(cluster_config, rename_map) - - -def update_references_in_config(config, rename_map): - """Update references in a single configuration block""" - for ref_type in ["run_after", "run_before", "called_from"]: - if ref_type in config: - config[ref_type] = rename_map.get(config[ref_type], config[ref_type]) - - -def handle_next_run_trigger(source_wf, target_wf): - """Handle next_run_triggered_by inheritance with validation""" - if "next_run_triggered_by" in source_wf: - new_trigger = source_wf["next_run_triggered_by"] - current_trigger = target_wf.get("next_run_triggered_by", "tidy") - - if new_trigger != current_trigger and current_trigger != "tidy": - raise WorkflowMergeError( - f"Conflicting next_run_triggered_by: {current_trigger} vs {new_trigger}" - ) - - target_wf["next_run_triggered_by"] = new_trigger - - -class WorkflowMergeError(Exception): - """Exception for workflow configuration merge conflicts""" - - -def safe_merge(source, target): - """Safely merge two configurations with conflict checking""" - merged = copy.deepcopy(target) - for key, value in source.items(): - if key in merged and merged[key] != value: - raise WorkflowMergeError( - f"Conflict in key '{key}': {merged[key]} vs {value}" - ) - merged[key] = copy.deepcopy(value) - return merged - - -# -------------------------------------------------------------------------------- - - -def merge_if_possible(source, target, exit_on_mismatch=True): - """ - Merge source dictionary into target dictionary, ensuring no conflicts. - - Parameters - ---------- - source : dict - The source dictionary to merge from. - target : dict - The target dictionary to merge into. - exit_on_mismatch : bool, optional - Whether to exit if a mismatch is found, by default True - - Returns - ------- - dict - The updated target dictionary with merged entries from the source. 
- """ - for key, value in source.items(): - if key in target and target[key] != value: - logger.error( - f"Mismatch while trying to merge subjob_clusters {source} into {target}" - ) - if exit_on_mismatch: - sys.exit(1) - target[key] = value - return target + step_list.append("\n\n") + step_list.append("Submission type: ", style="dim") + step_list.append(self.submission_type.__rich__()) + panel = Panel.fit( + step_list, + title="Job", + title_align="left", + subtitle=self.name, + border_style="green", + ) + return panel diff --git a/tests/test_esm_runscripts/test_workflow.py b/tests/test_esm_runscripts/test_workflow.py new file mode 100644 index 000000000..5d0a84775 --- /dev/null +++ b/tests/test_esm_runscripts/test_workflow.py @@ -0,0 +1,642 @@ +import copy + +import pytest + +from esm_runscripts.workflow import (WorkflowError, + collect_all_workflow_information, + complete_clusters, init_total_workflow, + merge_if_possible, + merge_single_entry_if_possible, + order_clusters, prepend_newrun_job, + should_skip_cluster) + + +def test_should_skip_cluster_last_run_in_chunk(): + config = { + "general": { + "workflow": { + "clusters": {"test_cluster": {"run_only": "last_run_in_chunk"}} + }, + "last_run_in_chunk": False, + } + } + assert should_skip_cluster("test_cluster", config) is True + + +def test_should_skip_cluster_first_run_in_chunk(): + config = { + "general": { + "workflow": { + "clusters": {"test_cluster": {"run_only": "first_run_in_chunk"}} + }, + "first_run_in_chunk": False, + } + } + assert should_skip_cluster("test_cluster", config) is True + + +def test_should_skip_cluster_skip_chunk_number(): + config = { + "general": { + "workflow": {"clusters": {"test_cluster": {"skip_chunk_number": 1}}}, + "chunk_number": 1, + } + } + assert should_skip_cluster("test_cluster", config) is True + + +def test_should_skip_cluster_skip_run_number(): + config = { + "general": { + "workflow": {"clusters": {"test_cluster": {"skip_run_number": 1}}}, + "run_number": 1, + } + } + assert should_skip_cluster("test_cluster", config) is True + + +def test_should_skip_cluster_no_skip(): + config = { + "general": { + "workflow": {"clusters": {"test_cluster": {}}}, + "last_run_in_chunk": True, + "first_run_in_chunk": True, + "chunk_number": 2, + "run_number": 2, + } + } + assert should_skip_cluster("test_cluster", config) is False + + +def test_init_total_workflow_minimal_config(): + config = { + "general": {"valid_model_names": [], "workflow": {}}, + "computer": {"partitions": {"compute": {"name": "default_queue"}}}, + } + expected = { + "general": { + "valid_model_names": [], + "workflow": { + "clusters": {}, + "jobs": { + "prepcompute": {"nproc": 1, "run_before": "compute"}, + "compute": { + "nproc": 0, + "run_before": "tidy", + "submit_to_batch_system": True, + "run_on_queue": "default_queue", + }, + "tidy": {"nproc": 1, "run_after": "compute"}, + }, + "last_task_in_queue": "tidy", + "first_task_in_queue": "prepcompute", + "next_run_triggered_by": "tidy", + }, + }, + "computer": {"partitions": {"compute": {"name": "default_queue"}}}, + } + result = init_total_workflow(config) + assert result == expected + + +def test_init_total_workflow_config_with_nproc(): + config = { + "general": {"valid_model_names": ["model1"], "workflow": {}}, + "model1": {"nproc": 4}, + "computer": {"partitions": {"compute": {"name": "default_queue"}}}, + } + expected = { + "general": { + "valid_model_names": ["model1"], + "workflow": { + "clusters": {}, + "jobs": { + "prepcompute": {"nproc": 1, "run_before": "compute"}, + "compute": 
{ + "nproc": 4, + "run_before": "tidy", + "submit_to_batch_system": True, + "run_on_queue": "default_queue", + }, + "tidy": {"nproc": 1, "run_after": "compute"}, + }, + "last_task_in_queue": "tidy", + "first_task_in_queue": "prepcompute", + "next_run_triggered_by": "tidy", + }, + }, + "model1": {"nproc": 4}, + "computer": {"partitions": {"compute": {"name": "default_queue"}}}, + } + result = init_total_workflow(config) + assert result == expected + + +def test_init_total_workflow_config_with_nproca_nprocb(): + config = { + "general": {"valid_model_names": ["model1"], "workflow": {}}, + "model1": {"nproca": 2, "nprocb": 3}, + "computer": {"partitions": {"compute": {"name": "default_queue"}}}, + } + expected = { + "general": { + "valid_model_names": ["model1"], + "workflow": { + "clusters": {}, + "jobs": { + "prepcompute": {"nproc": 1, "run_before": "compute"}, + "compute": { + "nproc": 6, + "run_before": "tidy", + "submit_to_batch_system": True, + "run_on_queue": "default_queue", + }, + "tidy": {"nproc": 1, "run_after": "compute"}, + }, + "last_task_in_queue": "tidy", + "first_task_in_queue": "prepcompute", + "next_run_triggered_by": "tidy", + }, + }, + "model1": {"nproca": 2, "nprocb": 3}, + "computer": {"partitions": {"compute": {"name": "default_queue"}}}, + } + result = init_total_workflow(config) + assert result == expected + + +def test_init_total_workflow_config_with_nprocar_nprocbr(): + config = { + "general": {"valid_model_names": ["model1"], "workflow": {}}, + "model1": {"nproca": 2, "nprocb": 3, "nprocar": 1, "nprocbr": 2}, + "computer": {"partitions": {"compute": {"name": "default_queue"}}}, + } + expected = { + "general": { + "valid_model_names": ["model1"], + "workflow": { + "clusters": {}, + "jobs": { + "prepcompute": {"nproc": 1, "run_before": "compute"}, + "compute": { + "nproc": 8, + "run_before": "tidy", + "submit_to_batch_system": True, + "run_on_queue": "default_queue", + }, + "tidy": {"nproc": 1, "run_after": "compute"}, + }, + "last_task_in_queue": "tidy", + "first_task_in_queue": "prepcompute", + "next_run_triggered_by": "tidy", + }, + }, + "model1": {"nproca": 2, "nprocb": 3, "nprocar": 1, "nprocbr": 2}, + "computer": {"partitions": {"compute": {"name": "default_queue"}}}, + } + result = init_total_workflow(config) + assert result == expected + + +def test_prepend_newrun_job(): + config = { + "general": { + "workflow": { + "first_task_in_queue": "cluster1", + "last_task_in_queue": "cluster2", + "clusters": { + "cluster1": { + "submission_type": "OtherTask", + "called_from": None, + "next_submit": [], + }, + "cluster2": { + "submission_type": "OtherTask", + "called_from": None, + "next_submit": ["cluster1"], + }, + }, + "jobs": {}, + } + } + } + + expected_config = { + "general": { + "workflow": { + "first_task_in_queue": "newrun", + "last_task_in_queue": "cluster2", + "clusters": { + "cluster1": { + "submission_type": "OtherTask", + "called_from": "newrun", + "next_submit": [], + }, + "cluster2": { + "submission_type": "OtherTask", + "called_from": None, + "next_submit": ["newrun"], + }, + "newrun": { + "called_from": "cluster2", + "run_before": "cluster1", + "next_submit": ["cluster1"], + "jobs": ["newrun_general"], + "submission_type": "sim_object", + }, + }, + "jobs": { + "newrun_general": { + "nproc": 1, + "called_from": "cluster2", + "run_before": "cluster1", + "next_submit": ["cluster1"], + "job_cluster": "newrun", + } + }, + } + } + } + + result = prepend_newrun_job(config) + assert result == expected_config + + +def test_prepend_newrun_job_no_change(): + 
+def test_prepend_newrun_job_no_change():
+    config = {
+        "general": {
+            "workflow": {
+                "first_task_in_queue": "cluster1",
+                "last_task_in_queue": "cluster2",
+                "clusters": {
+                    "cluster1": {
+                        "submission_type": "sim_object",
+                        "called_from": None,
+                        "next_submit": [],
+                    },
+                    "cluster2": {
+                        "submission_type": "OtherTask",
+                        "called_from": None,
+                        "next_submit": ["cluster1"],
+                    },
+                },
+                "jobs": {},
+            }
+        }
+    }
+
+    # Deep copy so the comparison is meaningful even if the function mutates
+    # nested dicts in place (a shallow copy would share them).
+    expected_config = copy.deepcopy(config)
+
+    result = prepend_newrun_job(config)
+    assert result == expected_config
+
+
+def test_merge_single_entry_if_possible_entry_in_source_not_in_target():
+    sourceconf = {"key1": "value1"}
+    targetconf = {}
+    entry = "key1"
+    result = merge_single_entry_if_possible(entry, sourceconf, targetconf)
+    assert result == {"key1": "value1"}
+
+
+def test_merge_single_entry_if_possible_entry_in_both_matching():
+    sourceconf = {"key1": "value1"}
+    targetconf = {"key1": "value1"}
+    entry = "key1"
+    result = merge_single_entry_if_possible(entry, sourceconf, targetconf)
+    assert result == {"key1": "value1"}
+
+
+def test_merge_single_entry_if_possible_entry_in_both_mismatching():
+    sourceconf = {"key1": "value1"}
+    targetconf = {"key1": "value2"}
+    entry = "key1"
+    with pytest.raises(WorkflowError):
+        merge_single_entry_if_possible(entry, sourceconf, targetconf)
+
+
+def test_merge_if_possible_no_conflict():
+    source = {"a": 1, "b": 2}
+    target = {"c": 3, "d": 4}
+    expected = {"c": 3, "d": 4, "a": 1, "b": 2}
+    result = merge_if_possible(source, target)
+    assert result == expected
+
+
+def test_merge_if_possible_with_conflict():
+    source = {"a": 1, "b": 2}
+    target = {"a": 2, "c": 3}
+    with pytest.raises(WorkflowError):
+        merge_if_possible(source, target)
+
+
+def test_merge_if_possible_partial_conflict():
+    source = {"a": 1, "b": 2}
+    target = {"a": 1, "c": 3}
+    expected = {"a": 1, "c": 3, "b": 2}
+    result = merge_if_possible(source, target)
+    assert result == expected
+
+
+@pytest.fixture
+def sample_config():
+    return {
+        "model1": {
+            "workflow": {
+                "clusters": {
+                    "cluster1": {"key1": "value1"},
+                },
+                "jobs": {
+                    "job1": {"run_after": "job2", "run_before": "job3"},
+                },
+                "next_run_triggered_by": "trigger1",
+            }
+        },
+        "general": {
+            "workflow": {
+                "clusters": {
+                    "cluster1": {"key1": "general_value1"},
+                },
+                "jobs": {
+                    "job2": {},
+                    "job3": {},
+                },
+                "next_run_triggered_by": "tidy",
+            }
+        },
+    }
+
+
+def test_collect_all_workflow_information(sample_config):
+    config = copy.deepcopy(sample_config)
+    updated_config = collect_all_workflow_information(config)
+
+    # Test clusters merging
+    assert (
+        updated_config["general"]["workflow"]["clusters"]["cluster1"]["key1"]
+        == "value1"
+    )
+
+    # Test jobs renaming and copying
+    assert "job1_model1" in updated_config["general"]["workflow"]["jobs"]
+    assert "job1" not in updated_config["general"]["workflow"]["jobs"]
+
+    # Test run_after and run_before renaming
+    assert (
+        updated_config["general"]["workflow"]["jobs"]["job1_model1"]["run_after"]
+        == "job2"
+    )
+    assert (
+        updated_config["general"]["workflow"]["jobs"]["job1_model1"]["run_before"]
+        == "job3"
+    )
+
+    # Test job_cluster assignment
+    assert (
+        updated_config["general"]["workflow"]["jobs"]["job1_model1"]["job_cluster"]
+        == "job1"
+    )
+
+    # Test next_run_triggered_by
+    assert updated_config["general"]["workflow"]["next_run_triggered_by"] == "trigger1"
+
+
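+# A conflicting next_run_triggered_by between a model and the general section
+# should surface as a WorkflowError, matching the merge helpers above (the
+# old sys.exit(1) escape hatch is removed in this patch).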
"mismatch_trigger" + + with pytest.raises(SystemExit): + collect_all_workflow_information(config) + + +def test_order_clusters_minimal_config(): + config = { + "general": { + "workflow": { + "clusters": {}, + "jobs": {}, + "first_task_in_queue": "first_task", + "last_task_in_queue": "last_task", + } + } + } + expected = { + "general": { + "workflow": { + "clusters": {}, + "jobs": {}, + "first_task_in_queue": "first_task", + "last_task_in_queue": "last_task", + } + } + } + result = order_clusters(config) + assert result == expected + + +def test_order_clusters_with_run_after(): + config = { + "general": { + "workflow": { + "clusters": { + "cluster1": {"run_after": "cluster2"}, + "cluster2": {}, + }, + "jobs": {}, + "first_task_in_queue": "cluster2", + "last_task_in_queue": "cluster1", + } + } + } + expected = { + "general": { + "workflow": { + "clusters": { + "cluster1": { + "run_after": "cluster2", + "called_from": "cluster2", + "next_submit": [], + }, + "cluster2": {"next_submit": ["cluster1"]}, + }, + "jobs": {}, + "first_task_in_queue": "cluster2", + "last_task_in_queue": "cluster1", + } + } + } + result = order_clusters(config) + assert result == expected + + +def test_order_clusters_with_run_before(): + config = { + "general": { + "workflow": { + "clusters": { + "cluster1": {"run_before": "cluster2"}, + "cluster2": {}, + }, + "jobs": {}, + "first_task_in_queue": "cluster1", + "last_task_in_queue": "cluster2", + } + } + } + expected = { + "general": { + "workflow": { + "clusters": { + "cluster1": {"run_before": "cluster2", "next_submit": ["cluster2"]}, + "cluster2": {"called_from": "cluster1"}, + }, + "jobs": {}, + "first_task_in_queue": "cluster1", + "last_task_in_queue": "cluster2", + } + } + } + result = order_clusters(config) + assert result == expected + + +def test_order_clusters_with_run_after_and_run_before(): + config = { + "general": { + "workflow": { + "clusters": { + "cluster1": {"run_after": "cluster2"}, + "cluster2": {"run_before": "cluster3"}, + "cluster3": {}, + }, + "jobs": {}, + "first_task_in_queue": "cluster2", + "last_task_in_queue": "cluster1", + } + } + } + expected = { + "general": { + "workflow": { + "clusters": { + "cluster1": { + "run_after": "cluster2", + "called_from": "cluster2", + "next_submit": [], + }, + "cluster2": { + "run_before": "cluster3", + "next_submit": ["cluster1", "cluster3"], + }, + "cluster3": {"called_from": "cluster2"}, + }, + "jobs": {}, + "first_task_in_queue": "cluster2", + "last_task_in_queue": "cluster1", + } + } + } + result = order_clusters(config) + assert result == expected + + +def test_complete_clusters(): + config = { + "general": { + "workflow": { + "jobs": { + "newexp": { + "job_cluster": "cluster0", + "nproc": 1, + "submit_to_batch_system": False, + "submission_type": "sim_object", + }, + "prepexp": { + "job_cluster": "cluster0", + "nproc": 1, + "submit_to_batch_system": False, + "submission_type": "sim_object", + }, + "prepcompute": { + "job_cluster": "cluster0", + "nproc": 1, + "submit_to_batch_system": False, + "submission_type": "sim_object", + }, + "write_next_cluster": { + "job_cluster": "cluster0", + "nproc": 1, + "submit_to_batch_system": False, + "submission_type": "sim_object", + }, + "compute": { + "job_cluster": "cluster1", + "nproc": 288, + "submit_to_batch_system": True, + "submission_type": "batch", + }, + "tidy": { + "job_cluster": "cluster1", + "nproc": 1, + "submit_to_batch_system": True, + "submission_type": "sim_object", + }, + "prepcompute": { + "job_cluster": "cluster1", + "nproc": 1, + 
"submit_to_batch_system": False, + "submission_type": "sim_object", + }, + }, + "clusters": {}, + } + } + } + + expected_config = { + "general": { + "workflow": { + "jobs": { + "job1": { + "job_cluster": "cluster1", + "nproc": 2, + "submit_to_batch_system": True, + "script": True, + }, + "job2": { + "job_cluster": "cluster1", + "nproc": 4, + "submit_to_batch_system": False, + "script": True, + }, + "job3": { + "job_cluster": "cluster2", + "nproc": 1, + "submit_to_batch_system": False, + "script": False, + }, + }, + "clusters": { + "cluster1": { + "jobs": ["job1", "job2"], + "submission_type": "batch", + "submit_to_batch_system": True, + "order_in_cluster": "sequential", + "nproc": 4, + }, + "cluster2": { + "jobs": ["job3"], + "submission_type": "sim_object", + "submit_to_batch_system": False, + "order_in_cluster": "sequential", + "nproc": 1, + }, + }, + } + } + } + + result = complete_clusters(config) + assert result == expected_config