Skip to content

Commit

Permalink
remove process.wait and determine quantity after spawning all runners
Browse files Browse the repository at this point in the history
  • Loading branch information
cbartz committed Jul 12, 2024
1 parent f8d8fb3 commit 597c1ba
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 29 deletions.
29 changes: 14 additions & 15 deletions src/reactive/runner_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,17 +73,17 @@ def reconcile(quantity: int, config: ReactiveRunnerConfig) -> int:
"""
actual_quantity = _determine_current_quantity()
logger.info("Actual quantity of reactive runner processes: %s", actual_quantity)
delta = quantity - actual_quantity
actual_delta = delta
actual_delta = delta = quantity - actual_quantity
if delta > 0:
logger.info("Will spawn %d new reactive runner processes", delta)
_setup_logging()
for _ in range(delta):
try:
_spawn_runner(config)
except ReactiveRunnerError:
except _SpawnError:
logger.exception("Failed to spawn a new reactive runner process")
actual_delta -= 1
actual_quantity_after_spawning = _determine_current_quantity()
actual_delta = actual_quantity_after_spawning - actual_quantity
elif delta < 0:
logger.info(
"%d reactive runner processes are running. "
Expand Down Expand Up @@ -125,14 +125,18 @@ def _setup_logging() -> None:
shutil.chown(REACTIVE_RUNNER_LOG_DIR, user=UBUNTU_USER, group=UBUNTU_USER)


class _SpawnError(Exception):
"""Raised when spawning a runner fails."""


def _spawn_runner(reactive_runner_config: ReactiveRunnerConfig) -> None:
"""Spawn a runner.
Args:
reactive_runner_config: The configuration for the reactive runner.
Raises:
ReactiveRunnerError: If the runner fails to spawn.
_SpawnError: If the runner fails to spawn.
"""
env = {
"PYTHONPATH": "src:lib:venv",
Expand Down Expand Up @@ -164,14 +168,9 @@ def _spawn_runner(reactive_runner_config: ReactiveRunnerConfig) -> None:
user=UBUNTU_USER,
)

try:
process.wait(0.001)
except subprocess.TimeoutExpired:
pass
else:
if process.returncode not in (0, None):
raise ReactiveRunnerError(
f"Failed to spawn a new reactive runner process with pid {process.pid}."
f" Return code: {process.returncode}"
)
if process.returncode not in (0, None):
raise _SpawnError(
f"Failed to spawn a new reactive runner process with pid {process.pid}."
f" Return code: {process.returncode}"
)
logger.debug("Spawned a new reactive runner process with pid %s", process.pid)
51 changes: 37 additions & 14 deletions tests/unit/reactive/test_runner_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,11 @@ def secure_run_subprocess_mock_fixture(monkeypatch: pytest.MonkeyPatch) -> Magic
@pytest.fixture(name="subprocess_popen_mock")
def subprocess_popen_mock_fixture(monkeypatch: pytest.MonkeyPatch) -> MagicMock:
"""Mock the subprocess.Popen function."""
popen_result = MagicMock(spec=subprocess.Popen, pid=1234)
popen_result = MagicMock(spec=subprocess.Popen, pid=1234, returncode=0)
subprocess_popen_mock = MagicMock(
spec=subprocess.Popen,
return_value=popen_result,
)
popen_result.wait.side_effect = subprocess.TimeoutExpired("cmd", 1)
monkeypatch.setattr("subprocess.Popen", subprocess_popen_mock)
return subprocess_popen_mock

Expand All @@ -69,7 +68,9 @@ def test_reconcile_spawns_runners(
"""
queue_name = secrets.token_hex(16)
reactive_config = ReactiveRunnerConfig(mq_uri=EXAMPLE_MQ_URI, queue_name=queue_name)
_arrange_reactive_processes(secure_run_subprocess_mock, count=2)
_arrange_reactive_processes(
secure_run_subprocess_mock, count_before_spawn=2, count_after_spawn=5
)

delta = reconcile(5, reactive_config)

Expand All @@ -88,7 +89,9 @@ def test_reconcile_does_not_spawn_runners(
"""
queue_name = secrets.token_hex(16)
reactive_config = ReactiveRunnerConfig(mq_uri=EXAMPLE_MQ_URI, queue_name=queue_name)
_arrange_reactive_processes(secure_run_subprocess_mock, count=2)
_arrange_reactive_processes(
secure_run_subprocess_mock, count_before_spawn=2, count_after_spawn=2
)

delta = reconcile(2, reactive_config)

Expand All @@ -106,7 +109,9 @@ def test_reconcile_does_not_spawn_runners_for_too_many_processes(
"""
queue_name = secrets.token_hex(16)
reactive_config = ReactiveRunnerConfig(mq_uri=EXAMPLE_MQ_URI, queue_name=queue_name)
_arrange_reactive_processes(secure_run_subprocess_mock, count=2)
_arrange_reactive_processes(
secure_run_subprocess_mock, count_before_spawn=2, count_after_spawn=2
)
delta = reconcile(1, reactive_config)

assert delta == 0
Expand Down Expand Up @@ -151,24 +156,42 @@ def test_reconcile_spawn_runner_failed(
MagicMock(returncode=1),
MagicMock(returncode=0),
]
_arrange_reactive_processes(secure_run_subprocess_mock, count=0)
_arrange_reactive_processes(
secure_run_subprocess_mock, count_before_spawn=0, count_after_spawn=2
)

delta = reconcile(3, reactive_config)

assert delta == 2


def _arrange_reactive_processes(secure_run_subprocess_mock: MagicMock, count: int):
def _arrange_reactive_processes(
secure_run_subprocess_mock: MagicMock, count_before_spawn: int, count_after_spawn: int
):
"""Mock reactive runner processes are active.
Args:
secure_run_subprocess_mock: The mock to use for the ps command.
count: The number of processes
count_before_spawn: The number of processes before spawning new ones.
count_after_spawn: The number of processes after spawning new ones.
"""
process_cmds = "\n".join([f"{PYTHON_BIN} {REACTIVE_RUNNER_SCRIPT_FILE}" for _ in range(count)])
secure_run_subprocess_mock.return_value = CompletedProcess(
args=PS_COMMAND_LINE_LIST,
returncode=0,
stdout=f"CMD\n{process_cmds}".encode("utf-8"),
stderr=b"",
process_cmds_before = "\n".join(
[f"{PYTHON_BIN} {REACTIVE_RUNNER_SCRIPT_FILE}" for _ in range(count_before_spawn)]
)
process_cmds_after = "\n".join(
[f"{PYTHON_BIN} {REACTIVE_RUNNER_SCRIPT_FILE}" for _ in range(count_after_spawn)]
)
secure_run_subprocess_mock.side_effect = [
CompletedProcess(
args=PS_COMMAND_LINE_LIST,
returncode=0,
stdout=f"CMD\n{process_cmds_before}".encode("utf-8"),
stderr=b"",
),
CompletedProcess(
args=PS_COMMAND_LINE_LIST,
returncode=0,
stdout=f"CMD\n{process_cmds_after}".encode("utf-8"),
stderr=b"",
),
]

0 comments on commit 597c1ba

Please sign in to comment.