diff --git a/changes/2333.feature.md b/changes/2333.feature.md new file mode 100644 index 0000000000..19dc1b17c2 --- /dev/null +++ b/changes/2333.feature.md @@ -0,0 +1 @@ +Implement storage proxy status check CLI command diff --git a/src/ai/backend/storage/cli.py b/src/ai/backend/storage/cli.py index bfdcceecda..1e9d4bf85e 100644 --- a/src/ai/backend/storage/cli.py +++ b/src/ai/backend/storage/cli.py @@ -1,7 +1,106 @@ +import asyncio +import pathlib +import sys +from pprint import pformat + import click +from tabulate import tabulate + +from ai.backend.cli.types import CliContextInfo +from ai.backend.common.exception import ConfigurationError +from ai.backend.common.types import LogSeverity +from ai.backend.storage.config import load_local_config @click.group() def main(): """The root entrypoint for unified CLI of storage-proxy""" pass + + +async def inspect_server_status(cli_ctx: CliContextInfo, storage_proxy_pid: int) -> None: + command = f"ps -p '{storage_proxy_pid}' -f" + process = await asyncio.create_subprocess_shell(command, stdout=asyncio.subprocess.PIPE) + stdout, stderr = await process.communicate() + if stderr: + raise RuntimeError(f"Failed to execute the command: {command}") + + lines = stdout.decode().splitlines() + process_list = [] + + for line in lines[1:]: + columns = line.split() + # Combine all text following UID, PID, PPID, C, STIME, TTY, TIME into CMD + process_info = columns[:7] + [" ".join(columns[7:])] + process_list.append(process_info) + + print(tabulate(process_list, headers=lines[0].split(), tablefmt="pretty")) + pass + + +@main.command() +@click.pass_obj +@click.option( + "-f", + "--config-path", + "--config", + type=click.Path( + file_okay=True, + dir_okay=False, + exists=True, + path_type=pathlib.Path, + ), + default=None, + help="The config file path. (default: ./storage-proxy.toml and /etc/backend.ai/storage-proxy.toml)", +) +@click.option( + "--debug", + is_flag=True, + help="Set the logging level to DEBUG", +) +@click.option( + "-s", + "--systemctl", + is_flag=True, + help="Include the systemctl status command result in the output", +) +@click.option( + "--log-level", + type=click.Choice([*LogSeverity], case_sensitive=False), + default=LogSeverity.INFO, + help="Set the logging verbosity level", +) +def status( + cli_ctx: CliContextInfo, + config_path: pathlib.Path, + log_level: LogSeverity, + debug: bool = False, + systemctl: bool = False, +) -> None: + """ + Collect and print each storage-proxy server process's status. + """ + + try: + local_config = load_local_config(config_path, log_level, debug=debug) + except ConfigurationError as e: + print( + "ConfigurationError: Could not read or validate the storage-proxy local config.", + file=sys.stderr, + ) + print(pformat(e.invalid_data), file=sys.stderr) + raise click.Abort() + + pid_filepath = local_config["storage-proxy"]["pid-file"] + + if not pid_filepath.is_file(): + print( + 'ConfigurationError: "pid-file" not found in the configuration file.', + file=sys.stderr, + ) + raise click.Abort() + + with open(pid_filepath, "r") as file: + storage_proxy_pid = int(file.read()) + + asyncio.run(inspect_server_status(cli_ctx, storage_proxy_pid)) diff --git a/src/ai/backend/storage/config.py b/src/ai/backend/storage/config.py index 3c869b466e..c326561f64 100644 --- a/src/ai/backend/storage/config.py +++ b/src/ai/backend/storage/config.py @@ -1,14 +1,11 @@ import os -import sys from pathlib import Path -from pprint import pformat from typing import Any import trafaret as t from ai.backend.common import validators as tx from ai.backend.common.config import ( - ConfigurationError, check, etcd_config_iv, override_key, @@ -17,6 +14,7 @@ ) from ai.backend.common.etcd import AsyncEtcd, ConfigScopes from ai.backend.common.logging import logging_config_iv +from ai.backend.common.types import LogSeverity from .types import VolumeInfo @@ -30,7 +28,7 @@ _default_gid = os.getgid() -local_config_iv = ( +storage_proxy_local_config_iv = ( t.Dict( { t.Key("storage-proxy"): t.Dict( @@ -113,7 +111,9 @@ ) -def load_local_config(config_path: Path | None, debug: bool = False) -> dict[str, Any]: +def load_local_config( + config_path: Path | None, log_level: LogSeverity, debug: bool = False +) -> dict[str, Any]: # Determine where to read configuration. raw_cfg, cfg_src_path = read_from_file(config_path, "storage-proxy") os.chdir(cfg_src_path.parent) @@ -125,17 +125,13 @@ def load_local_config(config_path: Path | None, debug: bool = False) -> dict[str if debug: override_key(raw_cfg, ("debug", "enabled"), True) - try: - local_config = check(raw_cfg, local_config_iv) - local_config["_src"] = cfg_src_path - return local_config - except ConfigurationError as e: - print( - "ConfigurationError: Validation of storage-proxy local config has failed:", - file=sys.stderr, - ) - print(pformat(e.invalid_data), file=sys.stderr) - raise + override_key(raw_cfg, ("debug", "enabled"), log_level == LogSeverity.DEBUG) + override_key(raw_cfg, ("logging", "level"), log_level) + override_key(raw_cfg, ("logging", "pkg-ns", "ai.backend"), log_level) + + local_config = check(raw_cfg, storage_proxy_local_config_iv) + local_config["_src"] = cfg_src_path + return local_config def load_shared_config(local_config: dict[str, Any]) -> AsyncEtcd: diff --git a/src/ai/backend/storage/migration.py b/src/ai/backend/storage/migration.py index feee14c332..bd7d43d80f 100644 --- a/src/ai/backend/storage/migration.py +++ b/src/ai/backend/storage/migration.py @@ -25,6 +25,7 @@ EventProducer, ) from ai.backend.common.logging import BraceStyleAdapter, Logger +from ai.backend.common.types import LogSeverity from .abc import CAP_FAST_SIZE, AbstractVolume from .config import load_local_config, load_shared_config @@ -324,7 +325,7 @@ def main( Print migration script to OUTFILE. Pass - as OUTFILE to print results to STDOUT. """ - local_config = load_local_config(config_path, debug=debug) + local_config = load_local_config(config_path, LogSeverity.DEBUG, debug=debug) ipc_base_path = local_config["storage-proxy"]["ipc-base-path"] log_sockpath = Path( ipc_base_path / f"storage-proxy-logger-{os.getpid()}.sock", diff --git a/src/ai/backend/storage/server.py b/src/ai/backend/storage/server.py index 7d4d84d4ec..dec194255e 100644 --- a/src/ai/backend/storage/server.py +++ b/src/ai/backend/storage/server.py @@ -20,7 +20,6 @@ from ai.backend.common.config import ( ConfigurationError, - override_key, redis_config_iv, ) from ai.backend.common.defs import REDIS_STREAM_DB @@ -252,7 +251,7 @@ def main( ) -> int: """Start the storage-proxy service as a foreground process.""" try: - local_config = load_local_config(config_path, debug=debug) + local_config = load_local_config(config_path, log_level, debug=debug) except ConfigurationError as e: print( "ConfigurationError: Could not read or validate the storage-proxy local config:", @@ -262,10 +261,6 @@ def main( raise click.Abort() if debug: log_level = LogSeverity.DEBUG - override_key(local_config, ("debug", "enabled"), log_level == LogSeverity.DEBUG) - override_key(local_config, ("logging", "level"), log_level) - override_key(local_config, ("logging", "pkg-ns", "ai.backend"), log_level) - multiprocessing.set_start_method("spawn") if cli_ctx.invoked_subcommand is None: diff --git a/tests/storage-proxy/conftest.py b/tests/storage-proxy/conftest.py index a8acae6f52..4b7735587e 100644 --- a/tests/storage-proxy/conftest.py +++ b/tests/storage-proxy/conftest.py @@ -10,7 +10,7 @@ from ai.backend.common.etcd import AsyncEtcd, ConfigScopes from ai.backend.common.exception import ConfigurationError -from ai.backend.common.types import HostPortPair, QuotaScopeID, QuotaScopeType +from ai.backend.common.types import HostPortPair, LogSeverity, QuotaScopeID, QuotaScopeType from ai.backend.storage.abc import AbstractVolume from ai.backend.storage.config import load_local_config from ai.backend.storage.types import VFolderID @@ -47,7 +47,7 @@ def mock_etcd() -> Iterator[AsyncEtcd]: def has_backend(backend_name: str) -> dict[str, Any] | None: try: - local_config = load_local_config(None, debug=True) + local_config = load_local_config(None, LogSeverity.DEBUG, debug=True) except ConfigurationError: return None for _, info in local_config["volume"].items():