Skip to content

Commit

Permalink
wip: AI-aided refactor of workflow module, part 2
Browse files Browse the repository at this point in the history
  • Loading branch information
pgierz committed Jan 29, 2025
1 parent fffd387 commit 9cd35ae
Showing 1 changed file with 79 additions and 63 deletions.
142 changes: 79 additions & 63 deletions src/esm_runscripts/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,29 @@ def should_skip_cluster(cluster, config):


def assemble_workflow(config):
#
config = init_total_workflow(config)
config = collect_all_workflow_information(config)
config = complete_clusters(config)
config = order_clusters(config)
config = prepend_newrun_job(config)
config = handle_unknown_jobtype(config)
return config


def handle_unknown_jobtype(config):
"""
Update the jobtype in the configuration if it is set to 'unknown'.
Args:
config (dict): The configuration dictionary.
Returns:
dict: The updated configuration dictionary.
"""
if config["general"]["jobtype"] == "unknown":
config["general"]["command_line_config"]["jobtype"] = config["general"][
"workflow"
]["first_task_in_queue"]
config["general"]["jobtype"] = config["general"]["workflow"][
"first_task_in_queue"
]
first_task = config["general"]["workflow"]["first_task_in_queue"]
config["general"]["command_line_config"]["jobtype"] = first_task
config["general"]["jobtype"] = first_task

return config

Expand Down Expand Up @@ -132,77 +141,80 @@ def prepend_newrun_job(config):
def order_clusters(config):
gw_config = config["general"]["workflow"]

initialize_next_submit(gw_config)
validate_and_set_dependencies(gw_config)
handle_next_run_triggered_by(gw_config)
ensure_first_and_last_clusters_linked(gw_config)

return config


def initialize_next_submit(gw_config):
for subjob_cluster in gw_config["subjob_clusters"]:
if "next_submit" not in gw_config["subjob_clusters"][subjob_cluster]:
gw_config["subjob_clusters"][subjob_cluster]["next_submit"] = []


def validate_and_set_dependencies(gw_config):
for subjob_cluster in gw_config["subjob_clusters"]:
if "run_after" not in gw_config["subjob_clusters"][subjob_cluster]:
if not ("run_before" in gw_config["subjob_clusters"][subjob_cluster]):
cluster_config = gw_config["subjob_clusters"][subjob_cluster]
if "run_after" not in cluster_config and "run_before" not in cluster_config:
log_and_exit(
f"Don't know when to execute cluster {subjob_cluster}.", gw_config
)

logger.error(f"Don't know when to execute cluster {subjob_cluster}.")
logger.error(gw_config)
sys.exit(-1)
if "run_after" in cluster_config:
validate_run_after(cluster_config, subjob_cluster, gw_config)
if "run_before" in cluster_config:
validate_run_before(cluster_config, subjob_cluster, gw_config)

if "run_after" in gw_config["subjob_clusters"][subjob_cluster]:
if "run_before" in gw_config["subjob_clusters"][subjob_cluster]:
logger.error(
f"Specifying both run_after and run_before for cluster {subjob_cluster} may lead to problems."
)
logger.error(f"Please choose.")
sys.exit(-1)
if (
not gw_config["subjob_clusters"][subjob_cluster]["run_after"]
in gw_config["subjob_clusters"]
):
logger.error(
f"Unknown cluster {gw_config['subjob_clusters'][subjob_cluster]['run_after']}."
)
sys.exit(-1)

calling_cluster = gw_config["subjob_clusters"][subjob_cluster]["run_after"]
def validate_run_after(cluster_config, subjob_cluster, gw_config):
if "run_before" in cluster_config:
log_and_exit(
f"Specifying both run_after and run_before for cluster {subjob_cluster} may lead to problems. Please choose.",
gw_config,
)

if (
subjob_cluster
not in gw_config["subjob_clusters"][calling_cluster]["next_submit"]
):
gw_config["subjob_clusters"][calling_cluster]["next_submit"].append(
subjob_cluster
)
gw_config["subjob_clusters"][subjob_cluster][
"called_from"
] = calling_cluster

if calling_cluster == gw_config["last_task_in_queue"]:
gw_config["last_task_in_queue"] = subjob_cluster

if "run_before" in gw_config["subjob_clusters"][subjob_cluster]:
if (
not gw_config["subjob_clusters"][subjob_cluster]["run_before"]
in gw_config["subjob_clusters"]
):
logger.error(
f"Unknown cluster {gw_config['subjob_clusters'][subjob_cluster]['run_before']}."
)
sys.exit(-1)
calling_cluster = cluster_config["run_after"]
if calling_cluster not in gw_config["subjob_clusters"]:
log_and_exit(f"Unknown cluster {calling_cluster}.", gw_config)

called_cluster = gw_config["subjob_clusters"][subjob_cluster]["run_before"]
append_to_next_submit(gw_config, calling_cluster, subjob_cluster)
cluster_config["called_from"] = calling_cluster

if (
called_cluster
not in gw_config["subjob_clusters"][subjob_cluster]["next_submit"]
):
gw_config["subjob_clusters"][subjob_cluster]["next_submit"].append(
called_cluster
)
gw_config["subjob_clusters"][called_cluster]["called_from"] = subjob_cluster
if calling_cluster == gw_config["last_task_in_queue"]:
gw_config["last_task_in_queue"] = subjob_cluster


def validate_run_before(cluster_config, subjob_cluster, gw_config):
called_cluster = cluster_config["run_before"]
if called_cluster not in gw_config["subjob_clusters"]:
log_and_exit(f"Unknown cluster {called_cluster}.", gw_config)

append_to_next_submit(gw_config, subjob_cluster, called_cluster)
gw_config["subjob_clusters"][called_cluster]["called_from"] = subjob_cluster

if called_cluster == gw_config["first_task_in_queue"]:
gw_config["first_task_in_queue"] = subjob_cluster

if called_cluster == gw_config["first_task_in_queue"]:
gw_config["first_task_in_queue"] = subjob_cluster

def append_to_next_submit(gw_config, source_cluster, target_cluster):
if (
target_cluster
not in gw_config["subjob_clusters"][source_cluster]["next_submit"]
):
gw_config["subjob_clusters"][source_cluster]["next_submit"].append(
target_cluster
)


def handle_next_run_triggered_by(gw_config):
if "next_run_triggered_by" in gw_config:
gw_config["last_task_in_queue"] = gw_config["next_run_triggered_by"]


def ensure_first_and_last_clusters_linked(gw_config):
first_cluster_name = gw_config["first_task_in_queue"]
first_cluster = gw_config["subjob_clusters"][first_cluster_name]
last_cluster_name = gw_config["last_task_in_queue"]
Expand All @@ -213,7 +225,11 @@ def order_clusters(config):
if last_cluster_name not in first_cluster.get("called_from", ["Error"]):
first_cluster["called_from"] = last_cluster_name

return config

def log_and_exit(message, gw_config):
logger.error(message)
logger.error(gw_config)
sys.exit(-1)


# -----------------------------------------------------------------------------
Expand Down

0 comments on commit 9cd35ae

Please sign in to comment.