diff --git a/src/ai/backend/client/cli/session/lifecycle.py b/src/ai/backend/client/cli/session/lifecycle.py index a7a053f4bdf..9a6666c925c 100644 --- a/src/ai/backend/client/cli/session/lifecycle.py +++ b/src/ai/backend/client/cli/session/lifecycle.py @@ -28,7 +28,6 @@ from ai.backend.client.cli.types import CLIContext from ai.backend.common.arch import DEFAULT_IMAGE_ARCH from ai.backend.common.types import ClusterMode -from ai.backend.common.utils import get_first_timestamp_for_status from ...compat import asyncio_run from ...exceptions import BackendAPIError @@ -36,6 +35,7 @@ from ...output.fields import session_fields from ...output.types import FieldSpec from ...session import AsyncSession, Session +from ...utils import get_first_timestamp_for_status from .. import events from ..pretty import ( ProgressViewer, diff --git a/src/ai/backend/client/utils.py b/src/ai/backend/client/utils.py index b95fc5c9b88..6561eacedf8 100644 --- a/src/ai/backend/client/utils.py +++ b/src/ai/backend/client/utils.py @@ -1,6 +1,10 @@ +from __future__ import annotations + import io import os +from datetime import datetime +from dateutil.parser import parse as dtparse from tqdm import tqdm @@ -48,3 +52,13 @@ def readinto1(self, *args, **kwargs): count = super().readinto1(*args, **kwargs) self.tqdm.set_postfix(file=self._filename, refresh=False) self.tqdm.update(count) + + +def get_first_timestamp_for_status( + status_history: list[dict[str, str]], + status: str, +) -> datetime | None: + for rec in status_history: + if rec["status"] == status: + return dtparse(rec["timestamp"]) + return None diff --git a/src/ai/backend/common/utils.py b/src/ai/backend/common/utils.py index acadeb48d79..34b74cc684d 100644 --- a/src/ai/backend/common/utils.py +++ b/src/ai/backend/common/utils.py @@ -8,7 +8,7 @@ import sys import uuid from collections import OrderedDict -from datetime import datetime, timedelta +from datetime import timedelta from itertools import chain from pathlib import Path from typing import ( @@ -25,7 +25,6 @@ import aiofiles from async_timeout import timeout -from dateutil.parser import parse as dtparse if TYPE_CHECKING: from decimal import Decimal @@ -405,16 +404,3 @@ async def umount( fstab = Fstab(fp) await fstab.remove_by_mountpoint(str(mountpoint)) return True - - -def get_first_timestamp_for_status( - status_history_records: list[dict[str, str]], status: str -) -> datetime | None: - """ - Get the first occurrence time of the given status from the status history records. - """ - - for status_history in status_history_records: - if status_history["status"] == status: - return dtparse(status_history["timestamp"]) - return None diff --git a/src/ai/backend/manager/models/kernel.py b/src/ai/backend/manager/models/kernel.py index 4734a15a1de..2e009127cb6 100644 --- a/src/ai/backend/manager/models/kernel.py +++ b/src/ai/backend/manager/models/kernel.py @@ -46,7 +46,6 @@ SessionTypes, VFolderMount, ) -from ai.backend.common.utils import get_first_timestamp_for_status from ..api.exceptions import ( BackendError, @@ -83,7 +82,9 @@ ExtendedAsyncSAEngine, JSONCoalesceExpr, execute_with_retry, + get_first_timestamp_for_status, sql_append_dict_to_list, + sql_json_merge, ) if TYPE_CHECKING: @@ -927,8 +928,8 @@ def parse_row(cls, ctx: GraphQueryContext, row: KernelRow) -> Mapping[str, Any]: hide_agents = False else: hide_agents = ctx.local_config["manager"]["hide-agents"] - status_history = row.status_history - scheduled_at = get_first_timestamp_for_status(status_history, KernelStatus.SCHEDULED.name) + status_history = cast(list[dict[str, str]], row.status_history) + scheduled_at = get_first_timestamp_for_status(status_history, KernelStatus.SCHEDULED) return { # identity diff --git a/src/ai/backend/manager/models/resource_usage.py b/src/ai/backend/manager/models/resource_usage.py index 76b33016fe9..0916cadf3c5 100644 --- a/src/ai/backend/manager/models/resource_usage.py +++ b/src/ai/backend/manager/models/resource_usage.py @@ -1,8 +1,9 @@ from __future__ import annotations +import json from datetime import datetime from enum import Enum -from typing import Any, Mapping, Optional, Sequence +from typing import Any, Mapping, Optional, Sequence, cast from uuid import UUID import attrs @@ -14,14 +15,15 @@ from sqlalchemy.orm import joinedload, load_only from ai.backend.common import redis_helper +from ai.backend.common.json import ExtendedJSONEncoder from ai.backend.common.types import RedisConnectionInfo -from ai.backend.common.utils import get_first_timestamp_for_status, nmget +from ai.backend.common.utils import nmget from .group import GroupRow from .kernel import LIVE_STATUS, RESOURCE_USAGE_KERNEL_STATUSES, KernelRow, KernelStatus from .session import SessionRow from .user import UserRow -from .utils import ExtendedAsyncSAEngine +from .utils import ExtendedAsyncSAEngine, get_first_timestamp_for_status __all__: Sequence[str] = ( "ResourceGroupUnit", @@ -517,7 +519,9 @@ async def _pipe_builder(r: Redis) -> RedisPipeline: created_at=kern.created_at, terminated_at=kern.terminated_at, scheduled_at=str( - get_first_timestamp_for_status(kern.status_history, KernelStatus.SCHEDULED.name) + get_first_timestamp_for_status( + cast(list[dict[str, str]], kern.status_history), KernelStatus.SCHEDULED + ) ), used_time=kern.used_time, used_days=kern.get_used_days(local_tz), @@ -536,7 +540,7 @@ async def _pipe_builder(r: Redis) -> RedisPipeline: images={kern.image}, agents={kern.agent}, status=kern.status.name, - status_history=kern.status_history, + status_history=json.dumps(kern.status_history, cls=ExtendedJSONEncoder), cluster_mode=kern.cluster_mode, status_info=kern.status_info, group_unit=ResourceGroupUnit.KERNEL, diff --git a/src/ai/backend/manager/models/session.py b/src/ai/backend/manager/models/session.py index 53ba67a665b..8cbdd06afb9 100644 --- a/src/ai/backend/manager/models/session.py +++ b/src/ai/backend/manager/models/session.py @@ -38,7 +38,6 @@ SessionTypes, VFolderMount, ) -from ai.backend.common.utils import get_first_timestamp_for_status from ..api.exceptions import ( AgentError, @@ -80,7 +79,9 @@ JSONCoalesceExpr, agg_to_array, execute_with_retry, + get_first_timestamp_for_status, sql_append_dict_to_list, + sql_json_merge, ) if TYPE_CHECKING: @@ -1324,9 +1325,7 @@ def parse_row(cls, ctx: GraphQueryContext, row: Row) -> Mapping[str, Any]: full_name = getattr(row, "full_name") group_name = getattr(row, "group_name") row = row.SessionRow - scheduled_at = get_first_timestamp_for_status( - row.status_history, SessionStatus.SCHEDULED.name - ) + scheduled_at = get_first_timestamp_for_status(row.status_history, SessionStatus.SCHEDULED) return { # identity diff --git a/src/ai/backend/manager/models/utils.py b/src/ai/backend/manager/models/utils.py index 7093ac876cc..77565762639 100644 --- a/src/ai/backend/manager/models/utils.py +++ b/src/ai/backend/manager/models/utils.py @@ -6,6 +6,7 @@ import logging from contextlib import AbstractAsyncContextManager as AbstractAsyncCtxMgr from contextlib import asynccontextmanager as actxmgr +from datetime import datetime from typing import ( TYPE_CHECKING, Any, @@ -23,6 +24,7 @@ from urllib.parse import quote_plus as urlquote import sqlalchemy as sa +from dateutil.parser import parse as dtparse from sqlalchemy.dialects import postgresql as psql from sqlalchemy.engine import create_engine as _create_engine from sqlalchemy.exc import DBAPIError @@ -44,6 +46,10 @@ if TYPE_CHECKING: from ..config import LocalConfig + from . import ( + KernelStatus, + SessionStatus, + ) from ..defs import LockID from ..types import Sentinel @@ -536,3 +542,17 @@ async def vacuum_db( vacuum_sql = "VACUUM FULL" if vacuum_full else "VACUUM" log.info(f"Perfoming {vacuum_sql} operation...") await conn.exec_driver_sql(vacuum_sql) + + +def get_first_timestamp_for_status( + status_history_records: list[dict[str, str]], + status: KernelStatus | SessionStatus, +) -> datetime | None: + """ + Get the first occurrence time of the given status from the status history records. + """ + + for status_history in status_history_records: + if status_history["status"] == status.name: + return dtparse(status_history["timestamp"]) + return None diff --git a/src/ai/backend/manager/utils.py b/src/ai/backend/manager/utils.py index 091b9a767cb..8a4c070a9bf 100644 --- a/src/ai/backend/manager/utils.py +++ b/src/ai/backend/manager/utils.py @@ -1,3 +1,5 @@ +from __future__ import annotations + from typing import Optional from uuid import UUID