Skip to content

Commit

Permalink
refactor: refactor ProcessManager
Browse files Browse the repository at this point in the history
* Create BuildManager and RunManager
  • Loading branch information
Ramimashkouk committed Mar 23, 2024
1 parent fe77ba4 commit 9f6936d
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 46 deletions.
28 changes: 14 additions & 14 deletions backend/df_designer/app/api/api_v1/endpoints/bot.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from app.schemas.preset import Preset
from app.core.logger_config import get_logger
from app.clients.process_manager import ProcessManager
from app.clients.process_manager import ProcessManager, BuildManager, RunManager
from app.api import deps
from app.clients.websocket_manager import WebSocketManager

Expand Down Expand Up @@ -31,68 +31,68 @@ def _check_process_status(pid: int, process_manager: ProcessManager):


@router.post("/build/start", status_code=201)
async def start_build(preset: Preset, build_manager: ProcessManager = Depends(deps.get_build_manager)):
async def start_build(preset: Preset, build_manager: BuildManager = Depends(deps.get_build_manager)):
await asyncio.sleep(preset.wait_time)
await build_manager.start("build", f"dflowd build_bot --preset {preset.end_status}") #TODO: Think about making BuildManager and RunManager
await build_manager.start(preset) #TODO: Think about making BuildManager and RunManager
pid = build_manager.get_last_id()
logger.info("Build process '%s' has started", pid)
return {"status": "ok", "pid": pid}


@router.get("/build/stop/{pid}", status_code=200)
async def stop_build(*, pid: int, build_manager: ProcessManager = Depends(deps.get_build_manager)):
async def stop_build(*, pid: int, build_manager: BuildManager = Depends(deps.get_build_manager)):
return _stop_process(pid, build_manager, process="build")


@router.get("/build/status/{pid}", status_code=200)
async def check_build_status(*, pid: int, build_manager: ProcessManager = Depends(deps.get_build_manager)):
async def check_build_status(*, pid: int, build_manager: BuildManager = Depends(deps.get_build_manager)):
return _check_process_status(pid, build_manager)


@router.get("/builds", status_code=200)
async def check_build_processes(build_manager: ProcessManager = Depends(deps.get_build_manager)):
async def check_build_processes(build_manager: BuildManager = Depends(deps.get_build_manager)):
return build_manager.get_min_info()


@router.get("/builds/{build_id}", status_code=200)
async def get_build(*, build_id: int, build_manager: ProcessManager = Depends(deps.get_build_manager)):
async def get_build(*, build_id: int, build_manager: BuildManager = Depends(deps.get_build_manager)):
return build_manager.get_full_info(build_id)


@router.post("/run/start/{build_id}", status_code=201)
async def start_run(*, build_id: int, preset: Preset, run_manager: ProcessManager = Depends(deps.get_run_manager)):
async def start_run(*, build_id: int, preset: Preset, run_manager: RunManager = Depends(deps.get_run_manager)):
await asyncio.sleep(preset.wait_time)
await run_manager.start("run", f"dflowd run_bot {build_id} --preset {preset.end_status}")
await run_manager.start(build_id, preset)
pid = run_manager.get_last_id()
logger.info("Run process '%s' has started", pid)
return {"status": "ok", "pid": pid}


@router.get("/run/stop/{pid}", status_code=200)
async def stop_run(*, pid: int, run_manager: ProcessManager = Depends(deps.get_run_manager)):
async def stop_run(*, pid: int, run_manager: RunManager = Depends(deps.get_run_manager)):
return _stop_process(pid, run_manager, process="run")


@router.get("/run/status/{pid}", status_code=200)
async def check_run_status(*, pid: int, run_manager: ProcessManager = Depends(deps.get_run_manager)):
async def check_run_status(*, pid: int, run_manager: RunManager = Depends(deps.get_run_manager)):
return _check_process_status(pid, run_manager)


@router.get("/runs", status_code=200)
async def check_run_processes(run_manager: ProcessManager = Depends(deps.get_run_manager)):
async def check_run_processes(run_manager: RunManager = Depends(deps.get_run_manager)):
return run_manager.get_min_info()


@router.get("/runs/{run_id}", status_code=200)
async def get_run(*, run_id: int, run_manager: ProcessManager = Depends(deps.get_run_manager)):
async def get_run(*, run_id: int, run_manager: RunManager = Depends(deps.get_run_manager)):
return run_manager.get_full_info(run_id)


