Skip to content

Commit

Permalink
feat: Add alive status to run processes
Browse files Browse the repository at this point in the history
  • Loading branch information
Ramimashkouk committed Apr 26, 2024
1 parent b202367 commit d988f1b
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 54 deletions.
32 changes: 28 additions & 4 deletions backend/df_designer/app/services/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,19 +55,28 @@ async def periodically_check_status(self):
await asyncio.sleep(2) # TODO: ?sleep time shouldn't be constant

async def check_status(self) -> str:
"""Returns the process status [null, running, completed, failed, stopped].
"""Returns the process status [null, alive, running, completed, failed, stopped].
- null: When a process is initiated but not started yet. This condition is unusual and typically indicates
incorrect usage or a process misuse in backend logic.
- running: returncode is None
- alive: process is alive and ready to communicate
- running: process has started but is not yet responding to input (no communication possible yet)
- completed: returncode is 0
- failed: returncode is 1
- stopped: returncode is -15
- "Exited with return code: {self.process.returncode}. A non-zero return code indicates an error": Otherwise
"""
if self.process is None:
self.status = "null"
# if process is already alive, don't interrupt potential open channels by checking status periodically.
elif self.process.returncode is None:
self.status = "running"
if self.status == "alive":
self.status = "alive"
else:
if await self.is_alive():
self.status = "alive"
else:
self.status = "running"

elif self.process.returncode == 0:
self.status = "completed"
elif self.process.returncode == 1:
Expand All @@ -81,7 +90,7 @@ async def check_status(self) -> str:
)
self.status = f"Exited with return code: {str(self.process.returncode)}"

if self.status not in ["null", "running"]:
if self.status not in ["null", "running", "alive", "stopped"]:
stdout, stderr = await self.process.communicate()
if stdout:
self.logger.info(f"[stdout]\n{stdout.decode()}")
Expand All @@ -98,6 +107,8 @@ async def stop(self):
self.logger.debug("Terminating process '%s'", self.id)
self.process.terminate()
await self.process.wait()
self.logger.debug("Process returencode '%s' ", self.process.returncode)

except ProcessLookupError as exc:
self.logger.error("Process '%s' not found. It may have already exited.", self.id)
raise ProcessLookupError from exc
Expand All @@ -116,6 +127,19 @@ async def write_stdin(self, message):
self.process.stdin.write(message)
await self.process.stdin.drain()

async def is_alive(self, timeout: float = 3) -> bool:
    """Probe whether the process is ready to communicate.

    Sends a short newline-terminated greeting through ``self.write_stdin`` and
    waits for ``self.read_stdout`` to produce something in reply.

    Args:
        timeout: Maximum number of seconds to wait for the reply. Defaults to
            3, the value that used to be hard-coded.

    Returns:
        True if a reply was read before ``timeout`` elapsed; False if the wait
        timed out or the process's stdin pipe was already closed.
    """
    message = b"Hi\n"
    try:
        # Attempt to write and read from the process with a timeout.
        await self.write_stdin(message)
        output = await asyncio.wait_for(self.read_stdout(), timeout=timeout)
    except asyncio.TimeoutError:
        self.logger.debug("Process did not accept input within the timeout period.")
        return False
    except (BrokenPipeError, ConnectionResetError):
        # The write side of the pipe is gone (the process exited or closed its
        # stdin). That is a "not alive" answer, not an error for the caller.
        self.logger.debug("Process stdin is closed; it cannot be communicated with.")
        return False

    self.logger.debug("Process output afer communication: %s", output.decode())
    return True


class RunProcess(Process):
def __init__(self, id_: int, build_id: int = None, preset_end_status: str = ""):
Expand Down
4 changes: 2 additions & 2 deletions backend/df_designer/app/tests/api/test_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,11 +57,11 @@ async def test_check_process_status(mocker):
process_id = 0
mocked_process_manager = mocker.MagicMock()
mocker.patch.object(mocked_process_manager, "processes", {process_id: mocker.MagicMock()})
mocker.patch.object(mocked_process_manager, "get_status", mocker.AsyncMock(return_value="running"))
mocker.patch.object(mocked_process_manager, "get_status", mocker.AsyncMock(return_value="alive"))

response = await _check_process_status(process_id, mocked_process_manager)

assert response == {"status": "running"}
assert response == {"status": "alive"}
mocked_process_manager.get_status.assert_awaited_once_with(0)


Expand Down
44 changes: 6 additions & 38 deletions backend/df_designer/app/tests/integration/test_api_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,6 @@ async def _start_process(async_client: AsyncClient, endpoint, preset_end_status)
)


async def _try_communicate(process, message):
    """Send ``message`` to ``process`` and return what it prints back.

    The message is written with ``process.write_stdin`` and the reply is read
    with ``process.read_stdout``, waiting at most five seconds for it.

    Returns:
        The value produced by ``process.read_stdout()``, or None when no reply
        arrived before the wait timed out.
    """
    reply_wait_seconds = 5

    try:
        await process.write_stdin(message)
        reply = await asyncio.wait_for(process.read_stdout(), timeout=reply_wait_seconds)
    except asyncio.exceptions.TimeoutError:
        logger.debug("Process did not accept input within the timeout period.")
        return None
    else:
        logger.debug("Process output afer communication: %s", reply)
        return reply


@asynccontextmanager
async def _override_dependency(mocker_obj, get_manager_func):
process_manager = get_manager_func()
Expand All @@ -54,7 +40,7 @@ async def _assert_process_status(response, process_manager, expected_end_status)
process_manager.processes[process_manager.last_id].process.wait(), timeout=4
) # TODO: Consider making this timeout configurable
except asyncio.exceptions.TimeoutError as exc:
if expected_end_status == "running":
if expected_end_status in ["alive", "running"]:
logger.debug("Loop process timed out. Expected behavior.")
else:
logger.debug("Process with expected end status '%s' timed out with status 'running'.", expected_end_status)
Expand All @@ -67,32 +53,18 @@ async def _assert_process_status(response, process_manager, expected_end_status)
current_status == expected_end_status
), f"Current process status '{current_status}' did not match the expected '{expected_end_status}'"

return process_id, current_status


async def _assert_interaction_with_running_process(process_manager, process_id, end_status):
    """Check how a running process reacts to input, then terminate it.

    Looks up the process registered under ``process_id`` in
    ``process_manager.processes``, sends it a greeting and asserts on the reply
    according to ``end_status``:

    - "success": some output must have been received.
    - "loop": no output may have been received.
    - any other value: the reply is not checked.

    The underlying process is terminated and awaited afterwards.
    """
    target = process_manager.processes[process_id]
    reply = await _try_communicate(target, b"Hi\n")

    expects_reply = end_status == "success"
    expects_silence = end_status == "loop"
    if expects_reply:
        assert reply, "No output received from the process"
    elif expects_silence:
        assert reply is None, "Process replied to an input when it was expected not to."

    # Always clean up the process once the interaction has been verified.
    target.process.terminate()
    await target.process.wait()
return current_status


async def _test_start_process(mocker_obj, get_manager_func, endpoint, preset_end_status, expected_end_status):
async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as async_client:
async with _override_dependency(mocker_obj, get_manager_func) as process_manager:
response = await _start_process(async_client, endpoint, preset_end_status)
process_id, current_status = await _assert_process_status(response, process_manager, expected_end_status)
current_status = await _assert_process_status(response, process_manager, expected_end_status)

if current_status == "running":
await _assert_interaction_with_running_process(process_manager, process_id, preset_end_status)
process_manager.processes[process_manager.last_id].process.terminate()
await process_manager.processes[process_manager.last_id].process.wait()


async def _test_stop_process(mocker, get_manager_func, start_endpoint, stop_endpoint):
Expand Down Expand Up @@ -154,7 +126,7 @@ async def test_stop_build(mocker):
# Test processes of various end_status + Test integration with get_status. No db interaction (mocked processes)
@pytest.mark.asyncio
@pytest.mark.parametrize(
"end_status, process_status", [("failure", "failed"), ("loop", "running"), ("success", "running")]
"end_status, process_status", [("failure", "failed"), ("loop", "running"), ("success", "alive")]
)
async def test_start_run(mocker, end_status, process_status):
build_id = 43
Expand Down Expand Up @@ -198,10 +170,6 @@ async def test_connect_to_ws(mocker, client): # noqa: F811
# Process status
last_id = run_manager.get_last_id()
logger.debug("Last id: %s, type: %s", last_id, type(last_id))
logger.debug(
"Process status %s",
await run_manager.get_status(last_id),
)

# connect to websocket
with client.websocket_connect(f"/api/v1/bot/run/connect?run_id={last_id}") as websocket:
Expand Down
2 changes: 1 addition & 1 deletion backend/df_designer/app/tests/services/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ async def _run_process(cmd_to_run):
"cmd_to_run, status",
[
("sleep 10000", "running"),
("bash -c exit 1", "failed"),
("cat /non_existing_file", "failed"),
("echo Hello df_designer", "completed"),
],
)
Expand Down
19 changes: 10 additions & 9 deletions backend/df_designer/app/tests/services/test_process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,17 +42,18 @@ async def _run_manager():
return _run_manager

@pytest.mark.asyncio
@pytest.mark.parametrize("status", ["success", "with_error"])
async def test_stop(self, run_manager, status):
async def test_stop_success(self, run_manager):
manager = await run_manager()
if status == "success":
process_id = manager.get_last_id()
process_id = manager.get_last_id()
await manager.stop(process_id)
assert manager.processes[process_id].process.returncode == -15

@pytest.mark.asyncio
async def test_stop_with_error(self, run_manager):
manager = await run_manager()
process_id = randint(1000, 2000)
with pytest.raises((RuntimeError, ProcessLookupError)):
await manager.stop(process_id)
assert manager.processes[process_id].process.returncode == -15
else:
process_id = randint(1000, 2000)
with pytest.raises((RuntimeError, ProcessLookupError)):
await manager.stop(process_id)

# def test_check_status(self, run_manager, preset):
# pass
Expand Down

0 comments on commit d988f1b

Please sign in to comment.