Skip to content

Commit

Permalink
Leave one node for COMPSs/Hecuba so that pruning works
Browse files Browse the repository at this point in the history
  • Loading branch information
kinow committed Feb 27, 2024
1 parent 7af473b commit 16b42db
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 18 deletions.
33 changes: 21 additions & 12 deletions Pillar_II/esm/src/dummy_prune_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,26 +17,32 @@
CHECK_FOR_PRUNING_SLEEP_TIME_SECS = 5
"""How long should we sleep for"""

# TODO: remove me later
MEMBERS_PRUNED = [2]

logger = logging.getLogger(__name__)


@task(expid=IN)
def esm_analysis_prune(expid: str):
@on_failure(management='IGNORE', returns=0)
@task(expid=IN, member=IN, returns=bool, time_out=480)
def esm_analysis_prune(expid: str, member: int) -> bool:
"""This has to be called by PYCOMPSs as analysis.
Args:
expid: The experiment ID.
member: The ensemble member.
Returns:
bool: True if the ensemble member was pruned.
"""
# N.B.: importing Hecuba (see the import further below) results in a network
# query to the Hecuba servers, which fails if the servers are not available.

# TODO: Suvi: call the AI code here
member = int(expid.split('_')[1])
if member != 2:
logging.info("We only prune the ensemble member #2!")
return
if member not in MEMBERS_PRUNED:
logging.info(f"We only prune the ensemble members #{str(MEMBERS_PRUNED)}!")
return False

logging.info("Pruning ensemble member #2!")
logging.info(f"Pruning ensemble member #{str(member)}!")

logging.info("Importing Hecuba")
from hecuba import StorageDict # type: ignore
Expand All @@ -58,25 +64,28 @@ class MetaDictClass(StorageDict):
# RuntimeError: StorageDict: missed specification.
# Type of Primary Key or Column undefined


# This loop retries indefinitely, sleeping between attempts, until the
# alias is found. The execution must therefore be wrapped in an existing
# COMPSs or Slurm job with a walltime (or some other limit) that bounds
# the maximum execution time and kills this task.
expid_member = f'{expid}_{str(member)}'
while True:
logging.info(f"MetaDictClass.get_by_alias('{expid}')")
logging.info(f"MetaDictClass.get_by_alias('{expid_member}')")
try:
mdc = MetaDictClass.get_by_alias(expid)
mdc = MetaDictClass.get_by_alias(expid_member)
break
except RuntimeError:
logging.info(f"Simulation {expid} not found sleeping +{CHECK_FOR_PRUNING_SLEEP_TIME_SECS} seconds...")
logging.info(
f"Simulation member {expid_member} not found sleeping +{CHECK_FOR_PRUNING_SLEEP_TIME_SECS} seconds...")
sleep(CHECK_FOR_PRUNING_SLEEP_TIME_SECS)

prune_sec = randint(10, 20)
sleep(prune_sec)
logging.info("Setting prune to TRUE!")
mdc['prune'] = "true"
logging.info(f"Pruned {expid} after {prune_sec} seconds")
logging.info(f"Pruned member [{str(member)}] from experiment [{expid}] after {prune_sec} seconds")

return True


__all__ = ['esm_analysis_prune']
17 changes: 12 additions & 5 deletions Pillar_II/esm/src/esm_simulation.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,15 @@ def _run_esm(*, expid: str, model: str, prune: bool, config_parser: ConfigParser
processes = runtime_config_parser['pycompss']['processes']
processes_per_node = runtime_config_parser['pycompss']['processes_per_node']

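# N.B.: Apparently giving all nodes to mpirun/FESOM2 results in resource
# starvation, as COMPSs will also be trying to run a task to prune.
# Thus, whenever more processes are requested than fit in one node, we
# subtract one node's worth of processes to leave that node for COMPSs
# and Hecuba. A request that fits in a single node is left unchanged.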
cores_in_one_node = int(processes_per_node)
cores_requested = int(processes)
if cores_requested > cores_in_one_node:
processes = str(cores_requested - cores_in_one_node)

# This is where we delegate the ESM execution to a model's module code
# (e.g. FESOM2, AWICM3, etc.).
ensemble_start_dates = runtime_config_parser['common']['ensemble_start_dates'].split(",")
Expand All @@ -206,16 +215,14 @@ def _run_esm(*, expid: str, model: str, prune: bool, config_parser: ConfigParser
member = str(member_idx)
task_group = f"{expid}_{start_date}_{member}"

if prune:
esm_analysis_prune(expid, member_idx)

with TaskGroup(task_group, implicit_barrier=False):
# Launch each SIM, create an implicit dependence by passing the result to the next task (checkpoint).
number_simulations = int(runtime_config_parser['common']['chunks'])
logger.info(f"Total of chunks configured: {number_simulations}")

if prune:
member_expid = f"{expid}_{member}"
logger.info(f"Starting the ESM analysis prune for {member_expid}...")
esm_analysis_prune(member_expid)

for sim in range(1, number_simulations + 1):
working_dir_exe = top_working_dir / start_date / member
log_file = str(working_dir_exe / f"fesom2_{expid}_{start_date}_{member}_{str(sim)}.out")
Expand Down
2 changes: 1 addition & 1 deletion Pillar_II/esm/src/fesom2/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ def esm_simulation(
StdIOStream: STDOUT
},
working_dir_exe={
Type: INOUT,
Type: IN,
Prefix: "#"
},
returns=int)
Expand Down

0 comments on commit 16b42db

Please sign in to comment.