Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement storage proxy status check CLI command #2333

Draft
wants to merge 5 commits into
base: topic/06-24-feat_implement_agent_status_check_cli_command
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/2333.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement storage proxy status check CLI command
99 changes: 99 additions & 0 deletions src/ai/backend/storage/cli.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,106 @@
import asyncio
import pathlib
import sys
from pprint import pformat

import click
from tabulate import tabulate

from ai.backend.cli.types import CliContextInfo
from ai.backend.common.exception import ConfigurationError
from ai.backend.common.types import LogSeverity
from ai.backend.storage.config import load_local_config


@click.group()
def main():
"""The root entrypoint for unified CLI of storage-proxy"""
pass


async def inspect_server_status(cli_ctx: CliContextInfo, storage_proxy_pid: int) -> None:
command = f"ps -p '{storage_proxy_pid}' -f"
process = await asyncio.create_subprocess_shell(command, stdout=asyncio.subprocess.PIPE)
stdout, stderr = await process.communicate()
if stderr:
raise RuntimeError(f"Failed to execute the command: {command}")

lines = stdout.decode().splitlines()
process_list = []

for line in lines[1:]:
columns = line.split()
# Combine all text following UID, PID, PPID, C, STIME, TTY, TIME into CMD
process_info = columns[:7] + [" ".join(columns[7:])]
process_list.append(process_info)

print(tabulate(process_list, headers=lines[0].split(), tablefmt="pretty"))
pass


@main.command()
@click.pass_obj
@click.option(
"-f",
"--config-path",
"--config",
type=click.Path(
file_okay=True,
dir_okay=False,
exists=True,
path_type=pathlib.Path,
),
default=None,
help="The config file path. (default: ./storage-proxy.toml and /etc/backend.ai/storage-proxy.toml)",
)
@click.option(
"--debug",
is_flag=True,
help="Set the logging level to DEBUG",
)
@click.option(
"-s",
"--systemctl",
is_flag=True,
help="Include the systemctl status command result in the output",
)
@click.option(
"--log-level",
type=click.Choice([*LogSeverity], case_sensitive=False),
default=LogSeverity.INFO,
help="Set the logging verbosity level",
)
def status(
cli_ctx: CliContextInfo,
config_path: pathlib.Path,
log_level: LogSeverity,
debug: bool = False,
systemctl: bool = False,
) -> None:
"""
Collect and print each storage-proxy server process's status.
"""

try:
local_config = load_local_config(config_path, log_level, debug=debug)
except ConfigurationError as e:
print(
"ConfigurationError: Could not read or validate the storage-proxy local config.",
file=sys.stderr,
)
print(pformat(e.invalid_data), file=sys.stderr)
raise click.Abort()

pid_filepath = local_config["storage-proxy"]["pid-file"]

if not pid_filepath.is_file():
print(
'ConfigurationError: "pid-file" not found in the configuration file.',
file=sys.stderr,
)
raise click.Abort()

with open(pid_filepath, "r") as file:
storage_proxy_pid = int(file.read())

asyncio.run(inspect_server_status(cli_ctx, storage_proxy_pid))
28 changes: 12 additions & 16 deletions src/ai/backend/storage/config.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
import os
import sys
from pathlib import Path
from pprint import pformat
from typing import Any

import trafaret as t

from ai.backend.common import validators as tx
from ai.backend.common.config import (
ConfigurationError,
check,
etcd_config_iv,
override_key,
Expand All @@ -17,6 +14,7 @@
)
from ai.backend.common.etcd import AsyncEtcd, ConfigScopes
from ai.backend.common.logging import logging_config_iv
from ai.backend.common.types import LogSeverity

from .types import VolumeInfo

Expand All @@ -30,7 +28,7 @@
_default_gid = os.getgid()


