Skip to content

Commit

Permalink
Merge pull request #4679 from jedwards4b/refine_hidden_workflow_flag
Browse files Browse the repository at this point in the history
improve functionality of hidden workflow flag
  • Loading branch information
jedwards4b authored Sep 20, 2024
2 parents 0cc20bf + b05eada commit 2776043
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 51 deletions.
86 changes: 35 additions & 51 deletions CIME/XML/env_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@ def __init__(self, case_root=None, infile="env_batch.xml", read_only=False):
initialize an object interface to file env_batch.xml in the case directory
"""
self._batchtype = None
self._hidden_batch_script = {}
# This arbitrary setting should always be overwritten
self._default_walltime = "00:20:00"
schema = os.path.join(utils.get_schema_path(), "env_batch.xsd")
super(EnvBatch, self).__init__(
case_root, infile, schema=schema, read_only=read_only
)
self._batchtype = self.get_batch_system_type()
self._env_workflow = None

# pylint: disable=arguments-differ
def set_value(self, item, value, subgroup=None, ignore_type=False):
Expand Down Expand Up @@ -205,14 +205,16 @@ def set_batch_system(self, batchobj, batch_system_type=None):
lock_file(os.path.basename(batchobj.filename), self._caseroot)

def get_job_overrides(self, job, case):
env_workflow = case.get_env("workflow")
if not self._env_workflow:
self._env_workflow = case.get_env("workflow")
(
total_tasks,
num_nodes,
tasks_per_node,
thread_count,
ngpus_per_node,
) = env_workflow.get_job_specs(case, job)
) = self._env_workflow.get_job_specs(case, job)

overrides = {}

if total_tasks:
Expand Down Expand Up @@ -258,27 +260,12 @@ def make_batch_script(self, input_template, job, case, outfile=None):
subgroup=job,
overrides=overrides,
)
env_workflow = case.get_env("workflow")

hidden = env_workflow.get_value("hidden", subgroup=job)
# case.st_archive is not hidden for backward compatibility
if (
(job != "case.st_archive" and hidden is None)
or hidden == "True"
or hidden == "true"
):
self._hidden_batch_script[job] = True
else:
self._hidden_batch_script[job] = False
if not self._env_workflow:
self._env_workflow = case.get_env("workflow")

output_name = (
get_batch_script_for_job(
job,
hidden=(
self._hidden_batch_script[job]
if job in self._hidden_batch_script
else None
),
job, hidden=self._env_workflow.hidden_job(case, job)
)
if outfile is None
else outfile
Expand All @@ -299,8 +286,10 @@ def set_job_defaults(self, batch_jobs, case):

if self._batchtype == "none":
return
env_workflow = case.get_env("workflow")
known_jobs = env_workflow.get_jobs()

if not self._env_workflow:
self._env_workflow = case.get_env("workflow")
known_jobs = self._env_workflow.get_jobs()

for job, jsect in batch_jobs:
if job not in known_jobs:
Expand Down Expand Up @@ -457,11 +446,13 @@ def set_job_defaults(self, batch_jobs, case):
seconds = convert_to_seconds(walltime)
full_bab_time = convert_to_babylonian_time(seconds)
walltime = format_time(walltime_format, "%H:%M:%S", full_bab_time)
if not self._env_workflow:
self._env_workflow = case.get_env("workflow")

env_workflow.set_value(
self._env_workflow.set_value(
"JOB_QUEUE", self.text(queue), subgroup=job, ignore_type=False
)
env_workflow.set_value("JOB_WALLCLOCK_TIME", walltime, subgroup=job)
self._env_workflow.set_value("JOB_WALLCLOCK_TIME", walltime, subgroup=job)
logger.debug(
"Job {} queue {} walltime {}".format(job, self.text(queue), walltime)
)
Expand Down Expand Up @@ -764,20 +755,19 @@ def submit_jobs(
waiting to resubmit at the end of the first sequence
workflow is a logical indicating whether only "job" is submitted or the workflow sequence starting with "job" is submitted
"""
env_workflow = case.get_env("workflow")

