Skip to content

Commit

Permalink
chore: Catch internal process logs
Browse files Browse the repository at this point in the history
  • Loading branch information
Ramimashkouk committed Apr 25, 2024
1 parent 9813a01 commit 565a10a
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 29 deletions.
8 changes: 4 additions & 4 deletions backend/df_designer/app/api/api_v1/endpoints/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ async def _stop_process(id_: int, process_manager: ProcessManager, process="run"
return {"status": "ok"}


def _check_process_status(id_: int, process_manager: ProcessManager) -> dict[str, str]:
async def _check_process_status(id_: int, process_manager: ProcessManager) -> dict[str, str]:
if id_ not in process_manager.processes:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="Process not found. It may have already exited.",
)
process_status = process_manager.get_status(id_)
process_status = await process_manager.get_status(id_)
return {"status": process_status}


Expand All @@ -57,7 +57,7 @@ async def stop_build(*, build_id: int, build_manager: BuildManager = Depends(dep

@router.get("/build/status/{build_id}", status_code=200)
async def check_build_status(*, build_id: int, build_manager: BuildManager = Depends(deps.get_build_manager)):
return _check_process_status(build_id, build_manager)
return await _check_process_status(build_id, build_manager)


@router.get("/builds", response_model=Optional[Union[list, dict]], status_code=200)
Expand Down Expand Up @@ -103,7 +103,7 @@ async def stop_run(*, run_id: int, run_manager: RunManager = Depends(deps.get_ru

@router.get("/run/status/{run_id}", status_code=200)
async def check_run_status(*, run_id: int, run_manager: RunManager = Depends(deps.get_run_manager)):
return _check_process_status(run_id, run_manager)
return await _check_process_status(run_id, run_manager)


@router.get("/runs", response_model=Optional[Union[list, dict]], status_code=200)
Expand Down
38 changes: 23 additions & 15 deletions backend/df_designer/app/services/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,15 @@ def __init__(self, id_: int, preset_end_status=""):
self.logger: logging.Logger

async def start(self, cmd_to_run):
async with aiofiles.open(self.log_path, "a", encoding="UTF-8"): # TODO: log to files
self.process = await asyncio.create_subprocess_exec(
*cmd_to_run.split(),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
)

def get_full_info(self) -> dict:
self.check_status()
self.process = await asyncio.create_subprocess_exec(
*cmd_to_run.split(),
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
stdin=asyncio.subprocess.PIPE,
)

async def get_full_info(self) -> dict:
await self.check_status()
return {key: getattr(self, key) for key in self.__dict__ if key not in ["process", "logger"]}

def set_full_info(self, params_dict):
Expand All @@ -57,7 +56,7 @@ async def periodically_check_status(self):
break
await asyncio.sleep(2) # TODO: ?sleep time shouldn't be constant

def check_status(self) -> str:
async def check_status(self) -> str:
"""Returns the process status [null, running, completed, failed, stopped].
- null: When a process is initiated but not started yet. This condition is unusual and typically indicates
incorrect usage or a process misuse in backend logic.
Expand All @@ -78,11 +77,19 @@ def check_status(self) -> str:
elif self.process.returncode == -15:
self.status = "stopped"
else:
self.logger.warning(
self.logger.error(
"Unexpected code was returned: '%s'. A non-zero return code indicates an error.",
self.process.returncode,
)
return str(self.process.returncode)
self.status = f"Exited with return code: {str(self.process.returncode)}"

if self.status not in ["null", "running"]:
stdout, stderr = await self.process.communicate()
if stdout:
self.logger.info(f"[stdout]\n{stdout.decode()}")
if stderr:
self.logger.error(f"[stderr]\n{stderr.decode()}")

return self.status

async def stop(self):
Expand Down Expand Up @@ -123,9 +130,10 @@ def __init__(self, id_: int, build_id: int = None, preset_end_status: str = ""):

async def update_db_info(self):
# save current run info into runs_path
self.logger.info("Updating db run info")
runs_conf = await read_conf(settings.runs_path)

run_params = self.get_full_info()
run_params = await self.get_full_info()
_map_to_str(run_params)

for run in runs_conf:
Expand Down Expand Up @@ -165,7 +173,7 @@ async def update_db_info(self):
# save current build info into builds_path
builds_conf = await read_conf(settings.builds_path)

build_params = self.get_full_info()
build_params = await self.get_full_info()
_map_to_str(build_params)

for build in builds_conf:
Expand Down
4 changes: 2 additions & 2 deletions backend/df_designer/app/services/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ async def stop(self, id_):
async def check_status(self, id_):
await self.processes[id_].periodically_check_status()

def get_status(self, id_):
return self.processes[id_].check_status()
async def get_status(self, id_):
return await self.processes[id_].check_status()

async def get_process_info(self, id_: int, path: Path):
db_conf = await read_conf(path)
Expand Down
11 changes: 7 additions & 4 deletions backend/df_designer/app/tests/api/test_bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,19 @@ async def test_stop_process_error(mocker, error_type):


# TODO: check the errors
def test__check_process_status(mocker):
@pytest.mark.asyncio
async def test_check_process_status(mocker):
process_id = 0
mocked_process_manager = mocker.MagicMock()
mocker.patch.object(mocked_process_manager, "processes", {process_id: mocker.MagicMock()})
mocker.patch.object(mocked_process_manager, "get_status", return_value="running")
mocker.patch.object(
mocked_process_manager, "get_status", mocker.AsyncMock(return_value="running")
)

response = _check_process_status(process_id, mocked_process_manager)
response = await _check_process_status(process_id, mocked_process_manager)

assert response == {"status": "running"}
assert mocked_process_manager.get_status.called_once_with(0)
mocked_process_manager.get_status.assert_awaited_once_with(0)


@pytest.mark.asyncio
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ async def _assert_process_status(response, process_manager, expected_end_status)

process_id = process_manager.last_id
logger.debug("Process id is %s", process_id)
current_status = process_manager.get_status(process_id)
current_status = await process_manager.get_status(process_id)
assert (
current_status == expected_end_status
), f"Current process status '{current_status}' did not match the expected '{expected_end_status}'"
Expand Down Expand Up @@ -104,7 +104,7 @@ async def _test_stop_process(mocker, get_manager_func, start_endpoint, stop_endp

last_id = manager.get_last_id()
logger.debug("Last id: %s, type: %s", last_id, type(last_id))
logger.debug("Process status %s", manager.get_status(last_id))
logger.debug("Process status %s", await manager.get_status(last_id))

stop_response = await async_client.get(f"{stop_endpoint}/{last_id}")
assert stop_response.status_code == 200
Expand Down Expand Up @@ -200,7 +200,7 @@ async def test_connect_to_ws(mocker, client): # noqa: F811
logger.debug("Last id: %s, type: %s", last_id, type(last_id))
logger.debug(
"Process status %s",
run_manager.get_status(last_id),
await run_manager.get_status(last_id),
)

# connect to websocket
Expand Down
2 changes: 1 addition & 1 deletion backend/df_designer/app/tests/services/test_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async def _run_process(cmd_to_run):
async def test_check_status(self, run_process, cmd_to_run, status):
process = await run_process(cmd_to_run)
await asyncio.sleep(2)
assert process.check_status() == status
assert await process.check_status() == status

# def test_periodically_check_status(self, run_process):
# process = await run_process("sleep 10000")
Expand Down

0 comments on commit 565a10a

Please sign in to comment.