local_config_iv = (
storage_proxy_local_config_iv = (
t.Dict(
{
t.Key("storage-proxy"): t.Dict(
Expand Down Expand Up @@ -113,7 +111,9 @@
)


def load_local_config(config_path: Path | None, debug: bool = False) -> dict[str, Any]:
def load_local_config(
config_path: Path | None, log_level: LogSeverity, debug: bool = False
) -> dict[str, Any]:
# Determine where to read configuration.
raw_cfg, cfg_src_path = read_from_file(config_path, "storage-proxy")
os.chdir(cfg_src_path.parent)
Expand All @@ -125,17 +125,13 @@ def load_local_config(config_path: Path | None, debug: bool = False) -> dict[str
if debug:
override_key(raw_cfg, ("debug", "enabled"), True)

try:
local_config = check(raw_cfg, local_config_iv)
local_config["_src"] = cfg_src_path
return local_config
except ConfigurationError as e:
print(
"ConfigurationError: Validation of storage-proxy local config has failed:",
file=sys.stderr,
)
print(pformat(e.invalid_data), file=sys.stderr)
raise
override_key(raw_cfg, ("debug", "enabled"), log_level == LogSeverity.DEBUG)
override_key(raw_cfg, ("logging", "level"), log_level)
override_key(raw_cfg, ("logging", "pkg-ns", "ai.backend"), log_level)

local_config = check(raw_cfg, storage_proxy_local_config_iv)
local_config["_src"] = cfg_src_path
return local_config


def load_shared_config(local_config: dict[str, Any]) -> AsyncEtcd:
Expand Down
3 changes: 2 additions & 1 deletion src/ai/backend/storage/migration.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
EventProducer,
)
from ai.backend.common.logging import BraceStyleAdapter, Logger
from ai.backend.common.types import LogSeverity

from .abc import CAP_FAST_SIZE, AbstractVolume
from .config import load_local_config, load_shared_config
Expand Down Expand Up @@ -324,7 +325,7 @@ def main(
Print migration script to OUTFILE.
Pass - as OUTFILE to print results to STDOUT.
"""
local_config = load_local_config(config_path, debug=debug)
local_config = load_local_config(config_path, LogSeverity.DEBUG, debug=debug)
ipc_base_path = local_config["storage-proxy"]["ipc-base-path"]
log_sockpath = Path(
ipc_base_path / f"storage-proxy-logger-{os.getpid()}.sock",
Expand Down
7 changes: 1 addition & 6 deletions src/ai/backend/storage/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

from ai.backend.common.config import (
ConfigurationError,
override_key,
redis_config_iv,
)
from ai.backend.common.defs import REDIS_STREAM_DB
Expand Down Expand Up @@ -252,7 +251,7 @@ def main(
) -> int:
"""Start the storage-proxy service as a foreground process."""
try:
local_config = load_local_config(config_path, debug=debug)
local_config = load_local_config(config_path, log_level, debug=debug)
except ConfigurationError as e:
print(
"ConfigurationError: Could not read or validate the storage-proxy local config:",
Expand All @@ -262,10 +261,6 @@ def main(
raise click.Abort()
if debug:
log_level = LogSeverity.DEBUG
override_key(local_config, ("debug", "enabled"), log_level == LogSeverity.DEBUG)
override_key(local_config, ("logging", "level"), log_level)
override_key(local_config, ("logging", "pkg-ns", "ai.backend"), log_level)

multiprocessing.set_start_method("spawn")

if cli_ctx.invoked_subcommand is None:
Expand Down
4 changes: 2 additions & 2 deletions tests/storage-proxy/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from ai.backend.common.etcd import AsyncEtcd, ConfigScopes
from ai.backend.common.exception import ConfigurationError
from ai.backend.common.types import HostPortPair, QuotaScopeID, QuotaScopeType
from ai.backend.common.types import HostPortPair, LogSeverity, QuotaScopeID, QuotaScopeType
from ai.backend.storage.abc import AbstractVolume
from ai.backend.storage.config import load_local_config
from ai.backend.storage.types import VFolderID
Expand Down Expand Up @@ -47,7 +47,7 @@ def mock_etcd() -> Iterator[AsyncEtcd]:

def has_backend(backend_name: str) -> dict[str, Any] | None:
try:
local_config = load_local_config(None, debug=True)
local_config = load_local_config(None, LogSeverity.DEBUG, debug=True)
except ConfigurationError:
return None
for _, info in local_config["volume"].items():
Expand Down
Loading