diff --git a/src/ai/backend/agent/agent.py b/src/ai/backend/agent/agent.py index 342edefff5..9e30adc217 100644 --- a/src/ai/backend/agent/agent.py +++ b/src/ai/backend/agent/agent.py @@ -154,6 +154,7 @@ if TYPE_CHECKING: from ai.backend.common.auth import PublicKey + from ai.backend.common.bgtask import ProgressReporter from ai.backend.common.etcd import AsyncEtcd log = BraceStyleAdapter(logging.getLogger(__spec__.name)) # type: ignore[name-defined] @@ -1719,6 +1720,12 @@ async def create_kernel( AutoPullBehavior(kernel_config.get("auto_pull", "digest")), ) if do_pull: + async def _task(reporter: ProgressReporter) -> None: + pass + + task_id: UUID = await self.local_config["background_task_manager"].start(_task) + log.warning("CREATE_KERNEL background_task_manager.start(t:{})", task_id) + await self.produce_event( KernelPullingEvent(kernel_id, session_id, ctx.image_ref.canonical), ) diff --git a/src/ai/backend/agent/docker/agent.py b/src/ai/backend/agent/docker/agent.py index e7c6216225..abb8b63d12 100644 --- a/src/ai/backend/agent/docker/agent.py +++ b/src/ai/backend/agent/docker/agent.py @@ -1270,7 +1270,54 @@ async def pull_image(self, image_ref: ImageRef, registry_conf: ImageRegistry) -> } log.info("pulling image {} from registry", image_ref.canonical) async with closing_async(Docker()) as docker: - await docker.images.pull(image_ref.canonical, auth=auth_config) + # await docker.images.pull(image_ref.canonical, auth=auth_config) + async for event in docker.images.pull( + image_ref.canonical, auth=auth_config, stream=True + ): + event_id = event.get("id", None) + event_status = event.get("status", None) + match event_status: + case "Pulling fs layer": + pass + case "Waiting": + pass + case "Downloading": + current_progress = event.get("progressDetail", {}).get("current", 0) + total_progress = event.get("progressDetail", {}).get("total", 0) + case "Verifying Checksum": + pass + case "Download complete": + pass + case "Extracting": + current_progress = event.get("progressDetail", {}).get("current", 0) + total_progress = event.get("progressDetail", {}).get("total", 0) + case "Pull complete": + pass + case x if x.startswith("Digest: "): + # Digest: sha256:26d4127a64d6afcff8c1682f351339862318fe219ea8f4cee8cb572584e8aec1 + pass + case x if x.startswith("Status: "): + # Status: Downloaded newer image for cr.backend.ai/multiarch/python:3.9-ubuntu20.04 + pass + case _: + pass + current_progress = event.get("progressDetail", {}).get("current", 0) + total_progress = event.get("progressDetail", {}).get("total", 0) + # log.info( + # "IMAGE.PULL (image_ref:{}, registry_conf:{}) event:{}", + # image_ref, + # registry_conf, + # event, + # ) + log.info( + "IMAGE.PULL (image_ref:{}) (event:{}, status:{}, progress:{}%({}/{}))", + image_ref, + event_id, + event_status, + round(current_progress / (total_progress + 1) * 100, ndigits=2), + current_progress, + total_progress, + ) async def check_image( self, image_ref: ImageRef, image_id: str, auto_pull: AutoPullBehavior @@ -1401,6 +1448,9 @@ async def log_iter(): await self.collect_logs(kernel_id, container_id, log_iter()) except DockerError as e: if e.status == 404: + # /Users/rapsealk/Desktop/git/backend.ai-dev2/src/ai/backend/agent/docker/agent.py:1451: RuntimeWarning: coroutine 'Docker._do_query' was never awaited + # log.warning( + # RuntimeWarning: Enable tracemalloc to get the object allocation traceback log.warning( "container is already cleaned or missing (k:{}, cid:{})", kernel_id,