diff --git a/Pillar_II/esm/src/dummy_prune_analysis.py b/Pillar_II/esm/src/dummy_prune_analysis.py index 0f1b69b..98e151f 100644 --- a/Pillar_II/esm/src/dummy_prune_analysis.py +++ b/Pillar_II/esm/src/dummy_prune_analysis.py @@ -17,26 +17,32 @@ CHECK_FOR_PRUNING_SLEEP_TIME_SECS = 5 """How long should we sleep for""" +# TODO: remove me later +MEMBERS_PRUNED = [2] + logger = logging.getLogger(__name__) -@task(expid=IN) -def esm_analysis_prune(expid: str): +@on_failure(management='IGNORE', returns=0) +@task(expid=IN, member=IN, returns=bool, time_out=480) +def esm_analysis_prune(expid: str, member: int) -> bool: """This has to be called by PYCOMPSs as analysis. Args: expid: The experiment ID. + member: The ensemble member. + Returns: + bool: True if the ensemble member was pruned. """ # N.B.: importing this results in a network query to Hecuba servers, # which fails if the servers are not available. # TODO: Suvi: call the AI code here - member = int(expid.split('_')[1]) - if member != 2: - logging.info("We only prune the ensemble member #2!") - return + if member not in MEMBERS_PRUNED: + logging.info(f"We only prune the ensemble members #{str(MEMBERS_PRUNED)}!") + return False - logging.info("Pruning ensemble member #2!") + logging.info(f"Pruning ensemble member #{str(member)}!") logging.info("Importing Hecuba") from hecuba import StorageDict # type: ignore @@ -58,25 +64,28 @@ class MetaDictClass(StorageDict): # RuntimeError: StorageDict: missed specification. # Type of Primary Key or Column undefined - # This is an infinite-loop, with a sleep time. The execution # must be wrapped in an existing COMPSs or Slurm job, with a # walltime or some limit to control the maximum execution # time, and kill this task. + expid_member = f'{expid}_{str(member)}' while True: - logging.info(f"MetaDictClass.get_by_alias('{expid}')") + logging.info(f"MetaDictClass.get_by_alias('{expid_member}')") try: - mdc = MetaDictClass.get_by_alias(expid) + mdc = MetaDictClass.get_by_alias(expid_member) break except RuntimeError: - logging.info(f"Simulation {expid} not found sleeping +{CHECK_FOR_PRUNING_SLEEP_TIME_SECS} seconds...") + logging.info( + f"Simulation member {expid_member} not found sleeping +{CHECK_FOR_PRUNING_SLEEP_TIME_SECS} seconds...") sleep(CHECK_FOR_PRUNING_SLEEP_TIME_SECS) prune_sec = randint(10, 20) sleep(prune_sec) logging.info("Setting prune to TRUE!") mdc['prune'] = "true" - logging.info(f"Pruned {expid} after {prune_sec} seconds") + logging.info(f"Pruned member [{str(member)}] from experiment [{expid}] after {prune_sec} seconds") + + return True __all__ = ['esm_analysis_prune'] diff --git a/Pillar_II/esm/src/esm_simulation.py b/Pillar_II/esm/src/esm_simulation.py index 0554b24..ae2cb6f 100644 --- a/Pillar_II/esm/src/esm_simulation.py +++ b/Pillar_II/esm/src/esm_simulation.py @@ -193,6 +193,15 @@ def _run_esm(*, expid: str, model: str, prune: bool, config_parser: ConfigParser processes = runtime_config_parser['pycompss']['processes'] processes_per_node = runtime_config_parser['pycompss']['processes_per_node'] + # N.B.: Apparently giving all nodes to mpirun/FESOM2 results in resource + # starvation as COMPSs will be trying to run a task to prune as well. + # Thus, we always allocate one node less to leave it for COMPSs and + # Hecuba. + cores_in_one_node = int(processes_per_node) + cores_requested = int(processes) + if cores_requested > cores_in_one_node: + processes = str(cores_requested - cores_in_one_node) + # This is where we delegate the ESM execution to a model's module code # (e.g. FESOM2, AWICM3, etc.). ensemble_start_dates = runtime_config_parser['common']['ensemble_start_dates'].split(",") @@ -206,16 +215,14 @@ def _run_esm(*, expid: str, model: str, prune: bool, config_parser: ConfigParser member = str(member_idx) task_group = f"{expid}_{start_date}_{member}" + if prune: + esm_analysis_prune(expid, member_idx) + with TaskGroup(task_group, implicit_barrier=False): # Launch each SIM, create an implicit dependence by passing the result to the next task (checkpoint). number_simulations = int(runtime_config_parser['common']['chunks']) logger.info(f"Total of chunks configured: {number_simulations}") - if prune: - member_expid = f"{expid}_{member}" - logger.info(f"Starting the ESM analysis prune for {member_expid}...") - esm_analysis_prune(member_expid) - for sim in range(1, number_simulations + 1): working_dir_exe = top_working_dir / start_date / member log_file = str(working_dir_exe / f"fesom2_{expid}_{start_date}_{member}_{str(sim)}.out") diff --git a/Pillar_II/esm/src/fesom2/__init__.py b/Pillar_II/esm/src/fesom2/__init__.py index 3d8c5ab..f9d1308 100644 --- a/Pillar_II/esm/src/fesom2/__init__.py +++ b/Pillar_II/esm/src/fesom2/__init__.py @@ -324,7 +324,7 @@ def esm_simulation( StdIOStream: STDOUT }, working_dir_exe={ - Type: INOUT, + Type: IN, Prefix: "#" }, returns=int)