You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
"""Web API interface for system control.This module provides a simple web API for controlling the system, allowingexternal applications to pause, resume, shutdown the system and save checkpoints."""importjsonimportloggingimportthreadingfromenumimportEnum, autofromqueueimportEmpty, QueuefromtypingimportAnyimportbottlefrombottleimportBottle, request, responsefrompamiq_core.threadsimportThreadTypes# Constants for commonly used responses_RESULT_OK= {"result": "ok"}
_ERROR_INVALID_ENDPOINT= {"error": "Invalid API endpoint"}
_ERROR_INVALID_METHOD= {"error": "Invalid API method"}
_ERROR_INTERNAL_SERVER= {"error": "Internal server error"}
# Type aliasesPayloadType=dict[str, str]
classControlCommands(Enum):
"""Enumerates the commands for the system control."""SHUTDOWN=auto()
PAUSE=auto()
RESUME=auto()
SAVE_CHECKPOINT=auto()
classWebApiHandler:
"""Web API handler for controlling the system. This class provides a simple Web API for controlling the thread controller, allowing external applications to pause, resume, and shutdown the system. """def__init__(
self,
system_status: SystemStatus,
host: str="localhost",
port: int=8391,
) ->None:
"""Initialize the WebApiHandler. Args: controller_status: To watch the thread controller status. host: Hostname to run the API server on. port: Port to run the API server on. """self._logger=logging.getLogger(f"{__name__}.{self.__class__.__name__}")
self._system_status=system_statusself._host=hostself._port=port# Create a dedicated Bottle app instead of using the default appself._app=Bottle()
self._register_handlers()
self._handler_thread=threading.Thread(
target=self.run,
daemon=True,
name=f"{ThreadTypes.CONTROL.thread_name}_webapi"
)
self._received_commands_queue: Queue[ControlCommands] =Queue()
defrun(self) ->None:
"""Run the API server."""failed=Truewhilefailed:
try:
self._logger.info(f"Serving system command at '{self._host}:{self._port}'")
self._app.run(host=self._host, port=self._port, quiet=True)
failed=FalseexceptOSError:
self._logger.info(f"Address '{self._host}:{self._port}' is already used, increment port number...")
self._port+=1exceptException:
self._logger.exception("Error starting API server")
raisedefrun_in_background(self) ->None:
"""Run the API server in a background thread."""self._handler_thread.start()
defhas_commands(self) ->bool:
"""Check if there are commands in the queue. Returns: True if there are commands, False otherwise. """returnnotself._received_commands_queue.empty()
defreceive_command(self) ->ControlCommands:
"""Receive a command from the queue without blocking. Returns: The command from the queue. Raises: Empty: If there are no commands in the queue. """returnself._received_commands_queue.get_nowait()
def_register_handlers(self) ->None:
"""Register API handlers with bottle."""# Use the app's route decorator instead of the global oneself._app.route("/api/status", method="GET", callback=self._get_status)
self._app.route("/api/pause", method="POST", callback=self._post_pause)
self._app.route("/api/resume", method="POST", callback=self._post_resume)
self._app.route("/api/shutdown", method="POST", callback=self._post_shutdown)
self._app.route("/api/save-checkpoint", method="POST", callback=self._post_save_checkpoint)
# Register error handlersself._app.error(404)(self._error_404)
self._app.error(405)(self._error_405)
self._app.error(500)(self._error_500)
def_json_response(self, data: dict[str, Any]) ->str:
"""Helper method to return JSON responses. Args: data: The data to convert to JSON. Returns: JSON string representation of the data. """response.content_type="application/json"returnjson.dumps(data)
def_get_status(self) ->str:
"""Handle GET /api/status request. Returns: JSON payload with the current system status. """status: strifself._system_status.is_shutdown():
status="stopped"elifself._system_status.is_paused():
status="paused"else:
status="active"self._logger.info(f"Status request: returning {status}")
returnself._json_response({"status": status})
def_post_pause(self) ->str:
"""Handle POST /api/pause request. Returns: JSON payload with the result. """self._logger.info("Pause command received")
self._received_commands_queue.put(ControlCommands.PAUSE)
returnself._json_response(_RESULT_OK)
def_post_resume(self) ->str:
"""Handle POST /api/resume request. Returns: JSON payload with the result. """self._logger.info("Resume command received")
self._received_commands_queue.put(ControlCommands.RESUME)
returnself._json_response(_RESULT_OK)
def_post_shutdown(self) ->str:
"""Handle POST /api/shutdown request. Returns: JSON payload with the result. """self._logger.info("Shutdown command received")
self._received_commands_queue.put(ControlCommands.SHUTDOWN)
returnself._json_response(_RESULT_OK)
def_post_save_checkpoint(self) ->str:
"""Handle POST /api/save-checkpoint request. Returns: JSON payload with the result. """self._logger.info("Save checkpoint command received")
self._received_commands_queue.put(ControlCommands.SAVE_CHECKPOINT)
returnself._json_response(_RESULT_OK)
def_error_404(self, error: bottle.HTTPError) ->str:
"""Handle 404 errors. Args: error: The HTTP error. Returns: JSON payload with the error message. """self._logger.error(f"404: {request.method}{request.path} is invalid API endpoint")
returnself._json_response(_ERROR_INVALID_ENDPOINT)
def_error_405(self, error: bottle.HTTPError) ->str:
"""Handle 405 errors. Args: error: The HTTP error. Returns: JSON payload with the error message. """self._logger.error(f"405: {request.method}{request.path} is invalid API method")
returnself._json_response(_ERROR_INVALID_METHOD)
def_error_500(self, error: bottle.HTTPError) ->str:
"""Handle 500 errors. Args: error: The HTTP error. Returns: JSON payload with the error message. """self._logger.error(f"500: {request.method}{request.path} caused an error")
returnself._json_response(_ERROR_INTERNAL_SERVER)
達成条件
WebApiHandlerが実装された
テスト
The text was updated successfully, but these errors were encountered:
タスク内容
AMI Systemの WebApiHandlerクラスをほぼそのまま移植します。
https://github.com/MLShukai/ami/blob/main/ami/threads/web_api_handler.py
スケッチ
SystemStatus
に関しては、どのように実装するか検討中です。達成条件
The text was updated successfully, but these errors were encountered: