Skip to content

Commit

Permalink
Merge branch '24.03' into backport/2780-to-24.03
Browse files Browse the repository at this point in the history
  • Loading branch information
kyujin-cho authored Sep 25, 2024
2 parents 80a351f + 46be28f commit b008a85
Show file tree
Hide file tree
Showing 9 changed files with 195 additions and 131 deletions.
1 change: 1 addition & 0 deletions changes/2852.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for setting a timeout when pulling Docker images and upgrade aiodocker to version 0.23.0.
245 changes: 123 additions & 122 deletions python.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
aiodataloader-ng~=0.2.1
aiodocker==0.22.1
aiodocker==0.23.0
aiofiles~=24.1.0
aiohttp~=3.10.5
aiohttp~=3.10.6
aiohttp_cors~=0.7
aiohttp_jinja2~=1.6
aiohttp_sse>=2.2
Expand Down Expand Up @@ -76,7 +76,7 @@ typeguard~=4.3
typing_extensions~=4.11
textual~=0.79.1
uvloop~=0.20.0; sys_platform != "Windows" # 0.18 breaks the API and adds Python 3.12 support
yarl~=1.11.1
yarl~=1.12.1
zipstream-new~=1.1.8

# required by ai.backend.test (integration test suite)
Expand Down
25 changes: 23 additions & 2 deletions src/ai/backend/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1600,7 +1600,13 @@ async def push_image(self, image_ref: ImageRef, registry_conf: ImageRegistry) ->
"""

@abstractmethod
async def pull_image(self, image_ref: ImageRef, registry_conf: ImageRegistry) -> None:
async def pull_image(
    self,
    image_ref: ImageRef,
    registry_conf: ImageRegistry,
    *,
    timeout: float | None,
) -> None:
    """
    Pull the given image from the given registry.

    :param image_ref: Reference of the image to pull.
    :param registry_conf: Configuration of the registry to pull from.
    :param timeout: Image pull timeout in seconds (keyword-only).
        ``None`` presumably means "no timeout" -- confirm against each backend.

    The caller in ``create_kernel`` treats an ``asyncio.TimeoutError`` raised
    from this call as an image pull timeout and converts it to ``AgentError``.
    """
Expand Down Expand Up @@ -1835,11 +1841,26 @@ async def create_kernel(
kernel_config["image"]["digest"],
AutoPullBehavior(kernel_config.get("auto_pull", "digest")),
)
image_pull_timeout = cast(
float | None, self.local_config["agent"]["api"]["pull-timeout"]
)
if do_pull:
await self.produce_event(
KernelPullingEvent(kernel_id, session_id, ctx.image_ref.canonical),
)
await self.pull_image(ctx.image_ref, kernel_config["image"]["registry"])
try:
await self.pull_image(
ctx.image_ref,
kernel_config["image"]["registry"],
timeout=image_pull_timeout,
)
except asyncio.TimeoutError:
log.exception(
f"Image pull timeout after {image_pull_timeout} seconds. Destroying kernel (k:{kernel_id}, img:{ctx.image_ref.canonical})"
)
raise AgentError(
f"Image pull timeout after {image_pull_timeout} seconds. (img:{ctx.image_ref.canonical})"
)

if not restarting:
await self.produce_event(
Expand Down
10 changes: 10 additions & 0 deletions src/ai/backend/agent/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,11 +147,21 @@
"chunk-size": "64K", # used when storing logs to Redis as a side-channel to the event bus
}

# Default image pull timeout, in seconds.
DEFAULT_PULL_TIMEOUT = 2 * 60 * 60 # 2 hours

# Defaults for the "api" section of the agent's etcd-provided config.
default_api_config = {
"pull-timeout": DEFAULT_PULL_TIMEOUT,
}

# Validator for the agent's etcd-provided config.
# Each section falls back to its default mapping when the key is absent,
# and unknown keys are tolerated at every level via allow_extra("*").
agent_etcd_config_iv = t.Dict({
t.Key("container-logs", default=default_container_logs_config): t.Dict({
t.Key("max-length", default=default_container_logs_config["max-length"]): tx.BinarySize(),
t.Key("chunk-size", default=default_container_logs_config["chunk-size"]): tx.BinarySize(),
}).allow_extra("*"),
t.Key("api", default=default_api_config): t.Dict({
# "pull-timeout" accepts either a none-like value (validated by tx.ToNone)
# or a float constrained by the [0:] lower bound.
t.Key("pull-timeout", default=default_api_config["pull-timeout"]): tx.ToNone
| t.ToFloat[0:], # Set the image pull timeout in seconds
}).allow_extra("*"),
}).allow_extra("*")

container_etcd_config_iv = t.Dict({
Expand Down
10 changes: 8 additions & 2 deletions src/ai/backend/agent/docker/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1383,7 +1383,13 @@ async def push_image(self, image_ref: ImageRef, registry_conf: ImageRegistry) ->
async with closing_async(Docker()) as docker:
await docker.images.push(image_ref.canonical, auth=auth_config)

async def pull_image(self, image_ref: ImageRef, registry_conf: ImageRegistry) -> None:
async def pull_image(
self,
image_ref: ImageRef,
registry_conf: ImageRegistry,
*,
timeout: float | None,
) -> None:
auth_config = None
reg_user = registry_conf.get("username")
reg_passwd = registry_conf.get("password")
Expand All @@ -1396,7 +1402,7 @@ async def pull_image(self, image_ref: ImageRef, registry_conf: ImageRegistry) ->
}
log.info("pulling image {} from registry", image_ref.canonical)
async with closing_async(Docker()) as docker:
await docker.images.pull(image_ref.canonical, auth=auth_config)
await docker.images.pull(image_ref.canonical, auth=auth_config, timeout=timeout)

async def check_image(
self, image_ref: ImageRef, image_id: str, auto_pull: AutoPullBehavior
Expand Down
8 changes: 7 additions & 1 deletion src/ai/backend/agent/dummy/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,13 @@ async def scan_images(self) -> Mapping[str, str]:
await asyncio.sleep(delay)
return {}

async def pull_image(self, image_ref: ImageRef, registry_conf: ImageRegistry) -> None:
async def pull_image(
    self,
    image_ref: ImageRef,
    registry_conf: ImageRegistry,
    *,
    timeout: float | None,
) -> None:
    """
    Simulate pulling an image by sleeping for the configured delay.

    :param image_ref: Reference of the image to "pull" (unused by the dummy agent).
    :param registry_conf: Registry configuration (unused by the dummy agent).
    :param timeout: Maximum time in seconds to wait for the simulated pull.
        ``None`` waits for the full delay without any limit.

    :raises asyncio.TimeoutError: If the simulated delay exceeds ``timeout``.
    """
    delay = self.dummy_agent_cfg["delay"]["pull-image"]
    # Honor the timeout like a real backend would, so that callers handling
    # asyncio.TimeoutError can be exercised against the dummy agent as well.
    await asyncio.wait_for(asyncio.sleep(delay), timeout=timeout)

Expand Down
8 changes: 7 additions & 1 deletion src/ai/backend/agent/kubernetes/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -1001,7 +1001,13 @@ async def handle_agent_socket(self):
# TODO: Add support for remote agent socket mechanism
pass

async def pull_image(self, image_ref: ImageRef, registry_conf: ImageRegistry) -> None:
async def pull_image(
    self,
    image_ref: ImageRef,
    registry_conf: ImageRegistry,
    *,
    timeout: float | None,
) -> None:
    """
    Placeholder for pulling an image on Kubernetes.

    Currently a no-op: none of the arguments, including ``timeout``, are used.
    """
    # TODO: Add support for appropriate image pulling mechanism on K8s
    pass

Expand Down
13 changes: 13 additions & 0 deletions src/ai/backend/common/validators.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,19 @@ def check_and_return(self, value: Any) -> set:
self._failure("value must be Iterable")


class ToNone(t.Trafaret):
    """
    Convert ``None`` or a none-like string into ``None``.

    The input is accepted when it is ``None`` itself, or when its string form,
    stripped of surrounding whitespace and lower-cased, is one of
    ``allowed_values``. Any other input is reported as a validation failure.
    """

    # Case-insensitive spellings that are treated as None.
    allowed_values = ("none", "null", "nil")

    def check_and_return(self, value: Any) -> None:
        if value is None:
            return None
        normalized = str(value).strip().lower()
        if normalized not in self.allowed_values:
            self._failure(f"value must be one of {self.allowed_values}")
        return None


class Delay(t.Trafaret):
"""
Convert a float or a tuple of 2 floats into a random generated float value
Expand Down

0 comments on commit b008a85

Please sign in to comment.