refactor: Replace sessions, kernels's status_history's type map with list #1662

Closed
wants to merge 39 commits
Commits
39 commits
f174192
Change status_history from map to list
jopemachine Oct 27, 2023
d34516b
Fix test
jopemachine Oct 29, 2023
013d090
Try to fix CI
jopemachine Oct 30, 2023
776cfea
Fix test
jopemachine Oct 30, 2023
a7c7d8b
Rename `sql_list_append` -> `sql_append_lists_to_list`
jopemachine Oct 30, 2023
ff60d14
Add fragment
jopemachine Oct 30, 2023
466819d
Fix wrong comment position
jopemachine Oct 30, 2023
02e2509
Rename get_first_status_history_record function
jopemachine Oct 30, 2023
637b479
Fix wrong implementation of session_history of vfolder
jopemachine Oct 30, 2023
e005f7a
Add default value to session status history
jopemachine Oct 30, 2023
8f0126f
Code organization
jopemachine Oct 30, 2023
e7388f9
Rename get_first_occurrence_time function
jopemachine Oct 30, 2023
7645e9f
Add comments for status_history column
jopemachine Oct 30, 2023
9a9a536
Try to fix _fetch_hanging_sessions
jopemachine Oct 30, 2023
d622cd3
Fix broken _fetch_hanging_sessions
jopemachine Oct 30, 2023
0360541
Remove useless newline
jopemachine Oct 30, 2023
915f900
Add migration script
jopemachine Oct 30, 2023
b679db5
Change status history format
jopemachine Nov 2, 2023
6e457bb
Add FieldSpec for status_history
jopemachine Nov 3, 2023
78b6f1d
Fix status_history command
jopemachine Nov 3, 2023
daf43af
Update obsoleted comments
jopemachine Nov 6, 2023
e4f9dd3
Update migration script
jopemachine Nov 6, 2023
a12a076
Update _fetch_hanging_sessions SQL
jopemachine Nov 6, 2023
b952c41
Allow to search stale sessions
jopemachine Nov 6, 2023
36160bf
Resolve alembic conflict
jopemachine Jan 26, 2024
bdfd636
chore: Merge with main
jopemachine May 3, 2024
9b83042
chore: update GraphQL schema dump
jopemachine May 3, 2024
bd7b318
fix: revert unrelated change
jopemachine May 3, 2024
956660d
docs: update migration script msg
jopemachine May 3, 2024
24f425a
chore: Add description field
jopemachine May 3, 2024
13121a3
chore: update GraphQL schema dump
jopemachine May 3, 2024
88529a2
fix: alembic migration script down_revision
jopemachine May 3, 2024
7d1e429
fix: Wrong handling of `get_first_occurrence_time`
jopemachine May 3, 2024
340e59d
chore: Rename `get_first_occurrence_time` -> `get_first_timestamp_for…
jopemachine May 3, 2024
b85bbbb
docs: Update fragment
jopemachine May 6, 2024
34e82c7
docs: Add `deprecation_reason` to `status_history` in gql
jopemachine May 6, 2024
971a30e
chore: update GraphQL schema dump
jopemachine May 6, 2024
79ccd88
fix: Mishandling of `scheduled_at` in gql
jopemachine May 7, 2024
cf8a6d0
fix: Change `get_first_timestamp_for_status` return type to datetime
jopemachine May 7, 2024
1 change: 1 addition & 0 deletions changes/1662.fix.md
@@ -0,0 +1 @@
Change the type of `status_history` from a mapping of status and timestamps to a list of log entries containing status and timestamps, to preserve timestamps when revisiting session/kernel statuses (e.g., after session restarts).
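
For illustration, the shape change looks roughly like this (the `status`/`timestamp` field names come from the migration script below; the timestamps are made up):

```python
# Old format: one timestamp per status, so revisiting a status overwrote the
# previous timestamp.
old_status_history = {
    "PENDING": "2024-05-01T10:00:00+00:00",
    "PREPARING": "2024-05-01T10:00:05+00:00",
    "RUNNING": "2024-05-01T10:00:30+00:00",
}

# New format: an append-only log, so repeated statuses (e.g. after a session
# restart) each keep their own entry.
new_status_history_log = [
    {"status": "PENDING", "timestamp": "2024-05-01T10:00:00+00:00"},
    {"status": "PREPARING", "timestamp": "2024-05-01T10:00:05+00:00"},
    {"status": "RUNNING", "timestamp": "2024-05-01T10:00:30+00:00"},
]
```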
57 changes: 31 additions & 26 deletions src/ai/backend/client/cli/session/lifecycle.py
Expand Up @@ -16,7 +16,6 @@
import inquirer
import treelib
from async_timeout import timeout
from dateutil.parser import isoparse
from dateutil.tz import tzutc
from faker import Faker
from humanize import naturalsize
@@ -25,8 +24,11 @@
from ai.backend.cli.main import main
from ai.backend.cli.params import CommaSeparatedListType, OptionalType
from ai.backend.cli.types import ExitCode, Undefined, undefined
from ai.backend.client.cli.extensions import pass_ctx_obj
from ai.backend.client.cli.types import CLIContext
from ai.backend.common.arch import DEFAULT_IMAGE_ARCH
from ai.backend.common.types import ClusterMode
from ai.backend.common.utils import get_first_timestamp_for_status

from ...compat import asyncio_run
from ...exceptions import BackendAPIError
@@ -763,8 +765,9 @@ def logs(session_id):


@session.command("status-history")
@pass_ctx_obj
@click.argument("session_id", metavar="SESSID")
def status_history(session_id):
def status_history(ctx: CLIContext, session_id):
"""
Shows the status transition history of the compute session.

@@ -776,31 +779,33 @@ def status_history(session_id):
kernel = session.ComputeSession(session_id)
try:
status_history = kernel.get_status_history().get("result")
print_info(f"status_history: {status_history}")
if (preparing := status_history.get("preparing")) is None:
result = {
"result": {
"seconds": 0,
"microseconds": 0,
},
}
elif (terminated := status_history.get("terminated")) is None:
alloc_time_until_now: timedelta = datetime.now(tzutc()) - isoparse(preparing)
result = {
"result": {
"seconds": alloc_time_until_now.seconds,
"microseconds": alloc_time_until_now.microseconds,
},
}

prev_time = None

for status_record in status_history:
timestamp = datetime.fromisoformat(status_record["timestamp"])

if prev_time:
time_diff = timestamp - prev_time
status_record["time_elapsed"] = str(time_diff)

prev_time = timestamp

ctx.output.print_list(
status_history,
[FieldSpec("status"), FieldSpec("timestamp"), FieldSpec("time_elapsed")],
)

if (preparing := get_first_timestamp_for_status(status_history, "PREPARING")) is None:
elapsed = timedelta()
elif (
terminated := get_first_timestamp_for_status(status_history, "TERMINATED")
) is None:
elapsed = datetime.now(tzutc()) - preparing
else:
alloc_time: timedelta = isoparse(terminated) - isoparse(preparing)
result = {
"result": {
"seconds": alloc_time.seconds,
"microseconds": alloc_time.microseconds,
},
}
print_done(f"Actual Resource Allocation Time: {result}")
elapsed = terminated - preparing

print_done(f"Actual Resource Allocation Time: {elapsed.total_seconds()}")
except Exception as e:
print_error(e)
sys.exit(ExitCode.FAILURE)
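
The elapsed-time bookkeeping added above reduces to pairwise differences between consecutive log entries; a minimal standalone sketch of that loop, using made-up records:

```python
from datetime import datetime

status_history = [
    {"status": "PENDING", "timestamp": "2024-05-01T10:00:00+00:00"},
    {"status": "PREPARING", "timestamp": "2024-05-01T10:00:05+00:00"},
    {"status": "RUNNING", "timestamp": "2024-05-01T10:00:30+00:00"},
]

prev_time = None
for record in status_history:
    timestamp = datetime.fromisoformat(record["timestamp"])
    if prev_time is not None:
        # Time spent in the previous status before this transition.
        record["time_elapsed"] = str(timestamp - prev_time)
    prev_time = timestamp

# status_history[1]["time_elapsed"] == "0:00:05"
# status_history[2]["time_elapsed"] == "0:00:25"
```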
2 changes: 2 additions & 0 deletions src/ai/backend/client/output/fields.py
@@ -181,6 +181,8 @@
FieldSpec("created_user_id"),
FieldSpec("status"),
FieldSpec("status_info"),
FieldSpec("status_history"),
FieldSpec("status_history_log"),
FieldSpec("status_data", formatter=nested_dict_formatter),
FieldSpec("status_changed", "Last Updated"),
FieldSpec("created_at"),
16 changes: 15 additions & 1 deletion src/ai/backend/common/utils.py
Expand Up @@ -8,7 +8,7 @@
import sys
import uuid
from collections import OrderedDict
from datetime import timedelta
from datetime import datetime, timedelta
from itertools import chain
from pathlib import Path
from typing import (
@@ -25,6 +25,7 @@

import aiofiles
from async_timeout import timeout
from dateutil.parser import parse as dtparse

if TYPE_CHECKING:
from decimal import Decimal
@@ -404,3 +405,16 @@ async def umount(
fstab = Fstab(fp)
await fstab.remove_by_mountpoint(str(mountpoint))
return True


def get_first_timestamp_for_status(
status_history_records: list[dict[str, str]], status: str
) -> datetime | None:
"""
Get the first occurrence time of the given status from the status history records.
"""

for status_history in status_history_records:
if status_history["status"] == status:
return dtparse(status_history["timestamp"])
return None
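
A small usage sketch of the new helper, using made-up records:

```python
from datetime import datetime, timezone

from ai.backend.common.utils import get_first_timestamp_for_status

records = [
    {"status": "PENDING", "timestamp": "2024-05-01T10:00:00+00:00"},
    {"status": "PREPARING", "timestamp": "2024-05-01T10:00:05+00:00"},
]

# Returns the parsed timestamp of the first matching record...
assert get_first_timestamp_for_status(records, "PREPARING") == datetime(
    2024, 5, 1, 10, 0, 5, tzinfo=timezone.utc
)
# ...and None when the status never occurred.
assert get_first_timestamp_for_status(records, "TERMINATED") is None
```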
2 changes: 1 addition & 1 deletion src/ai/backend/manager/api/resource.py
@@ -463,7 +463,7 @@ async def _pipe_builder(r: Redis) -> RedisPipeline:
"status": row["status"].name,
"status_info": row["status_info"],
"status_changed": str(row["status_changed"]),
"status_history": row["status_history"] or {},
"status_history": row["status_history"],
"cluster_mode": row["cluster_mode"],
}
if group_id not in objs_per_group:
5 changes: 4 additions & 1 deletion src/ai/backend/manager/api/schema.graphql
@@ -545,7 +545,10 @@
status_changed: DateTime
status_info: String
status_data: JSONString
status_history: JSONString
status_history: JSONString @deprecated(reason: "Deprecated since 24.09.0; use `status_history_log`")

"""Added in 24.09.0"""
status_history_log: JSONString

created_at: DateTime
terminated_at: DateTime
starts_at: DateTime
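
For reference, a query selecting the new field alongside the deprecated one could look roughly like the snippet below; only the two field names come from the schema diff above, while the `compute_session` root field and the UUID value are assumptions:

```python
# Hypothetical GraphQL query text against the manager's admin API; send it
# with whichever GraphQL client you use.
QUERY = """
query($id: UUID!) {
  compute_session(id: $id) {
    status_history       # deprecated since 24.09.0
    status_history_log   # added in 24.09.0
  }
}
"""
VARIABLES = {"id": "00000000-0000-0000-0000-000000000000"}
```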
31 changes: 31 additions & 0 deletions src/ai/backend/manager/api/session.py
@@ -2151,6 +2151,36 @@ async def get_container_logs(request: web.Request, params: Any) -> web.Response:
return web.json_response(resp, status=200)


@server_status_required(READ_ALLOWED)
@auth_required
@check_api_params(
t.Dict({
tx.AliasedKey(["session_name", "sessionName", "task_id", "taskId"]) >> "kernel_id": tx.UUID,
t.Key("owner_access_key", default=None): t.Null | t.String,
})
)
async def get_status_history(request: web.Request, params: Any) -> web.Response:
root_ctx: RootContext = request.app["_root.context"]
session_name: str = request.match_info["session_name"]
requester_access_key, owner_access_key = await get_access_key_scopes(request, params)
log.info(
"GET_STATUS_HISTORY (ak:{}/{}, s:{})", requester_access_key, owner_access_key, session_name
)
resp: dict[str, Mapping] = {"result": {}}

async with root_ctx.db.begin_readonly_session() as db_sess:
compute_session = await SessionRow.get_session(
db_sess,
session_name,
owner_access_key,
allow_stale=True,
kernel_loading_strategy=KernelLoadingStrategy.MAIN_KERNEL_ONLY,
)
resp["result"] = compute_session.status_history

return web.json_response(resp, status=200)
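
For illustration, the same data can be fetched through the client SDK call the CLI command above relies on; this is a minimal sketch assuming the synchronous `Session` API, with a made-up session name:

```python
from ai.backend.client.session import Session

with Session() as api_session:
    compute_session = api_session.ComputeSession("my-session")  # hypothetical session name
    # The new GET /{session_name}/status-history endpoint responds with
    # {"result": [...]}, as built by the handler above.
    history = compute_session.get_status_history().get("result")
    for record in history:
        print(record["status"], record["timestamp"])
```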


@server_status_required(READ_ALLOWED)
@auth_required
@check_api_params(
@@ -2286,6 +2316,7 @@ def create_app(
app.router.add_route("GET", "/{session_name}/direct-access-info", get_direct_access_info)
)
cors.add(app.router.add_route("GET", "/{session_name}/logs", get_container_logs))
cors.add(app.router.add_route("GET", "/{session_name}/status-history", get_status_history))
cors.add(app.router.add_route("POST", "/{session_name}/rename", rename_session))
cors.add(app.router.add_route("POST", "/{session_name}/interrupt", interrupt))
cors.add(app.router.add_route("POST", "/{session_name}/complete", complete))
@@ -0,0 +1,93 @@
"""Replace sessions, kernels's status_history's type map with list

Revision ID: 8c8e90aebacd
Revises: dddf9be580f5
Create Date: 2024-01-26 11:19:23.075014

"""

from alembic import op

# revision identifiers, used by Alembic.
revision = "8c8e90aebacd"
down_revision = "dddf9be580f5"
branch_labels = None
depends_on = None


def upgrade():
op.execute(
"""
WITH data AS (
SELECT id,
(jsonb_each(status_history)).key AS status,
(jsonb_each(status_history)).value AS timestamp
FROM kernels
)
UPDATE kernels
SET status_history = (
SELECT jsonb_agg(
jsonb_build_object('status', status, 'timestamp', timestamp)
)
FROM data
WHERE data.id = kernels.id
);
"""
)

op.execute(
"""
WITH data AS (
SELECT id,
(jsonb_each(status_history)).key AS status,
(jsonb_each(status_history)).value AS timestamp
FROM sessions
)
UPDATE sessions
SET status_history = (
SELECT jsonb_agg(
jsonb_build_object('status', status, 'timestamp', timestamp)
)
FROM data
WHERE data.id = sessions.id
);
"""
)


def downgrade():
op.execute(
"""
WITH data AS (
SELECT id,
jsonb_object_agg(
elem->>'status', elem->>'timestamp'
) AS new_status_history
FROM kernels,
jsonb_array_elements(status_history) AS elem
GROUP BY id
)
UPDATE kernels
SET status_history = data.new_status_history
FROM data
WHERE data.id = kernels.id;
"""
)

op.execute(
"""
WITH data AS (
SELECT id,
jsonb_object_agg(
elem->>'status', elem->>'timestamp'
) AS new_status_history
FROM sessions,
jsonb_array_elements(status_history) AS elem
GROUP BY id
)
UPDATE sessions
SET status_history = data.new_status_history
FROM data
WHERE data.id = sessions.id;
"""
)
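
The JSONB reshaping above has a straightforward pure-Python equivalent; the sketch below (with made-up values) shows both directions for readers who prefer not to parse the SQL:

```python
# Forward (upgrade): a status -> timestamp map becomes a list of log entries.
def history_map_to_list(history: dict[str, str]) -> list[dict[str, str]]:
    return [{"status": s, "timestamp": ts} for s, ts in history.items()]


# Backward (downgrade): collapse back to one timestamp per status. Like
# `jsonb_object_agg`, repeated statuses keep only a single value, which is
# exactly the information loss this PR avoids going forward.
def history_list_to_map(history: list[dict[str, str]]) -> dict[str, str]:
    return {rec["status"]: rec["timestamp"] for rec in history}


old = {"PENDING": "2024-05-01T10:00:00+00:00", "RUNNING": "2024-05-01T10:00:30+00:00"}
assert history_list_to_map(history_map_to_list(old)) == old
```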