feat: Replace sessions, kernels's status_history's type map with list #2113

Closed
wants to merge 45 commits into from

Commits
e1c03aa
Change status_history from map to list
jopemachine Oct 27, 2023
59f4b60
Fix test
jopemachine Oct 29, 2023
f576ca5
Try to fix CI
jopemachine Oct 30, 2023
42d12ed
Fix test
jopemachine Oct 30, 2023
49338b6
Rename `sql_list_append` -> `sql_append_lists_to_list`
jopemachine Oct 30, 2023
19bb91d
Add fragment
jopemachine Oct 30, 2023
da24de4
Fix wrong comment position
jopemachine Oct 30, 2023
feec035
Rename get_first_status_history_record function
jopemachine Oct 30, 2023
380908a
Fix wrong implementation of session_history of vfolder
jopemachine Oct 30, 2023
1f521a4
Add default value to session status history
jopemachine Oct 30, 2023
2373005
Code organization
jopemachine Oct 30, 2023
8f7a6d6
Rename get_first_occurrence_time function
jopemachine Oct 30, 2023
9b0e4be
Add comments for status_history column
jopemachine Oct 30, 2023
2cf840e
Try to fix _fetch_hanging_sessions
jopemachine Oct 30, 2023
185164b
Fix broken _fetch_hanging_sessions
jopemachine Oct 30, 2023
f92a54a
Remove useless newline
jopemachine Oct 30, 2023
9a6e0ad
Add migration script
jopemachine Oct 30, 2023
f0bf3f5
Change status history format
jopemachine Nov 2, 2023
4cb68de
Add FieldSpec for status_history
jopemachine Nov 3, 2023
5e44e85
Fix status_history command
jopemachine Nov 3, 2023
1839ef3
Update obsoleted comments
jopemachine Nov 6, 2023
1c26e4b
Update migration script
jopemachine Nov 6, 2023
32dc1cc
Update _fetch_hanging_sessions SQL
jopemachine Nov 6, 2023
ddfce33
Allow to search stale sessions
jopemachine Nov 6, 2023
76cc98e
Resolve alembic conflict
jopemachine Jan 26, 2024
b1520c9
chore: Merge with main
jopemachine May 3, 2024
10f36b2
chore: update GraphQL schema dump
jopemachine May 3, 2024
4b0758c
fix: revert unrelated change
jopemachine May 3, 2024
240ee6c
docs: update migration script msg
jopemachine May 3, 2024
d0dc961
chore: Add description field
jopemachine May 3, 2024
855f91f
chore: update GraphQL schema dump
jopemachine May 3, 2024
2bb11f3
fix: alembic migration script down_revision
jopemachine May 3, 2024
4080505
fix: Wrong handling of `get_first_occurrence_time`
jopemachine May 3, 2024
cc7d896
chore: Rename `get_first_occurrence_time` -> `get_first_timestamp_for…
jopemachine May 3, 2024
cf5d7d5
docs: Update fragment
jopemachine May 6, 2024
f25bf61
docs: Add `deprecation_reason` to `status_history` in gql
jopemachine May 6, 2024
62237a3
chore: update GraphQL schema dump
jopemachine May 6, 2024
d89a64f
fix: Mishandling of `scheduled_at` in gql
jopemachine May 7, 2024
6d4bc09
fix: Change `get_first_timestamp_for_status` return type to datetime
jopemachine May 7, 2024
3af5ec9
fix: perform missing alter column in migration script
jopemachine May 9, 2024
895fca8
refactor: Move get_first_timestamp_for_status() from common.utils to …
achimnol Jul 15, 2024
bf0f5a4
fix: Reconcile the alembic migration history
achimnol Jul 15, 2024
4f63ea7
fix: Let migrations convert columns with the other type only
achimnol Jul 18, 2024
3bb3d31
fix: Change data types that were mistakenly converted to tuples
jopemachine Jul 18, 2024
5eeb502
fix: Update the codes added in #2275
achimnol Jul 22, 2024
1 change: 1 addition & 0 deletions changes/1662.fix.md
@@ -0,0 +1 @@
Change the type of `status_history` from a mapping of statuses to timestamps to a list of log entries, each containing a status and a timestamp, so that timestamps are preserved when a session/kernel revisits a status (e.g., after a session restart).
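To make the format change concrete, here is an illustrative before/after value (status names and timestamps are made up; the {"status", "timestamp"} entry shape matches the migration and client helper in this PR):

# Illustrative values only: the old map-based format keeps at most one timestamp
# per status, so revisiting a status (e.g. after a restart) overwrites the record.
old_status_history = {
    "preparing": "2023-10-27T09:00:05+00:00",
    "running": "2023-10-27T09:00:30+00:00",
}

# The new list-based format appends one log entry per transition, so repeated
# statuses keep every timestamp.
new_status_history = [
    {"status": "PREPARING", "timestamp": "2023-10-27T09:00:05+00:00"},
    {"status": "RUNNING", "timestamp": "2023-10-27T09:00:30+00:00"},
    {"status": "RESTARTING", "timestamp": "2023-10-27T10:12:00+00:00"},
    {"status": "RUNNING", "timestamp": "2023-10-27T10:12:20+00:00"},
]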
57 changes: 31 additions & 26 deletions src/ai/backend/client/cli/session/lifecycle.py
@@ -16,7 +16,6 @@
import inquirer
import treelib
from async_timeout import timeout
from dateutil.parser import isoparse
from dateutil.tz import tzutc
from faker import Faker
from humanize import naturalsize
@@ -25,6 +24,8 @@
from ai.backend.cli.main import main
from ai.backend.cli.params import CommaSeparatedListType, OptionalType
from ai.backend.cli.types import ExitCode, Undefined, undefined
from ai.backend.client.cli.extensions import pass_ctx_obj
from ai.backend.client.cli.types import CLIContext
from ai.backend.common.arch import DEFAULT_IMAGE_ARCH
from ai.backend.common.types import ClusterMode

@@ -34,6 +35,7 @@
from ...output.fields import session_fields
from ...output.types import FieldSpec
from ...session import AsyncSession, Session
from ...utils import get_first_timestamp_for_status
from .. import events
from ..pretty import (
ProgressViewer,
@@ -778,8 +780,9 @@ def logs(session_id, kernel: str | None):


@session.command("status-history")
@pass_ctx_obj
@click.argument("session_id", metavar="SESSID")
def status_history(session_id):
def status_history(ctx: CLIContext, session_id):
"""
Shows the status transition history of the compute session.

@@ -791,31 +794,33 @@ def status_history(session_id):
kernel = session.ComputeSession(session_id)
try:
status_history = kernel.get_status_history().get("result")
print_info(f"status_history: {status_history}")
if (preparing := status_history.get("preparing")) is None:
result = {
"result": {
"seconds": 0,
"microseconds": 0,
},
}
elif (terminated := status_history.get("terminated")) is None:
alloc_time_until_now: timedelta = datetime.now(tzutc()) - isoparse(preparing)
result = {
"result": {
"seconds": alloc_time_until_now.seconds,
"microseconds": alloc_time_until_now.microseconds,
},
}

prev_time = None

for status_record in status_history:
timestamp = datetime.fromisoformat(status_record["timestamp"])

if prev_time:
time_diff = timestamp - prev_time
status_record["time_elapsed"] = str(time_diff)

prev_time = timestamp

ctx.output.print_list(
status_history,
[FieldSpec("status"), FieldSpec("timestamp"), FieldSpec("time_elapsed")],
)

if (preparing := get_first_timestamp_for_status(status_history, "PREPARING")) is None:
elapsed = timedelta()
elif (
terminated := get_first_timestamp_for_status(status_history, "TERMINATED")
) is None:
elapsed = datetime.now(tzutc()) - preparing
else:
alloc_time: timedelta = isoparse(terminated) - isoparse(preparing)
result = {
"result": {
"seconds": alloc_time.seconds,
"microseconds": alloc_time.microseconds,
},
}
print_done(f"Actual Resource Allocation Time: {result}")
elapsed = terminated - preparing

print_done(f"Actual Resource Allocation Time: {elapsed.total_seconds()}")
Comment on lines +814 to +823

You've probably kept the existing dict-based behavior here, but since you've changed the structure to a list, shouldn't you show the whole history of status changes?

except Exception as e:
print_error(e)
sys.exit(ExitCode.FAILURE)
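The new CLI code above annotates each history entry with the time elapsed since the previous transition before printing the list. A self-contained sketch of that idea with made-up entries (mirroring, not reusing, the CLI code):

from datetime import datetime

records = [
    {"status": "PENDING", "timestamp": "2024-05-07T01:23:40+00:00"},
    {"status": "PREPARING", "timestamp": "2024-05-07T01:23:45+00:00"},
    {"status": "RUNNING", "timestamp": "2024-05-07T01:24:10+00:00"},
]
prev = None
for rec in records:
    ts = datetime.fromisoformat(rec["timestamp"])
    if prev is not None:
        # "0:00:05" for the PREPARING entry, "0:00:25" for the RUNNING entry
        rec["time_elapsed"] = str(ts - prev)
    prev = ts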
2 changes: 2 additions & 0 deletions src/ai/backend/client/output/fields.py
@@ -185,6 +185,8 @@
FieldSpec("created_user_id"),
FieldSpec("status"),
FieldSpec("status_info"),
FieldSpec("status_history"),
FieldSpec("status_history_log"),
FieldSpec("status_data", formatter=nested_dict_formatter),
FieldSpec("status_changed", "Last Updated"),
FieldSpec("created_at"),
14 changes: 14 additions & 0 deletions src/ai/backend/client/utils.py
@@ -1,6 +1,10 @@
from __future__ import annotations

import io
import os
from datetime import datetime

from dateutil.parser import parse as dtparse
from tqdm import tqdm


@@ -48,3 +52,13 @@ def readinto1(self, *args, **kwargs):
count = super().readinto1(*args, **kwargs)
self.tqdm.set_postfix(file=self._filename, refresh=False)
self.tqdm.update(count)


def get_first_timestamp_for_status(
status_history: list[dict[str, str]],
status: str,
) -> datetime | None:
for rec in status_history:
if rec["status"] == status:
return dtparse(rec["timestamp"])
return None
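A minimal usage sketch of the new helper above, with made-up history entries in the list format:

history = [
    {"status": "PREPARING", "timestamp": "2024-05-07T01:23:45+00:00"},
    {"status": "RUNNING", "timestamp": "2024-05-07T01:24:10+00:00"},
]
first_running = get_first_timestamp_for_status(history, "RUNNING")
assert first_running is not None and first_running.year == 2024
assert get_first_timestamp_for_status(history, "TERMINATED") is None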
2 changes: 1 addition & 1 deletion src/ai/backend/manager/api/resource.py
@@ -465,7 +465,7 @@ async def _pipe_builder(r: Redis) -> RedisPipeline:
"status": row["status"].name,
"status_info": row["status_info"],
"status_changed": str(row["status_changed"]),
"status_history": row["status_history"] or {},
"status_history": row["status_history"],
"cluster_mode": row["cluster_mode"],
}
if group_id not in objs_per_group:
5 changes: 4 additions & 1 deletion src/ai/backend/manager/api/schema.graphql
@@ -583,7 +583,10 @@ type ComputeSession implements Item {
status_changed: DateTime
status_info: String
status_data: JSONString
status_history: JSONString
status_history: JSONString @deprecated(reason: "Deprecated since 24.09.0; use `status_history_log`")

"""Added in 24.09.0"""
status_history_log: JSONString
created_at: DateTime
terminated_at: DateTime
starts_at: DateTime
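A hedged sketch of selecting the newly added field over GraphQL. Only the `status_history` and `status_history_log` field names come from the schema change above; the root query field and its argument are assumptions:

# Assumed query shape; only the selected field names are taken from the schema diff.
QUERY_SESSION_STATUS_HISTORY = """
query($id: UUID!) {
  compute_session(id: $id) {
    status
    status_history      # deprecated since 24.09.0
    status_history_log  # added in 24.09.0: JSON list of {status, timestamp} entries
  }
}
"""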
31 changes: 31 additions & 0 deletions src/ai/backend/manager/api/session.py
@@ -2186,6 +2186,36 @@ async def get_container_logs(
return web.json_response(resp, status=200)


@server_status_required(READ_ALLOWED)
@auth_required
@check_api_params(
t.Dict({
tx.AliasedKey(["session_name", "sessionName", "task_id", "taskId"]) >> "kernel_id": tx.UUID,
t.Key("owner_access_key", default=None): t.Null | t.String,
})
)
async def get_status_history(request: web.Request, params: Any) -> web.Response:
root_ctx: RootContext = request.app["_root.context"]
session_name: str = request.match_info["session_name"]
requester_access_key, owner_access_key = await get_access_key_scopes(request, params)
log.info(
"GET_STATUS_HISTORY (ak:{}/{}, s:{})", requester_access_key, owner_access_key, session_name
)
resp: dict[str, Mapping] = {"result": {}}

async with root_ctx.db.begin_readonly_session() as db_sess:
compute_session = await SessionRow.get_session(
db_sess,
session_name,
owner_access_key,
allow_stale=True,
kernel_loading_strategy=KernelLoadingStrategy.MAIN_KERNEL_ONLY,
)
resp["result"] = compute_session.status_history

return web.json_response(resp, status=200)


@server_status_required(READ_ALLOWED)
@auth_required
@check_api_params(
@@ -2321,6 +2351,7 @@ def create_app(
app.router.add_route("GET", "/{session_name}/direct-access-info", get_direct_access_info)
)
cors.add(app.router.add_route("GET", "/{session_name}/logs", get_container_logs))
cors.add(app.router.add_route("GET", "/{session_name}/status-history", get_status_history))
cors.add(app.router.add_route("POST", "/{session_name}/rename", rename_session))
cors.add(app.router.add_route("POST", "/{session_name}/interrupt", interrupt))
cors.add(app.router.add_route("POST", "/{session_name}/complete", complete))
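For reference, a rough sketch (not part of the diff) of calling the new REST route with aiohttp. The mount prefix of the session app, the port, and the omission of request signing are assumptions; real clients should use the SDK's `ComputeSession.get_status_history()` shown in the CLI change above.

import asyncio

import aiohttp

async def fetch_status_history(base_url: str, session_name: str) -> list[dict]:
    # Sketch only: the Backend.AI API requires signed/authenticated requests,
    # which are omitted here for brevity.
    async with aiohttp.ClientSession() as http:
        async with http.get(f"{base_url}/session/{session_name}/status-history") as resp:
            resp.raise_for_status()
            data = await resp.json()
            return data["result"]  # list of {"status": ..., "timestamp": ...} entries

# asyncio.run(fetch_status_history("http://127.0.0.1:8081", "mysession"))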
@@ -0,0 +1,109 @@
"""Replace sessions, kernels's status_history's type map with list

Revision ID: 8c8e90aebacd
Revises: 59a622c31820
Create Date: 2024-01-26 11:19:23.075014

"""

from alembic import op

# revision identifiers, used by Alembic.
revision = "8c8e90aebacd"
down_revision = "59a622c31820"
branch_labels = None
depends_on = None


def upgrade():
op.execute(
"""
WITH data AS (
SELECT id,
(jsonb_each(status_history)).key AS status,
(jsonb_each(status_history)).value AS timestamp
FROM kernels
WHERE jsonb_typeof(status_history) = 'object'
)
UPDATE kernels
SET status_history = (
SELECT jsonb_agg(
jsonb_build_object('status', status, 'timestamp', timestamp)
)
FROM data
WHERE data.id = kernels.id
AND jsonb_typeof(kernels.status_history) = 'object'
);
"""
)
op.execute("UPDATE kernels SET status_history = '[]'::jsonb WHERE status_history IS NULL;")
op.alter_column("kernels", "status_history", nullable=False, default=[])

op.execute(
"""
WITH data AS (
SELECT id,
(jsonb_each(status_history)).key AS status,
(jsonb_each(status_history)).value AS timestamp
FROM sessions
WHERE jsonb_typeof(status_history) = 'object'
)
UPDATE sessions
SET status_history = (
SELECT jsonb_agg(
jsonb_build_object('status', status, 'timestamp', timestamp)
)
FROM data
WHERE data.id = sessions.id
AND jsonb_typeof(sessions.status_history) = 'object'
);
"""
)
op.execute("UPDATE sessions SET status_history = '[]'::jsonb WHERE status_history IS NULL;")
op.alter_column("sessions", "status_history", nullable=False, default=[])


def downgrade():
op.execute(
"""
WITH data AS (
SELECT id,
jsonb_object_agg(
elem->>'status', elem->>'timestamp'
) AS new_status_history
FROM kernels,
jsonb_array_elements(status_history) AS elem
WHERE jsonb_typeof(status_history) = 'array'
GROUP BY id
)
UPDATE kernels
SET status_history = data.new_status_history
FROM data
WHERE data.id = kernels.id
AND jsonb_typeof(kernels.status_history) = 'array';
"""
)
op.alter_column("kernels", "status_history", nullable=True, default=None)
op.execute("UPDATE kernels SET status_history = NULL WHERE status_history = '[]'::jsonb;")

op.execute(
"""
WITH data AS (
SELECT id,
jsonb_object_agg(
elem->>'status', elem->>'timestamp'
) AS new_status_history
FROM sessions,
jsonb_array_elements(status_history) AS elem
WHERE jsonb_typeof(status_history) = 'array'
GROUP BY id
)
UPDATE sessions
SET status_history = data.new_status_history
FROM data
WHERE data.id = sessions.id
AND jsonb_typeof(sessions.status_history) = 'array';
"""
)
op.alter_column("sessions", "status_history", nullable=True, default=None)
op.execute("UPDATE sessions SET status_history = NULL WHERE status_history = '[]'::jsonb;")