@router.websocket("/run/connect")
async def connect(
websocket: WebSocket,
websocket_manager: WebSocketManager = Depends(deps.get_websocket_manager),
run_manager: ProcessManager = Depends(deps.get_run_manager),
run_manager: RunManager = Depends(deps.get_run_manager),
):
"""Open a websocket to connect to a running bot, whose id is 'pid'.
Websocket url should be of format: `/bot/run/connect?pid=<pid>`
Expand Down
10 changes: 5 additions & 5 deletions backend/df_designer/app/api/deps.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from app.clients.process_manager import ProcessManager
from app.clients.process_manager import BuildManager, RunManager
from app.clients.websocket_manager import WebSocketManager

build_manager = ProcessManager()
def get_build_manager() -> ProcessManager:
build_manager = BuildManager()
def get_build_manager() -> BuildManager:
return build_manager

run_manager = ProcessManager()
def get_run_manager() -> ProcessManager:
run_manager = RunManager()
def get_run_manager() -> RunManager:
return run_manager

websocket_manager = WebSocketManager()
Expand Down
67 changes: 40 additions & 27 deletions backend/df_designer/app/clients/process_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,15 @@

from app.core.logger_config import get_logger
from app import BuildProcess, RunProcess
from app.schemas.preset import Preset

logger = get_logger(__name__)


def _get_preset_end_status(command):
if "--preset" in (words:=command.split()):
return words[words.index("--preset")+1]
return "success" #default


class ProcessManager:
def __init__(self):
self.processes = {}

async def start(self, process_type: str, cmd_to_run: str):
id_ = randint(1, 10000) #TODO: change it to incremental counter
if process_type == "build":
process = BuildProcess(id_, _get_preset_end_status(cmd_to_run))
elif process_type == "run":
process = RunProcess(id_, _get_preset_end_status(cmd_to_run))
else: raise ValueError("process_type should be one of [build, run]")
await process.start(cmd_to_run)
self.processes[id_] = process

def get_last_id(self):
"""Get the process_id of the last started process"""
return list(self.processes.keys())[-1]
Expand All @@ -38,21 +23,49 @@ def stop(self, pid):
def check_status(self, pid):
return self.processes[pid].check_status()

def get_full_info(self, id_: int) -> dict:
if id_ in self.processes:
return self.processes[id_].get_full_info()
return {}


class RunManager(ProcessManager):
def __init__(self):
super().__init__()

async def start(self, build_id: int, preset: Preset):
cmd_to_run = f"dflowd run_bot {build_id} --preset {preset.end_status}"
id_ = randint(1, 10000) #TODO: change it to incremental counter
process = RunProcess(id_, preset.end_status)
await process.start(cmd_to_run)
self.processes[id_] = process

def get_min_info(self) -> List[dict]:
build_min_info = ["build_id", "build_preset_name", "status", "timestamp"]
run_min_info = ["run_id", "run_preset_name", "status", "timestamp"]
if isinstance(next(iter(self.processes.values())), RunProcess): #TODO: really doesn't sound clean!
min_info = run_min_info
elif isinstance(next(iter(self.processes.values())), BuildProcess):
min_info = build_min_info
minimum_params = ["run_id", "run_preset_name", "status", "timestamp"]

min_processes_info = []
for _, process in self.processes.items():
process_info = process.get_full_info()
min_processes_info.append({k:v for k,v in process_info.items() if k in min_info})
min_processes_info.append({param:v for param,v in process_info.items() if param in minimum_params})
return min_processes_info

def get_full_info(self, id_: int) -> dict:
if id_ in self.processes:
return self.processes[id_].get_full_info()
return {}

class BuildManager(ProcessManager):
def __init__(self):
super().__init__()

async def start(self, preset: Preset):
cmd_to_run = f"dflowd build_bot --preset {preset.end_status}"
id_ = randint(1, 10000) #TODO: change it to incremental counter
process = BuildProcess(id_, preset.end_status)
await process.start(cmd_to_run)
self.processes[id_] = process

def get_min_info(self) -> List[dict]:
minimum_params = ["build_id", "build_preset_name", "status", "timestamp"]

min_processes_info = []
for _, process in self.processes.items():
process_info = process.get_full_info()
min_processes_info.append({param:v for param,v in process_info.items() if param in minimum_params})
return min_processes_info

0 comments on commit 9f6936d

Please sign in to comment.