From 89a9f0bad82b3920bd5a6d28588758cfcf66ee07 Mon Sep 17 00:00:00 2001
From: Timothy Willard <9395586+TimothyWillard@users.noreply.github.com>
Date: Fri, 3 Jan 2025 11:34:17 -0500
Subject: [PATCH] Change `max_workers` to 1

Attempt to diagnose CI failures by forcing `max_workers=1`. Seed NumPy's
RNG from the simulation id in `onerun_delayframe_outcomes` and
`onerun_SEIR` so worker processes do not reuse identical inherited RNG
state, pin the 'spawn' start method in the parameter reinitialization
test, and add tests that exercise `run_parallel_SEIR` and
`run_parallel_outcomes` under every available multiprocessing start
method.
---
 flepimop/gempyor_pkg/src/gempyor/outcomes.py  |   2 +
 flepimop/gempyor_pkg/src/gempyor/seir.py      |   3 +-
 .../outcomes/test_run_parallel_outcomes.py    | 168 ++++++++++++++++++
 .../tests/parameters/test_parameters_class.py |  33 ++--
 .../tests/seir/test_run_parallel_SEIR.py      | 143 +++++++++++++++
 5 files changed, 334 insertions(+), 15 deletions(-)
 create mode 100644 flepimop/gempyor_pkg/tests/outcomes/test_run_parallel_outcomes.py
 create mode 100644 flepimop/gempyor_pkg/tests/seir/test_run_parallel_SEIR.py

diff --git a/flepimop/gempyor_pkg/src/gempyor/outcomes.py b/flepimop/gempyor_pkg/src/gempyor/outcomes.py
index f0c60ca1c..d57765960 100644
--- a/flepimop/gempyor_pkg/src/gempyor/outcomes.py
+++ b/flepimop/gempyor_pkg/src/gempyor/outcomes.py
@@ -91,6 +91,8 @@ def onerun_delayframe_outcomes(
     load_ID: bool = False,
     sim_id2load: int = None,
 ):
+    np.random.seed(seed=sim_id2write)
+
     with Timer("buildOutcome.structure"):
         parameters = read_parameters_from_config(modinf)
 
diff --git a/flepimop/gempyor_pkg/src/gempyor/seir.py b/flepimop/gempyor_pkg/src/gempyor/seir.py
index 77b303f2b..d4e8cb929 100644
--- a/flepimop/gempyor_pkg/src/gempyor/seir.py
+++ b/flepimop/gempyor_pkg/src/gempyor/seir.py
@@ -1,5 +1,6 @@
 import itertools
 import logging
+import random
 import time
 
 import numpy as np
@@ -257,7 +258,7 @@ def onerun_SEIR(
     sim_id2load: int = None,
     config=None,
 ):
-    np.random.seed()
+    np.random.seed(seed=sim_id2write)
     modinf.parameters.reinitialize_distributions()
 
     npi = None
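
A note on why the seeding change above matters: under the 'fork' start method every
worker process inherits the parent's NumPy RNG state, so unseeded draws can repeat
across slots, while seeding from the simulation id gives each slot a distinct,
reproducible stream. A minimal sketch of both behaviors, independent of gempyor
(the helper names are illustrative, and 'fork' is POSIX-only):

    import multiprocessing as mp

    import numpy as np


    def draw_unseeded(_sim_id: int) -> float:
        # Under 'fork' each worker starts from the same inherited RNG
        # state, so first draws coincide across workers.
        return float(np.random.uniform())


    def draw_seeded(sim_id: int) -> float:
        # Mirrors the patch: seed from the simulation id for distinct,
        # reproducible per-slot draws.
        np.random.seed(seed=sim_id)
        return float(np.random.uniform())


    if __name__ == "__main__":
        ctx = mp.get_context("fork")  # not available on Windows
        with ctx.Pool(processes=2) as pool:
            print(pool.map(draw_unseeded, range(4)))  # repeated values across workers
            print(pool.map(draw_seeded, range(4)))  # four distinct values
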
diff --git a/flepimop/gempyor_pkg/tests/outcomes/test_run_parallel_outcomes.py b/flepimop/gempyor_pkg/tests/outcomes/test_run_parallel_outcomes.py
new file mode 100644
index 000000000..4c7d0b169
--- /dev/null
+++ b/flepimop/gempyor_pkg/tests/outcomes/test_run_parallel_outcomes.py
@@ -0,0 +1,168 @@
+import multiprocessing as mp
+import os
+from pathlib import Path
+import shutil
+import subprocess
+
+import pandas as pd
+import pytest
+
+
+@pytest.fixture
+def setup_sample_2pop_vaccine_scenarios(tmp_path: Path) -> Path:
+    tutorials_path = Path(os.path.dirname(__file__) + "/../../../../examples/tutorials")
+    for file in (
+        "config_sample_2pop_vaccine_scenarios.yml",
+        "model_input/geodata_sample_2pop.csv",
+        "model_input/mobility_sample_2pop.csv",
+        "model_input/ic_2pop.csv",
+    ):
+        source = tutorials_path / file
+        destination = tmp_path / file
+        destination.parent.mkdir(parents=True, exist_ok=True)
+        shutil.copy(source, destination)
+    return tmp_path
+
+
+@pytest.mark.parametrize("n_jobs", (1, 2))
+@pytest.mark.parametrize("start_method", mp.get_all_start_methods())
+def test_run_parallel_outcomes_by_multiprocessing_start_method(
+    monkeypatch: pytest.MonkeyPatch,
+    setup_sample_2pop_vaccine_scenarios: Path,
+    n_jobs: int,
+    start_method: str,
+) -> None:
+    """
+    Test the parallelization of `run_parallel_outcomes` by multiprocessing start method.
+
+    This test:
+
+    1. Sets up the test environment by copying the necessary files to a temporary directory.
+    2. Runs a pared-down version of `gempyor.simulate.simulate` in a new process.
+    3. Reads the contents of the 'hpar' directory as a DataFrame.
+    4. Tests the contents of the 'hpar' DataFrame.
+
+    The reason for the new process is to control the start method used by multiprocessing.
+    The `run_parallel_outcomes` function behaves differently depending on the start method
+    used. Under the hood `tqdm.contrib.concurrent.process_map` creates a
+    `concurrent.futures.ProcessPoolExecutor` with the default start method (see
+    [tqdm/tqdm#1265](https://github.com/tqdm/tqdm/pull/1265)), which is 'spawn' on
+    macOS/Windows and 'fork' on Linux. The workaround is to force multiprocessing to
+    use the desired start method by setting it in the '__main__' module with
+    [`multiprocessing.set_start_method`](https://docs.python.org/3.11/library/multiprocessing.html#multiprocessing.set_start_method).
+    """
+    # Test setup
+    monkeypatch.chdir(setup_sample_2pop_vaccine_scenarios)
+
+    # Run a pared-down version of `gempyor.simulate.simulate` in a new process
+    test_python_script = setup_sample_2pop_vaccine_scenarios / "test.py"
+    with open(test_python_script, "w") as f:
+        f.write(
+            f"""
+import multiprocessing as mp
+import os
+from pathlib import Path
+
+
+from gempyor.model_info import ModelInfo
+from gempyor.outcomes import run_parallel_outcomes
+from gempyor.seir import run_parallel_SEIR
+from gempyor.shared_cli import parse_config_files
+
+def main():
+    setup_sample_2pop_vaccine_scenarios = Path("{setup_sample_2pop_vaccine_scenarios}")
+
+    cfg = parse_config_files(
+        config_filepath=setup_sample_2pop_vaccine_scenarios
+        / "config_sample_2pop_vaccine_scenarios.yml",
+        id_run_id=None,
+        out_run_id=None,
+        seir_modifiers_scenarios=[],
+        outcome_modifiers_scenarios=[],
+        in_prefix=None,
+        nslots=None,
+        jobs={n_jobs},
+        write_csv=False,
+        write_parquet=True,
+        first_sim_index=1,
+        stoch_traj_flag=False,
+        verbose=True,
+    )
+
+    seir_modifiers_scenario, outcome_modifiers_scenario = "no_vax", None
+    nchains = cfg["nslots"].as_number()
+    assert nchains == 10
+
+    modinf = ModelInfo(
+        config=cfg,
+        nslots=nchains,
+        seir_modifiers_scenario=seir_modifiers_scenario,
+        outcome_modifiers_scenario=outcome_modifiers_scenario,
+        write_csv=cfg["write_csv"].get(bool),
+        write_parquet=cfg["write_parquet"].get(bool),
+        first_sim_index=cfg["first_sim_index"].get(int),
+        in_run_id=cfg["in_run_id"].get(str) if cfg["in_run_id"].exists() else None,
+        out_run_id=cfg["out_run_id"].get(str) if cfg["out_run_id"].exists() else None,
+        stoch_traj_flag=cfg["stoch_traj_flag"].get(bool),
+        config_filepath=cfg["config_src"].as_str_seq(),
+    )
+
+    # `gempyor.outcomes.run_parallel_outcomes` expects SEIR files
+    assert run_parallel_SEIR(modinf, config=cfg, n_jobs=cfg["jobs"].get(int)) is None
+
+    assert run_parallel_outcomes(
+        sim_id2write=cfg["first_sim_index"].get(int),
+        modinf=modinf,
+        nslots=nchains,
+        n_jobs=cfg["jobs"].get(int),
+    ) == 1
+
+if __name__ == "__main__":
+    os.chdir("{setup_sample_2pop_vaccine_scenarios}")
+    mp.set_start_method("{start_method}", force=True)
+    main()
+"""
+        )
+
+    python = shutil.which("python")
+    assert python is not None
+    proc = subprocess.run([python, test_python_script], capture_output=True, check=False)
+    assert (
+        proc.returncode == 0
+    ), f"Test script failed with return code {proc.returncode}: {proc.stderr.decode()}."
+
+    # Get the contents of the 'hpar' directory as a DataFrame
+    hpar_directory: Path | None = None
+    for p in setup_sample_2pop_vaccine_scenarios.rglob("*"):
+        if p.is_dir() and p.name == "hpar":
+            hpar_directory = p
+        if hpar_directory is not None:
+            break
+
+    def read_directory(directory: Path) -> pd.DataFrame:
+        dfs: list[pd.DataFrame] | pd.DataFrame = []
+        for i, f in enumerate(sorted(directory.glob("*.parquet"))):
+            dfs.append(pd.read_parquet(f))
+            dfs[-1]["slot"] = i
+        dfs = pd.concat(dfs)
+        return dfs
+
+    hpar = read_directory(hpar_directory)
+
+    # Test the contents of the 'hpar' DataFrame
+    assert (
+        hpar[
+            (hpar["subpop"] == "large_province")
+            & (hpar["quantity"] == "probability")
+            & (hpar["outcome"] == "incidCase")
+        ]["value"].nunique()
+        == 10
+    )
+    assert (
+        hpar[
+            (hpar["subpop"] == "small_province")
+            & (hpar["quantity"] == "probability")
+            & (hpar["outcome"] == "incidCase")
+        ]["value"].nunique()
+        == 10
+    )
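
The start-method coupling described in the docstring above is easy to see in
isolation: a `concurrent.futures.ProcessPoolExecutor` created without an explicit
`mp_context` (which is what `tqdm.contrib.concurrent.process_map` does) picks up
whatever default was last set in '__main__'. A self-contained sketch, assuming
`multiprocessing.get_start_method` in a worker reports the method that worker was
created with:

    import multiprocessing as mp
    from concurrent.futures import ProcessPoolExecutor


    def worker_start_method(_: int) -> str:
        # Report the start method used to create this worker process.
        return mp.get_start_method()


    if __name__ == "__main__":
        # Same trick as the generated test script: set the default start
        # method before any pool or executor is created.
        mp.set_start_method("spawn", force=True)
        with ProcessPoolExecutor(max_workers=2) as ex:
            assert set(ex.map(worker_start_method, range(4))) == {"spawn"}
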
diff --git a/flepimop/gempyor_pkg/tests/parameters/test_parameters_class.py b/flepimop/gempyor_pkg/tests/parameters/test_parameters_class.py
index 663b56e6a..c281dfbb2 100644
--- a/flepimop/gempyor_pkg/tests/parameters/test_parameters_class.py
+++ b/flepimop/gempyor_pkg/tests/parameters/test_parameters_class.py
@@ -1,6 +1,7 @@
 from datetime import date
 from functools import partial
 from itertools import repeat
+import multiprocessing as mp
 import pathlib
 from tempfile import NamedTemporaryFile
 from typing import Any, Callable
@@ -717,32 +718,36 @@ def test_parameters_reduce(self) -> None:
         pass
 
     def test_reinitialize_parameters(self, tmp_path: pathlib.Path) -> None:
+        from concurrent.futures import ProcessPoolExecutor
+
         mock_inputs = distribution_three_valid_parameter_factory(tmp_path)
         np.random.seed(123)
         params = mock_inputs.create_parameters_instance()
 
-        results = tqdm.contrib.concurrent.process_map(
-            sample_params,
-            repeat(params, times=6),
-            repeat(False, times=6),
-            max_workers=2,
-            disable=True,
-        )
+        with ProcessPoolExecutor(max_workers=2, mp_context=mp.get_context("spawn")) as ex:
+            results = list(
+                ex.map(
+                    sample_params,
+                    repeat(params, times=6),
+                    repeat(False, times=6),
+                )
+            )
 
         for i in range(1, len(results)):
            assert np.allclose(results[i - 1], results[i])
 
         np.random.seed(123)
-        results_with_reinit = tqdm.contrib.concurrent.process_map(
-            sample_params,
-            repeat(params, times=6),
-            repeat(True, times=6),
-            max_workers=2,
-            disable=True,
-        )
+        with ProcessPoolExecutor(max_workers=2, mp_context=mp.get_context("spawn")) as ex:
+            results_with_reinit = list(
+                ex.map(
+                    sample_params,
+                    repeat(params, times=6),
+                    repeat(True, times=6),
+                )
+            )
 
         for i in range(1, len(results_with_reinit)):
             assert not np.allclose(results_with_reinit[i - 1], results_with_reinit[i])
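
Unlike the generated-script approach in the new tests, this diff pins the start
method locally: an explicit `mp_context` scopes 'spawn' to the one executor without
touching the process-global default. The same pattern in miniature, where `work` is
a hypothetical stand-in for `sample_params`:

    import multiprocessing as mp
    from concurrent.futures import ProcessPoolExecutor
    from itertools import repeat


    def work(x: int, reinit: bool) -> int:
        # Hypothetical stand-in for sample_params(params, reinit).
        return -x if reinit else x


    if __name__ == "__main__":
        # mp_context pins 'spawn' for this executor only, regardless of
        # the platform's default start method.
        spawn_ctx = mp.get_context("spawn")
        with ProcessPoolExecutor(max_workers=2, mp_context=spawn_ctx) as ex:
            print(list(ex.map(work, range(6), repeat(False, times=6))))
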
diff --git a/flepimop/gempyor_pkg/tests/seir/test_run_parallel_SEIR.py b/flepimop/gempyor_pkg/tests/seir/test_run_parallel_SEIR.py
new file mode 100644
index 000000000..2b548325f
--- /dev/null
+++ b/flepimop/gempyor_pkg/tests/seir/test_run_parallel_SEIR.py
@@ -0,0 +1,143 @@
+import multiprocessing as mp
+import os
+from pathlib import Path
+import shutil
+import subprocess
+
+import pandas as pd
+import pytest
+
+
+@pytest.fixture
+def setup_sample_2pop_vaccine_scenarios(tmp_path: Path) -> Path:
+    tutorials_path = Path(os.path.dirname(__file__) + "/../../../../examples/tutorials")
+    for file in (
+        "config_sample_2pop_vaccine_scenarios.yml",
+        "model_input/geodata_sample_2pop.csv",
+        "model_input/mobility_sample_2pop.csv",
+        "model_input/ic_2pop.csv",
+    ):
+        source = tutorials_path / file
+        destination = tmp_path / file
+        destination.parent.mkdir(parents=True, exist_ok=True)
+        shutil.copy(source, destination)
+    return tmp_path
+
+
+@pytest.mark.parametrize("n_jobs", (1, 2))
+@pytest.mark.parametrize("start_method", mp.get_all_start_methods())
+def test_run_parallel_SEIR_by_multiprocessing_start_method(
+    monkeypatch: pytest.MonkeyPatch,
+    setup_sample_2pop_vaccine_scenarios: Path,
+    n_jobs: int,
+    start_method: str,
+) -> None:
+    """
+    Test the parallelization of `run_parallel_SEIR` by multiprocessing start method.
+
+    This test:
+
+    1. Sets up the test environment by copying the necessary files to a temporary directory.
+    2. Runs a pared-down version of `gempyor.simulate.simulate` in a new process.
+    3. Reads the contents of the 'spar' directory as a DataFrame.
+    4. Tests the contents of the 'spar' DataFrame.
+
+    The reason for the new process is to control the start method used by multiprocessing.
+    The `run_parallel_SEIR` function behaves differently depending on the start method used.
+    Under the hood `tqdm.contrib.concurrent.process_map` creates a
+    `concurrent.futures.ProcessPoolExecutor` with the default start method (see
+    [tqdm/tqdm#1265](https://github.com/tqdm/tqdm/pull/1265)), which is 'spawn' on
+    macOS/Windows and 'fork' on Linux. The workaround is to force multiprocessing to
+    use the desired start method by setting it in the '__main__' module with
+    [`multiprocessing.set_start_method`](https://docs.python.org/3.11/library/multiprocessing.html#multiprocessing.set_start_method).
+    """
+    # Test setup
+    monkeypatch.chdir(setup_sample_2pop_vaccine_scenarios)
+
+    # Run a pared-down version of `gempyor.simulate.simulate` in a new process
+    test_python_script = setup_sample_2pop_vaccine_scenarios / "test.py"
+    with open(test_python_script, "w") as f:
+        f.write(
+            f"""
+import multiprocessing as mp
+import os
+from pathlib import Path
+
+from gempyor.model_info import ModelInfo
+from gempyor.seir import run_parallel_SEIR
+from gempyor.shared_cli import parse_config_files
+
+def main():
+    setup_sample_2pop_vaccine_scenarios = Path("{setup_sample_2pop_vaccine_scenarios}")
+
+    cfg = parse_config_files(
+        config_filepath=setup_sample_2pop_vaccine_scenarios
+        / "config_sample_2pop_vaccine_scenarios.yml",
+        id_run_id=None,
+        out_run_id=None,
+        seir_modifiers_scenarios=[],
+        outcome_modifiers_scenarios=[],
+        in_prefix=None,
+        nslots=None,
+        jobs={n_jobs},
+        write_csv=False,
+        write_parquet=True,
+        first_sim_index=1,
+        stoch_traj_flag=False,
+        verbose=True,
+    )
+
+    seir_modifiers_scenario, outcome_modifiers_scenario = "no_vax", None
+    nchains = cfg["nslots"].as_number()
+    assert nchains == 10
+
+    modinf = ModelInfo(
+        config=cfg,
+        nslots=nchains,
+        seir_modifiers_scenario=seir_modifiers_scenario,
+        outcome_modifiers_scenario=outcome_modifiers_scenario,
+        write_csv=cfg["write_csv"].get(bool),
+        write_parquet=cfg["write_parquet"].get(bool),
+        first_sim_index=cfg["first_sim_index"].get(int),
+        in_run_id=cfg["in_run_id"].get(str) if cfg["in_run_id"].exists() else None,
+        out_run_id=cfg["out_run_id"].get(str) if cfg["out_run_id"].exists() else None,
+        stoch_traj_flag=cfg["stoch_traj_flag"].get(bool),
+        config_filepath=cfg["config_src"].as_str_seq(),
+    )
+
+    assert run_parallel_SEIR(modinf, config=cfg, n_jobs=cfg["jobs"].get(int)) is None
+
+if __name__ == "__main__":
+    os.chdir("{setup_sample_2pop_vaccine_scenarios}")
+    mp.set_start_method("{start_method}", force=True)
+    main()
+"""
+        )
+
+    python = shutil.which("python")
+    assert python is not None
+    proc = subprocess.run([python, test_python_script], capture_output=True, check=False)
+    assert (
+        proc.returncode == 0
+    ), f"Test script failed with return code {proc.returncode}: {proc.stderr.decode()}."
+
+    # Get the contents of the 'spar' directory as a DataFrame
+    spar_directory: Path | None = None
+    for p in setup_sample_2pop_vaccine_scenarios.rglob("*"):
+        if p.is_dir() and p.name == "spar":
+            spar_directory = p
+        if spar_directory is not None:
+            break
+
+    def read_directory(directory: Path) -> pd.DataFrame:
+        dfs: list[pd.DataFrame] | pd.DataFrame = []
+        for i, f in enumerate(sorted(directory.glob("*.parquet"))):
+            dfs.append(pd.read_parquet(f))
+            dfs[-1]["slot"] = i
+        dfs = pd.concat(dfs)
+        return dfs
+
+    spar = read_directory(spar_directory)
+
+    # Test the contents of the 'spar' DataFrame
+    assert spar[spar["parameter"] == "Ro"]["value"].nunique() == 10
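
For reference, the invariant that final assertion encodes: with `nslots == 10` and
per-simulation-id seeding, every slot draws its own 'Ro' value, so the unique count
equals the slot count. A toy illustration with made-up numbers (not real gempyor
output):

    import pandas as pd

    # Toy stand-in for the concatenated 'spar' frames: one 'Ro' draw per slot.
    spar = pd.DataFrame(
        {
            "slot": list(range(10)),
            "parameter": ["Ro"] * 10,
            "value": [2.5 + 0.01 * i for i in range(10)],  # distinct per slot
        }
    )
    # Shared RNG state across slots (the bug guarded against) would make
    # draws collide and push nunique() below the slot count.
    assert spar[spar["parameter"] == "Ro"]["value"].nunique() == 10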