Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: Replace the `status_history` type of sessions and kernels from map to list #1662

Closed
wants to merge 39 commits into from
Closed
Show file tree
Hide file tree
Changes from 31 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
f174192
Change status_history from map to list
jopemachine Oct 27, 2023
d34516b
Fix test
jopemachine Oct 29, 2023
013d090
Try to fix CI
jopemachine Oct 30, 2023
776cfea
Fix test
jopemachine Oct 30, 2023
a7c7d8b
Rename `sql_list_append` -> `sql_append_lists_to_list`
jopemachine Oct 30, 2023
ff60d14
Add fragment
jopemachine Oct 30, 2023
466819d
Fix wrong comment position
jopemachine Oct 30, 2023
02e2509
Rename get_first_status_history_record function
jopemachine Oct 30, 2023
637b479
Fix wrong implementation of session_history of vfolder
jopemachine Oct 30, 2023
e005f7a
Add default value to session status history
jopemachine Oct 30, 2023
8f0126f
Code organization
jopemachine Oct 30, 2023
e7388f9
Rename get_first_occurrence_time function
jopemachine Oct 30, 2023
7645e9f
Add comments for status_history column
jopemachine Oct 30, 2023
9a9a536
Try to fix _fetch_hanging_sessions
jopemachine Oct 30, 2023
d622cd3
Fix broken _fetch_hanging_sessions
jopemachine Oct 30, 2023
0360541
Remove useless newline
jopemachine Oct 30, 2023
915f900
Add migration script
jopemachine Oct 30, 2023
b679db5
Change status history format
jopemachine Nov 2, 2023
6e457bb
Add FieldSpec for status_history
jopemachine Nov 3, 2023
78b6f1d
Fix status_history command
jopemachine Nov 3, 2023
daf43af
Update obsoleted comments
jopemachine Nov 6, 2023
e4f9dd3
Update migration script
jopemachine Nov 6, 2023
a12a076
Update _fetch_hanging_sessions SQL
jopemachine Nov 6, 2023
b952c41
Allow to search stale sessions
jopemachine Nov 6, 2023
36160bf
Resolve alembic conflict
jopemachine Jan 26, 2024
bdfd636
chore: Merge with main
jopemachine May 3, 2024
9b83042
chore: update GraphQL schema dump
jopemachine May 3, 2024
bd7b318
fix: revert unrelated change
jopemachine May 3, 2024
956660d
docs: update migration script msg
jopemachine May 3, 2024
24f425a
chore: Add description field
jopemachine May 3, 2024
13121a3
chore: update GraphQL schema dump
jopemachine May 3, 2024
88529a2
fix: alembic migration script down_revision
jopemachine May 3, 2024
7d1e429
fix: Wrong handling of `get_first_occurrence_time`
jopemachine May 3, 2024
340e59d
chore: Rename `get_first_occurrence_time` -> `get_first_timestamp_for…
jopemachine May 3, 2024
b85bbbb
docs: Update fragment
jopemachine May 6, 2024
34e82c7
docs: Add `deprecation_reason` to `status_history` in gql
jopemachine May 6, 2024
971a30e
chore: update GraphQL schema dump
jopemachine May 6, 2024
79ccd88
fix: Mishandling of `scheduled_at` in gql
jopemachine May 7, 2024
cf8a6d0
fix: Change `get_first_timestamp_for_status` return type to datetime
jopemachine May 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/1662.fix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Replace `status_history`'s type `map` with `list`
jopemachine marked this conversation as resolved.
Show resolved Hide resolved
54 changes: 29 additions & 25 deletions src/ai/backend/client/cli/session/lifecycle.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
from ai.backend.cli.main import main
from ai.backend.cli.params import CommaSeparatedListType, OptionalType
from ai.backend.cli.types import ExitCode, Undefined, undefined
from ai.backend.client.cli.extensions import pass_ctx_obj
from ai.backend.client.cli.types import CLIContext
from ai.backend.common.arch import DEFAULT_IMAGE_ARCH
from ai.backend.common.types import ClusterMode
from ai.backend.common.utils import get_first_occurrence_time

from ...compat import asyncio_run
from ...exceptions import BackendAPIError
Expand Down Expand Up @@ -763,8 +766,9 @@ def logs(session_id):


@session.command("status-history")
@pass_ctx_obj
@click.argument("session_id", metavar="SESSID")
def status_history(session_id):
def status_history(ctx: CLIContext, session_id):
"""
Shows the status transition history of the compute session.

Expand All @@ -776,31 +780,31 @@ def status_history(session_id):
kernel = session.ComputeSession(session_id)
try:
status_history = kernel.get_status_history().get("result")
print_info(f"status_history: {status_history}")
if (preparing := status_history.get("preparing")) is None:
result = {
"result": {
"seconds": 0,
"microseconds": 0,
},
}
elif (terminated := status_history.get("terminated")) is None:
alloc_time_until_now: timedelta = datetime.now(tzutc()) - isoparse(preparing)
result = {
"result": {
"seconds": alloc_time_until_now.seconds,
"microseconds": alloc_time_until_now.microseconds,
},
}

prev_time = None

for status_record in status_history:
timestamp = datetime.fromisoformat(status_record["timestamp"])

if prev_time:
time_diff = timestamp - prev_time
status_record["time_elapsed"] = str(time_diff)

prev_time = timestamp

ctx.output.print_list(
status_history,
[FieldSpec("status"), FieldSpec("timestamp"), FieldSpec("time_elapsed")],
)

if (preparing := get_first_occurrence_time(status_history, "PREPARING")) is None:
elapsed = timedelta()
elif (terminated := get_first_occurrence_time(status_history, "TERMINATED")) is None:
elapsed = datetime.now(tzutc()) - isoparse(preparing)
else:
alloc_time: timedelta = isoparse(terminated) - isoparse(preparing)
result = {
"result": {
"seconds": alloc_time.seconds,
"microseconds": alloc_time.microseconds,
},
}
print_done(f"Actual Resource Allocation Time: {result}")
elapsed = isoparse(terminated) - isoparse(preparing)

print_done(f"Actual Resource Allocation Time: {elapsed.total_seconds()}")
except Exception as e:
print_error(e)
sys.exit(ExitCode.FAILURE)
Expand Down
2 changes: 2 additions & 0 deletions src/ai/backend/client/output/fields.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,8 @@
FieldSpec("created_user_id"),
FieldSpec("status"),
FieldSpec("status_info"),
FieldSpec("status_history"),
FieldSpec("status_history_log"),
FieldSpec("status_data", formatter=nested_dict_formatter),
FieldSpec("status_changed", "Last Updated"),
FieldSpec("created_at"),
Expand Down
13 changes: 13 additions & 0 deletions src/ai/backend/common/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,3 +404,16 @@ async def umount(
fstab = Fstab(fp)
await fstab.remove_by_mountpoint(str(mountpoint))
return True


def get_first_occurrence_time(
status_history_records: list[dict[str, str]], status: str
) -> str | None:
"""
Get the first occurrence time of the given status from the status history records.
"""

for status_history in status_history_records:
if status_history["status"] == status:
return status_history["timestamp"]
return None
2 changes: 1 addition & 1 deletion src/ai/backend/manager/api/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,7 @@ async def _pipe_builder(r: Redis) -> RedisPipeline:
"status": row["status"].name,
"status_info": row["status_info"],
"status_changed": str(row["status_changed"]),
"status_history": row["status_history"] or {},
"status_history": row["status_history"],
"cluster_mode": row["cluster_mode"],
}
if group_id not in objs_per_group:
Expand Down
3 changes: 3 additions & 0 deletions src/ai/backend/manager/api/schema.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,9 @@ type ComputeSession implements Item {
status_info: String
status_data: JSONString
status_history: JSONString

"""Added in 24.09.0"""
status_history_log: JSONString
created_at: DateTime
terminated_at: DateTime
starts_at: DateTime
Expand Down
31 changes: 31 additions & 0 deletions src/ai/backend/manager/api/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -2151,6 +2151,36 @@ async def get_container_logs(request: web.Request, params: Any) -> web.Response:
return web.json_response(resp, status=200)


@server_status_required(READ_ALLOWED)
@auth_required
@check_api_params(
    t.Dict({
        # NOTE(review): the aliased keys are normalized to "kernel_id" in `params`,
        # but the handler below reads the session name from the URL path
        # (`match_info`), not from this value — confirm the alias target is intended.
        tx.AliasedKey(["session_name", "sessionName", "task_id", "taskId"]) >> "kernel_id": tx.UUID,
        t.Key("owner_access_key", default=None): t.Null | t.String,
    })
)
async def get_status_history(request: web.Request, params: Any) -> web.Response:
    """
    Return the status transition history of a compute session as
    ``{"result": <status_history>}``.

    The session is looked up by the ``session_name`` path segment, scoped to
    the owner access key resolved from the request.
    """
    root_ctx: RootContext = request.app["_root.context"]
    session_name: str = request.match_info["session_name"]
    requester_access_key, owner_access_key = await get_access_key_scopes(request, params)
    log.info(
        "GET_STATUS_HISTORY (ak:{}/{}, s:{})", requester_access_key, owner_access_key, session_name
    )
    resp: dict[str, Mapping] = {"result": {}}

    async with root_ctx.db.begin_readonly_session() as db_sess:
        # allow_stale=True lets terminated/stale sessions be queried as well,
        # since their history remains meaningful after the session ends.
        compute_session = await SessionRow.get_session(
            db_sess,
            session_name,
            owner_access_key,
            allow_stale=True,
            kernel_loading_strategy=KernelLoadingStrategy.MAIN_KERNEL_ONLY,
        )
        resp["result"] = compute_session.status_history

    return web.json_response(resp, status=200)


@server_status_required(READ_ALLOWED)
@auth_required
@check_api_params(
Expand Down Expand Up @@ -2286,6 +2316,7 @@ def create_app(
app.router.add_route("GET", "/{session_name}/direct-access-info", get_direct_access_info)
)
cors.add(app.router.add_route("GET", "/{session_name}/logs", get_container_logs))
cors.add(app.router.add_route("GET", "/{session_name}/status-history", get_status_history))
cors.add(app.router.add_route("POST", "/{session_name}/rename", rename_session))
cors.add(app.router.add_route("POST", "/{session_name}/interrupt", interrupt))
cors.add(app.router.add_route("POST", "/{session_name}/complete", complete))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""Replace sessions, kernels's status_history's type map with list

Revision ID: 8c8e90aebacd
Revises: 8b2ec7e3d22a
Create Date: 2024-01-26 11:19:23.075014

"""

from alembic import op

# revision identifiers, used by Alembic.
revision = "8c8e90aebacd"
down_revision = "8b2ec7e3d22a"
branch_labels = None
depends_on = None


def upgrade():
    """
    Convert each ``status_history`` JSONB map of ``{status: timestamp, ...}``
    into a list of ``{"status": ..., "timestamp": ...}`` objects.

    Fixes over the naive conversion:
    - ``ORDER BY timestamp`` inside ``jsonb_agg`` makes the resulting list
      chronological; without it the aggregate order is unspecified, defeating
      the purpose of a transition *log* (ISO-8601 strings sort correctly as text).
    - ``COALESCE(..., '[]'::jsonb)`` handles rows whose history is NULL or an
      empty map: ``jsonb_each`` yields no rows for them, so the aggregate
      subquery returns NULL, which would violate the new non-nullable column
      whose default is an empty list.
    """
    for table in ("kernels", "sessions"):
        op.execute(
            f"""
            WITH data AS (
                SELECT id,
                       (jsonb_each(status_history)).key AS status,
                       (jsonb_each(status_history)).value AS timestamp
                FROM {table}
            )
            UPDATE {table}
            SET status_history = COALESCE(
                (
                    SELECT jsonb_agg(
                        jsonb_build_object('status', status, 'timestamp', timestamp)
                        ORDER BY timestamp
                    )
                    FROM data
                    WHERE data.id = {table}.id
                ),
                '[]'::jsonb
            );
            """
        )


def downgrade():
    """
    Fold each ``status_history`` list of ``{"status", "timestamp"}`` records
    back into a ``{status: timestamp}`` JSONB map.

    ``jsonb_object_agg`` keeps the last value on duplicate statuses, matching
    the overwrite semantics of the original map representation.  A follow-up
    UPDATE normalizes rows holding an empty list: they yield no rows from
    ``jsonb_array_elements`` and would otherwise keep the list-typed ``'[]'``
    under the map-typed schema.
    """
    for table in ("kernels", "sessions"):
        op.execute(
            f"""
            WITH data AS (
                SELECT id,
                       jsonb_object_agg(
                           elem->>'status', elem->>'timestamp'
                       ) AS new_status_history
                FROM {table},
                     jsonb_array_elements(status_history) AS elem
                GROUP BY id
            )
            UPDATE {table}
            SET status_history = data.new_status_history
            FROM data
            WHERE data.id = {table}.id;
            """
        )
        op.execute(
            f"UPDATE {table} SET status_history = '{{}}'::jsonb "
            "WHERE status_history = '[]'::jsonb;"
        )
38 changes: 23 additions & 15 deletions src/ai/backend/manager/models/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
SessionTypes,
VFolderMount,
)
from ai.backend.common.utils import get_first_occurrence_time

from ..api.exceptions import (
BackendError,
Expand Down Expand Up @@ -78,7 +79,11 @@
from .minilang.ordering import ColumnMapType, QueryOrderParser
from .minilang.queryfilter import FieldSpecType, QueryFilterParser, enum_field_getter
from .user import users
from .utils import ExtendedAsyncSAEngine, execute_with_retry, sql_json_merge
from .utils import (
ExtendedAsyncSAEngine,
execute_with_retry,
sql_append_dict_to_list,
)

if TYPE_CHECKING:
from .gql import GraphQueryContext
Expand Down Expand Up @@ -512,7 +517,14 @@ async def handle_kernel_exception(
# // used to prevent duplication of SessionTerminatedEvent
# }
# }
sa.Column("status_history", pgsql.JSONB(), nullable=True, default=sa.null()),
sa.Column("status_history", pgsql.JSONB(), nullable=False, default=[]),
# status_history records all status changes
# e.g)
# [
# {"status": "PENDING", "timestamp": "2022-10-22T10:22:30"},
# {"status": "SCHEDULED", "timestamp": "2022-10-22T11:40:30"},
# {"status": "PREPARING", "timestamp": "2022-10-25T10:22:30"}
# ]
sa.Column("callback_url", URLColumn, nullable=True, default=sa.null()),
sa.Column("startup_command", sa.Text, nullable=True),
sa.Column(
Expand Down Expand Up @@ -641,12 +653,9 @@ async def set_kernel_status(
data = {
"status": status,
"status_changed": now,
"status_history": sql_json_merge(
kernels.c.status_history,
(),
{
status.name: now.isoformat(), # ["PULLING", "PREPARING"]
},
"status_history": sql_append_dict_to_list(
KernelRow.status_history,
{"status": status.name, "timestamp": now.isoformat()},
),
}
if status_data is not None:
Expand Down Expand Up @@ -692,12 +701,9 @@ async def _update() -> bool:
if update_data is None:
update_values = {
"status": new_status,
"status_history": sql_json_merge(
"status_history": sql_append_dict_to_list(
KernelRow.status_history,
(),
{
new_status.name: now.isoformat(),
},
{"status": new_status.name, "timestamp": now.isoformat()},
),
}
else:
Expand Down Expand Up @@ -836,7 +842,9 @@ def parse_row(cls, ctx: GraphQueryContext, row: KernelRow) -> Mapping[str, Any]:
hide_agents = False
else:
hide_agents = ctx.local_config["manager"]["hide-agents"]
status_history = row.status_history or {}
status_history = row.status_history
scheduled_at = get_first_occurrence_time(status_history, KernelStatus.SCHEDULED.name)

return {
# identity
"id": row.id,
Expand All @@ -862,7 +870,7 @@ def parse_row(cls, ctx: GraphQueryContext, row: KernelRow) -> Mapping[str, Any]:
"created_at": row.created_at,
"terminated_at": row.terminated_at,
"starts_at": row.starts_at,
"scheduled_at": status_history.get(KernelStatus.SCHEDULED.name),
"scheduled_at": scheduled_at[1] if scheduled_at else None,
"occupied_slots": row.occupied_slots.to_json(),
# resources
"agent": row.agent if not hide_agents else None,
Expand Down
Loading