From 1151274f15b617a66b318f341e842087ef4f6613 Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Fri, 21 Jun 2024 21:51:14 +0900 Subject: [PATCH 1/7] fix: Run batch execution after session starts --- src/ai/backend/agent/agent.py | 24 ++++++++++++------------ src/ai/backend/agent/server.py | 15 +++++++++++++++ src/ai/backend/manager/registry.py | 27 ++++++++++++++++++++++++++- 3 files changed, 53 insertions(+), 13 deletions(-) diff --git a/src/ai/backend/agent/agent.py b/src/ai/backend/agent/agent.py index 73c5440a5f..3d78f6eb44 100644 --- a/src/ai/backend/agent/agent.py +++ b/src/ai/backend/agent/agent.py @@ -1679,6 +1679,18 @@ async def execute_batch( SessionFailureEvent(session_id, KernelLifecycleEventReason.TASK_CANCELLED, -2), ) + async def trigger_batch_execution( + self, + session_id: SessionId, + kernel_id: KernelId, + startup_command: str, + ) -> None: + self._ongoing_exec_batch_tasks.add( + asyncio.create_task( + self.execute_batch(session_id, kernel_id, startup_command), + ), + ) + async def create_kernel( self, session_id: SessionId, @@ -2064,18 +2076,6 @@ async def create_kernel( ), ) - if ( - kernel_config["session_type"] == "batch" - and kernel_config["cluster_role"] == "main" - ): - self._ongoing_exec_batch_tasks.add( - asyncio.create_task( - self.execute_batch( - session_id, kernel_id, kernel_config["startup_command"] or "" - ), - ), - ) - # The startup command for the batch-type sessions will be executed by the manager # upon firing of the "session_started" event. return kernel_creation_info diff --git a/src/ai/backend/agent/server.py b/src/ai/backend/agent/server.py index 74f1a81037..af18296d83 100644 --- a/src/ai/backend/agent/server.py +++ b/src/ai/backend/agent/server.py @@ -624,6 +624,21 @@ async def execute( ) return result + @rpc_function + @collect_error + async def trigger_batch_execution( + self, + session_id: str, + kernel_id: str, + code: str, + ) -> None: + log.info( + "rpc::trigger_batch_execution(k:{0}, s:{1}, code:{2})", kernel_id, session_id, code + ) + await self.agent.trigger_batch_execution( + SessionId(UUID(session_id)), KernelId(UUID(kernel_id)), code + ) + @rpc_function @collect_error async def start_service( diff --git a/src/ai/backend/manager/registry.py b/src/ai/backend/manager/registry.py index 456959a5dc..e16c447862 100644 --- a/src/ai/backend/manager/registry.py +++ b/src/ai/backend/manager/registry.py @@ -1618,11 +1618,17 @@ async def _update_session_occupying_slots(db_session: AsyncSession) -> None: SessionRow.name, SessionRow.creation_id, SessionRow.access_key, + SessionRow.session_type, + ), + selectinload( + SessionRow.kernels.and_(KernelRow.cluster_role == DEFAULT_ROLE).options( + load_only(KernelRow.id, KernelRow.agent, KernelRow.startup_command) + ) ), ) ) async with self.db.begin_readonly_session() as db_session: - updated_session = (await db_session.scalars(query)).first() + updated_session = cast(SessionRow, await db_session.scalar(query)) log.debug( "Producing SessionStartedEvent({}, {})", @@ -1640,6 +1646,9 @@ async def _update_session_occupying_slots(db_session: AsyncSession) -> None: updated_session.access_key, ), ) + + if updated_session.session_type == SessionTypes.BATCH: + await self.trigger_batch_execution(updated_session) except Exception: log.exception("error while executing _finalize_running") raise @@ -2638,6 +2647,22 @@ async def execute( flush_timeout, ) + async def trigger_batch_execution( + self, + session: SessionRow, + ) -> None: + async with handle_session_exception(self.db, "trigger_batch_execution", session.id): + async with self.agent_cache.rpc_context( + session.main_kernel.agent, + invoke_timeout=30, + order_key=session.main_kernel.id, + ) as rpc: + return await rpc.call.trigger_batch_execution( + str(session.id), + str(session.main_kernel.id), + session.main_kernel.startup_command or "", + ) + async def interrupt_session( self, session: SessionRow, From 3c0ab97fa47a28375be43ce5f14d1de621f681cf Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Fri, 21 Jun 2024 22:04:00 +0900 Subject: [PATCH 2/7] rename function --- src/ai/backend/agent/agent.py | 2 +- src/ai/backend/agent/server.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ai/backend/agent/agent.py b/src/ai/backend/agent/agent.py index 3d78f6eb44..ded3a73e5a 100644 --- a/src/ai/backend/agent/agent.py +++ b/src/ai/backend/agent/agent.py @@ -1679,7 +1679,7 @@ async def execute_batch( SessionFailureEvent(session_id, KernelLifecycleEventReason.TASK_CANCELLED, -2), ) - async def trigger_batch_execution( + async def create_batch_execution_task( self, session_id: SessionId, kernel_id: KernelId, diff --git a/src/ai/backend/agent/server.py b/src/ai/backend/agent/server.py index af18296d83..db4f709a56 100644 --- a/src/ai/backend/agent/server.py +++ b/src/ai/backend/agent/server.py @@ -635,7 +635,7 @@ async def trigger_batch_execution( log.info( "rpc::trigger_batch_execution(k:{0}, s:{1}, code:{2})", kernel_id, session_id, code ) - await self.agent.trigger_batch_execution( + await self.agent.create_batch_execution_task( SessionId(UUID(session_id)), KernelId(UUID(kernel_id)), code ) From 75e93a69dbd2ff1a3b1162ac5ded1b11e6f5f710 Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Fri, 21 Jun 2024 22:05:01 +0900 Subject: [PATCH 3/7] add news fragment --- changes/2327.fix.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/2327.fix.md diff --git a/changes/2327.fix.md b/changes/2327.fix.md new file mode 100644 index 0000000000..8f7ecba3d2 --- /dev/null +++ b/changes/2327.fix.md @@ -0,0 +1 @@ +Run batch execution after a session starts. From 989fd448f3d0cc971fe73c85103917a0d1d7134b Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Mon, 24 Jun 2024 12:30:04 +0900 Subject: [PATCH 4/7] fix minor bugs --- src/ai/backend/manager/models/session.py | 1 + src/ai/backend/manager/registry.py | 12 +++++++++--- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/src/ai/backend/manager/models/session.py b/src/ai/backend/manager/models/session.py index fc7f7a14ed..802139da77 100644 --- a/src/ai/backend/manager/models/session.py +++ b/src/ai/backend/manager/models/session.py @@ -176,6 +176,7 @@ class SessionStatus(enum.Enum): "get_logs_from_agent": KernelExecutionFailed, "refresh_session": KernelExecutionFailed, "commit_session": KernelExecutionFailed, + "trigger_batch_execution": KernelExecutionFailed, } diff --git a/src/ai/backend/manager/registry.py b/src/ai/backend/manager/registry.py index e16c447862..3d408babec 100644 --- a/src/ai/backend/manager/registry.py +++ b/src/ai/backend/manager/registry.py @@ -45,7 +45,7 @@ from redis.asyncio import Redis from sqlalchemy.exc import DBAPIError from sqlalchemy.ext.asyncio import AsyncSession -from sqlalchemy.orm import load_only, noload, selectinload +from sqlalchemy.orm import load_only, noload, selectinload, with_loader_criteria from sqlalchemy.orm.exc import NoResultFound from yarl import URL @@ -1621,10 +1621,16 @@ async def _update_session_occupying_slots(db_session: AsyncSession) -> None: SessionRow.session_type, ), selectinload( - SessionRow.kernels.and_(KernelRow.cluster_role == DEFAULT_ROLE).options( - load_only(KernelRow.id, KernelRow.agent, KernelRow.startup_command) + SessionRow.kernels, + ).options( + load_only( + KernelRow.id, + KernelRow.agent, + KernelRow.cluster_role, + KernelRow.startup_command, ) ), + with_loader_criteria(KernelRow, KernelRow.cluster_role == DEFAULT_ROLE), ) ) async with self.db.begin_readonly_session() as db_session: From 6e2b01a78a7537b300c420fbbe790ebbdea1a0a9 Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Mon, 1 Jul 2024 13:47:19 +0900 Subject: [PATCH 5/7] update news fragment --- changes/2327.fix.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/2327.fix.md b/changes/2327.fix.md index 8f7ecba3d2..84b0e426d9 100644 --- a/changes/2327.fix.md +++ b/changes/2327.fix.md @@ -1 +1 @@ -Run batch execution after a session starts. +Run batch execution after the batch session starts. From 6370e19cf3b5f80bf307bf95019cb29b5b3ceed8 Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Mon, 1 Jul 2024 13:49:18 +0900 Subject: [PATCH 6/7] little naming change --- src/ai/backend/agent/agent.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ai/backend/agent/agent.py b/src/ai/backend/agent/agent.py index ded3a73e5a..5c2c97aed0 100644 --- a/src/ai/backend/agent/agent.py +++ b/src/ai/backend/agent/agent.py @@ -1683,11 +1683,11 @@ async def create_batch_execution_task( self, session_id: SessionId, kernel_id: KernelId, - startup_command: str, + code_to_execute: str, ) -> None: self._ongoing_exec_batch_tasks.add( asyncio.create_task( - self.execute_batch(session_id, kernel_id, startup_command), + self.execute_batch(session_id, kernel_id, code_to_execute), ), ) From 0d4fade817ad6579ea76cc7472016ba5a9175653 Mon Sep 17 00:00:00 2001 From: Sanghun Lee Date: Wed, 10 Jul 2024 22:25:14 +0900 Subject: [PATCH 7/7] execute batch when restart session --- src/ai/backend/manager/registry.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/ai/backend/manager/registry.py b/src/ai/backend/manager/registry.py index 6605f407bd..f01dfdad42 100644 --- a/src/ai/backend/manager/registry.py +++ b/src/ai/backend/manager/registry.py @@ -2691,6 +2691,9 @@ async def _restart_kernel(kernel: KernelRow) -> None: SessionStartedEvent(session.id, session.creation_id), ) + if session.session_type == SessionTypes.BATCH: + await self.trigger_batch_execution(session) + async def execute( self, session: SessionRow,