external_workflow = case.get_value("EXTERNAL_WORKFLOW")
alljobs = env_workflow.get_jobs()
if not self._env_workflow:
self._env_workflow = case.get_env("workflow")
alljobs = self._env_workflow.get_jobs()
alljobs = [
j
for j in alljobs
if os.path.isfile(
os.path.join(
self._caseroot,
get_batch_script_for_job(
j,
hidden=self._hidden_batch_script[j]
if j in self._hidden_batch_script
else None,
j, hidden=self._env_workflow.hidden_job(case, j)
),
)
)
Expand All @@ -796,7 +786,9 @@ def submit_jobs(
if index < startindex:
continue
try:
prereq = env_workflow.get_value("prereq", subgroup=job, resolved=False)
prereq = self._env_workflow.get_value(
"prereq", subgroup=job, resolved=False
)
if (
external_workflow
or prereq is None
Expand All @@ -815,7 +807,9 @@ def submit_jobs(
),
)
if prereq:
jobs.append((job, env_workflow.get_value("dependency", subgroup=job)))
jobs.append(
(job, self._env_workflow.get_value("dependency", subgroup=job))
)

if self._batchtype == "cobalt":
break
Expand Down Expand Up @@ -1100,6 +1094,7 @@ def _submit_single_job(
set_continue_run=resubmit_immediate,
submit_resubmits=workflow and not resubmit_immediate,
)

if batch_system == "lsf" and not batch_env_flag:
sequence = (
run_args,
Expand All @@ -1108,11 +1103,7 @@ def _submit_single_job(
batchredirect,
get_batch_script_for_job(
job,
hidden=(
self._hidden_batch_script[job]
if job in self._hidden_batch_script
else None
),
hidden=self._env_workflow.hidden_job(case, job),
),
)
elif batch_env_flag:
Expand All @@ -1125,11 +1116,7 @@ def _submit_single_job(
self._caseroot,
get_batch_script_for_job(
job,
hidden=(
self._hidden_batch_script[job]
if job in self._hidden_batch_script
else None
),
hidden=self._env_workflow.hidden_job(case, job),
),
),
)
Expand All @@ -1142,11 +1129,7 @@ def _submit_single_job(
self._caseroot,
get_batch_script_for_job(
job,
hidden=(
self._hidden_batch_script[job]
if job in self._hidden_batch_script
else None
),
hidden=self._env_workflow.hidden_job(case, job),
),
),
run_args,
Expand Down Expand Up @@ -1439,12 +1422,13 @@ def compare_xml(self, other):

def make_all_batch_files(self, case):
machdir = case.get_value("MACHDIR")
env_workflow = case.get_env("workflow")
logger.info("Creating batch scripts")
jobs = env_workflow.get_jobs()
if not self._env_workflow:
self._env_workflow = case.get_env("workflow")
jobs = self._env_workflow.get_jobs()
for job in jobs:
template = case.get_resolved_value(
env_workflow.get_value("template", subgroup=job)
self._env_workflow.get_value("template", subgroup=job)
)
if os.path.isabs(template):
input_batch_script = template
Expand Down
12 changes: 12 additions & 0 deletions CIME/XML/env_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from CIME.XML.standard_module_setup import *
from CIME.XML.env_base import EnvBase
from CIME.utils import get_cime_root

import re, math

logger = logging.getLogger(__name__)
Expand All @@ -21,6 +22,7 @@ def __init__(self, case_root=None, infile="env_workflow.xml", read_only=False):
# schema = os.path.join(get_cime_root(), "CIME", "config", "xml_schemas", "env_workflow.xsd")
# TODO: define schema for this file
schema = None
self._hidden = {}
super(EnvWorkflow, self).__init__(
case_root, infile, schema=schema, read_only=read_only
)
Expand Down Expand Up @@ -89,7 +91,17 @@ def get_type_info(self, vid):
)
return type_info

def hidden_job(self, case, job):
if job not in self._hidden:
self.get_job_specs(case, job)
return self._hidden[job]

def get_job_specs(self, case, job):
hidden = self.get_value("hidden", subgroup=job)
self._hidden[job] = (hidden is None and job != "case.st_archive") or (
hidden is not None and hidden.lower() == "true"
)

task_count = case.get_resolved_value(self.get_value("task_count", subgroup=job))
tasks_per_node = case.get_resolved_value(
self.get_value("tasks_per_node", subgroup=job)
Expand Down

0 comments on commit 2776043

Please sign in to comment.