Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Implement ID based client workflow to ContainerRegistry graphql API #2615

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
68 commits
Select commit Hold shift + click to select a range
0fec32a
feat: Add `project` column to the images table and refactoring `Image…
jopemachine Aug 13, 2024
a9fc9ef
chore: Add news fragment
jopemachine Aug 13, 2024
7a2ce72
fix: Infer `project` by querying ImageRow in `create_cluster`
jopemachine Aug 13, 2024
ea0a5d6
refactor: `ImageRef` to be simple type that only hold data
jopemachine Aug 14, 2024
0c50e2a
feat: Add migration script
jopemachine Aug 16, 2024
c706894
fix: Wrong rescan logic
jopemachine Aug 16, 2024
729c002
fix: Broke image str parsing logic in start_session
jopemachine Aug 16, 2024
6de8d89
fix: Circular import error when starts webserver
jopemachine Aug 16, 2024
e419469
fix: Change the variable name `image_ref` which is not actual `ImageR…
jopemachine Aug 16, 2024
a802f82
fix: Broekn test_docker.py
jopemachine Aug 16, 2024
9c0082f
chore: Rename variable name
jopemachine Aug 16, 2024
7f67a9a
refactor: for loop
jopemachine Aug 16, 2024
26f5299
refactor: Simplify `commit_rescan_result`
jopemachine Aug 17, 2024
acf39d2
fix: `start_session`'s broken code
jopemachine Aug 17, 2024
0a367f1
revert: ImageRow.from_alias changes
jopemachine Aug 17, 2024
c654ee7
refactor: Integrate `ImageRow.resolve` and `ImageRow.resolve_by_ident…
jopemachine Aug 17, 2024
fb3f593
fix: Broken container commit
jopemachine Aug 17, 2024
04469b0
fix: Add missing column name in convert_session_to_image
jopemachine Aug 19, 2024
8057179
chore: Change the incorrect names of some tests, and add some tests
jopemachine Aug 19, 2024
56d7d84
chore: Add comment
jopemachine Aug 19, 2024
0530365
fix: Handle StopIteration correctly
jopemachine Aug 19, 2024
aca5600
fix: Skip images with empty name
jopemachine Aug 19, 2024
e5b2ca0
fix: Disallow empty project in ImageRow
jopemachine Aug 19, 2024
47c2770
fix: Wrong image extraction of ImageRow.image_ref
jopemachine Aug 20, 2024
6529054
refactor: image string parsing logic
jopemachine Aug 21, 2024
69c645e
fix: Add missing continue statement
jopemachine Aug 21, 2024
355ca92
chore: fix wrong type declarations
jopemachine Aug 21, 2024
b3e502a
fix: Remove useless project from `ImageRegistry`
jopemachine Aug 21, 2024
152745b
chore: Write more detailed comment
jopemachine Aug 21, 2024
ba5279f
chore: edit some comments
jopemachine Aug 21, 2024
ddbb758
chore: Remove useless `KernelRow.get_image_ref()`
jopemachine Aug 21, 2024
1a4474d
fix: Remove useless `project` from KernelCreationConfig
jopemachine Aug 21, 2024
5c89dd7
feat: Add `test_msgpack_image_ref`
jopemachine Aug 22, 2024
5776d4a
fix: Merge with previous PR
jopemachine Aug 22, 2024
17b123d
fix: Broken `commit_rescan_result`
jopemachine Aug 22, 2024
d220941
fix: Reflect feedbacks
jopemachine Aug 23, 2024
69a5a62
fix: lint
jopemachine Sep 2, 2024
8218263
fix: lint
jopemachine Sep 2, 2024
eb89f3e
fix: Broken test
jopemachine Sep 2, 2024
420d940
fix: Wrong image name parsing
jopemachine Sep 3, 2024
e577e1a
fix: Wrong exception handling of `ImageRef.from_image_str`
jopemachine Sep 3, 2024
02129c7
fix: possible error of `image_name` split
jopemachine Sep 3, 2024
07c1310
fix: `ImageRef.__lt__`
jopemachine Sep 4, 2024
4f80889
fix: Improve exception handling
jopemachine Sep 4, 2024
b8d93e2
fix: Use log.warning instead of log.warn
jopemachine Sep 4, 2024
2c12b54
fix: Rename variables and refactor `parse_image_str()`
jopemachine Sep 4, 2024
436d91d
refactor: `ImageRef.parse_image_str`
jopemachine Sep 4, 2024
a51aaec
chore: Update skip_reason
jopemachine Sep 4, 2024
62f609b
feat: Add `ImageNode.project`, `Image.projecrt`
jopemachine Sep 4, 2024
f6177da
fix: Add missing `row_id` to `ImageNode.from_legacy_image`
jopemachine Sep 4, 2024
6928645
chore: update GraphQL schema dump
jopemachine Sep 4, 2024
008d97b
fix: CI
jopemachine Sep 5, 2024
fb3a034
fix: Add missing `ImageNode` to `__all__`
jopemachine Sep 5, 2024
32daff1
chore: Improve `parse_image_str()` comment
jopemachine Sep 5, 2024
bafc589
fix: Remove useless type
jopemachine Sep 18, 2024
6489db9
fix: Merge with main
jopemachine Sep 18, 2024
9117f9d
chore: Remove useless line
jopemachine Sep 18, 2024
0277d6a
fix: broken `start_session`
jopemachine Sep 19, 2024
b48fa28
chore: Merge with main
jopemachine Sep 30, 2024
b27f253
feat: Implement ID based client workflow to container registry API
jopemachine Aug 2, 2024
663a3b6
chore: Add frament
jopemachine Aug 2, 2024
60e12bf
chore: update GraphQL schema dump
jopemachine Aug 3, 2024
3fc6b3e
chore: update GraphQL schema dump
jopemachine Aug 4, 2024
a71fb23
fix: Broken test and Write test cases for new ContainerRegistry API
jopemachine Aug 6, 2024
5d374ee
fix: Broken legacy gql
jopemachine Sep 2, 2024
e02666f
chore: update GraphQL schema dump
jopemachine Sep 2, 2024
066acfc
fix: Broken CI
jopemachine Sep 2, 2024
3f749a8
chore: Remove useless print
jopemachine Sep 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/2615.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implement ID-based client workflow to ContainerRegistry API.
1 change: 1 addition & 0 deletions changes/2707.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `project` column to the images table and refactoring `ImageRef` logic.
10 changes: 8 additions & 2 deletions src/ai/backend/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ def __init__(
session_id: SessionId,
agent_id: AgentId,
event_producer: EventProducer,
kernel_image: ImageRef,
kernel_config: KernelCreationConfig,
distro: str,
local_config: Mapping[str, Any],
Expand All @@ -229,7 +230,7 @@ def __init__(
self.agent_id = agent_id
self.event_producer = event_producer
self.kernel_config = kernel_config
self.image_ref = ImageRef.from_image_config(kernel_config["image"])
self.image_ref = kernel_image
self.distro = distro
self.internal_data = kernel_config["internal_data"] or {}
self.computers = computers
Expand Down Expand Up @@ -1699,6 +1700,7 @@ async def init_kernel_context(
self,
kernel_id: KernelId,
session_id: SessionId,
kernel_image: ImageRef,
kernel_config: KernelCreationConfig,
*,
restarting: bool = False,
Expand Down Expand Up @@ -1790,6 +1792,7 @@ async def create_kernel(
self,
session_id: SessionId,
kernel_id: KernelId,
kernel_image: ImageRef,
kernel_config: KernelCreationConfig,
cluster_info: ClusterInfo,
*,
Expand All @@ -1814,6 +1817,7 @@ async def create_kernel(
ctx = await self.init_kernel_context(
kernel_id,
session_id,
kernel_image,
kernel_config,
restarting=restarting,
cluster_ssh_port_mapping=cluster_info.get("cluster_ssh_port_mapping"),
Expand Down Expand Up @@ -2352,7 +2356,7 @@ def get_public_service_ports(self, service_ports: list[ServicePort]) -> list[Ser
return [port for port in service_ports if port["protocol"] != ServicePortProtocols.INTERNAL]

@abstractmethod
async def extract_image_command(self, image_ref: str) -> str | None:
async def extract_image_command(self, image: str) -> str | None:
raise NotImplementedError

@abstractmethod
Expand Down Expand Up @@ -2439,6 +2443,7 @@ async def restart_kernel(
self,
session_id: SessionId,
kernel_id: KernelId,
kernel_image: ImageRef,
updating_kernel_config: KernelCreationConfig,
):
tracker = self.restarting_kernels.get(kernel_id)
Expand Down Expand Up @@ -2486,6 +2491,7 @@ async def restart_kernel(
await self.create_kernel(
session_id,
kernel_id,
kernel_image,
kernel_config,
existing_cluster_info,
restarting=True,
Expand Down
19 changes: 12 additions & 7 deletions src/ai/backend/agent/docker/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
from ai.backend.common.cgroup import get_cgroup_mount_point
from ai.backend.common.docker import MAX_KERNELSPEC, MIN_KERNELSPEC, ImageRef
from ai.backend.common.events import EventProducer, KernelLifecycleEventReason
from ai.backend.common.exception import ImageNotAvailable
from ai.backend.common.exception import ImageNotAvailable, InvalidImageName, InvalidImageTag
from ai.backend.common.plugin.monitor import ErrorPluginContext, StatsPluginContext
from ai.backend.common.types import (
AgentId,
Expand Down Expand Up @@ -200,6 +200,7 @@ def __init__(
session_id: SessionId,
agent_id: AgentId,
event_producer: EventProducer,
kernel_image: ImageRef,
kernel_config: KernelCreationConfig,
distro: str,
local_config: Mapping[str, Any],
Expand All @@ -216,6 +217,7 @@ def __init__(
session_id,
agent_id,
event_producer,
kernel_image,
kernel_config,
distro,
local_config,
Expand Down Expand Up @@ -1234,10 +1236,10 @@ async def scan_available_resources(self) -> Mapping[SlotName, Decimal]:
self.local_config, {name: cctx.instance for name, cctx in self.computers.items()}
)

async def extract_image_command(self, image_ref: str) -> str | None:
async def extract_image_command(self, image: str) -> str | None:
async with closing_async(Docker()) as docker:
image = await docker.images.get(image_ref)
return image["Config"].get("Cmd")
result = await docker.images.get(image)
return result["Config"].get("Cmd")

async def enumerate_containers(
self,
Expand Down Expand Up @@ -1347,11 +1349,12 @@ async def scan_images(self) -> Mapping[str, str]:
if repo_tag.endswith("<none>"):
continue
try:
ImageRef(repo_tag, ["*"])
except ValueError:
ImageRef.parse_image_str(repo_tag, "*")
except (InvalidImageName, InvalidImageTag) as e:
log.warning(
"Image name {} does not conform to Backend.AI's image naming rule. This image will be ignored.",
"Image name {} does not conform to Backend.AI's image naming rule. This image will be ignored. Details: {}",
repo_tag,
e,
)
continue

Expand Down Expand Up @@ -1523,6 +1526,7 @@ async def init_kernel_context(
self,
kernel_id: KernelId,
session_id: SessionId,
kernel_image: ImageRef,
kernel_config: KernelCreationConfig,
*,
restarting: bool = False,
Expand All @@ -1534,6 +1538,7 @@ async def init_kernel_context(
session_id,
self.id,
self.event_producer,
kernel_image,
kernel_config,
distro,
self.local_config,
Expand Down
8 changes: 6 additions & 2 deletions src/ai/backend/agent/dummy/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def __init__(
session_id: SessionId,
agent_id: AgentId,
event_producer: EventProducer,
kenrel_image: ImageRef,
kernel_config: KernelCreationConfig,
distro: str,
local_config: Mapping[str, Any],
Expand All @@ -68,6 +69,7 @@ def __init__(
session_id,
agent_id,
event_producer,
kenrel_image,
kernel_config,
distro,
local_config,
Expand Down Expand Up @@ -248,7 +250,7 @@ def get_public_service_ports(self, service_ports: list[ServicePort]) -> list[Ser
async def sync_container_lifecycles(self, interval: float) -> None:
return

async def extract_command(self, image_ref: str) -> str | None:
async def extract_command(self, image: str) -> str | None:
return None

async def enumerate_containers(
Expand All @@ -268,7 +270,7 @@ async def scan_available_resources(self) -> Mapping[SlotName, Decimal]:
self.local_config, {name: cctx.instance for name, cctx in self.computers.items()}
)

async def extract_image_command(self, image_ref: str) -> str | None:
async def extract_image_command(self, image: str) -> str | None:
delay = self.dummy_agent_cfg["delay"]["scan-image"]
await asyncio.sleep(delay)
return "cr.backend.ai/stable/python:3.9-ubuntu20.04"
Expand Down Expand Up @@ -305,6 +307,7 @@ async def init_kernel_context(
self,
kernel_id: KernelId,
session_id: SessionId,
kernel_image: ImageRef,
kernel_config: KernelCreationConfig,
*,
restarting: bool = False,
Expand All @@ -316,6 +319,7 @@ async def init_kernel_context(
session_id,
self.id,
self.event_producer,
kernel_image,
kernel_config,
distro,
self.local_config,
Expand Down
4 changes: 4 additions & 0 deletions src/ai/backend/agent/kubernetes/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ def __init__(
session_id: SessionId,
agent_id: AgentId,
event_producer: EventProducer,
kernel_image: ImageRef,
kernel_config: KernelCreationConfig,
distro: str,
local_config: Mapping[str, Any],
Expand All @@ -119,6 +120,7 @@ def __init__(
session_id,
agent_id,
event_producer,
kernel_image,
kernel_config,
distro,
local_config,
Expand Down Expand Up @@ -1032,6 +1034,7 @@ async def init_kernel_context(
self,
kernel_id: KernelId,
session_id: SessionId,
kernel_image: ImageRef,
kernel_config: KernelCreationConfig,
*,
restarting: bool = False,
Expand All @@ -1043,6 +1046,7 @@ async def init_kernel_context(
session_id,
self.id,
self.event_producer,
kernel_image,
kernel_config,
distro,
self.local_config,
Expand Down
20 changes: 10 additions & 10 deletions src/ai/backend/agent/kubernetes/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ class PersistentServiceContainer:
def __init__(
self,
docker: Docker,
image_ref: str,
image: str,
container_config: Mapping[str, Any],
*,
name: Optional[str] = None,
) -> None:
self.docker = docker
self.image_ref = image_ref
default_container_name = image_ref.split(":")[0].rsplit("/", maxsplit=1)[-1]
self.image = image
default_container_name = image.split(":")[0].rsplit("/", maxsplit=1)[-1]
if name is None:
self.container_name = default_container_name
else:
Expand Down Expand Up @@ -69,7 +69,7 @@ async def get_container_version_and_status(self) -> Tuple[int, bool]:

async def get_image_version(self) -> int:
try:
img = await self.docker.images.inspect(self.image_ref)
img = await self.docker.images.inspect(self.image)
except DockerError as e:
if e.status == 404:
return 0
Expand All @@ -80,22 +80,22 @@ async def get_image_version(self) -> int:
async def ensure_running_latest(self) -> None:
image_version = await self.get_image_version()
if image_version == 0:
log.info("PersistentServiceContainer({}): installing...", self.image_ref)
log.info("PersistentServiceContainer({}): installing...", self.image)
await self.install_latest()
elif image_version < self.img_version:
log.info(
"PersistentServiceContainer({}): upgrading (v{} -> v{})",
self.image_ref,
self.image,
image_version,
self.img_version,
)
await self.install_latest()
container_version, is_running = await self.get_container_version_and_status()
if container_version == 0 or image_version != container_version or not is_running:
log.info("PersistentServiceContainer({}): recreating...", self.image_ref)
log.info("PersistentServiceContainer({}): recreating...", self.image)
await self.recreate()
if not is_running:
log.info("PersistentServiceContainer({}): starting...", self.image_ref)
log.info("PersistentServiceContainer({}): starting...", self.image)
await self.start()

async def install_latest(self) -> None:
Expand All @@ -112,7 +112,7 @@ async def install_latest(self) -> None:
stderr = await proc.stderr.read()
raise RuntimeError(
"loading the image has failed!",
self.image_ref,
self.image,
proc.returncode,
stderr,
)
Expand All @@ -130,7 +130,7 @@ async def recreate(self) -> None:
else:
raise
container_config: dict[str, Any] = {
"Image": self.image_ref,
"Image": self.image,
"Tty": True,
"Privileged": False,
"AttachStdin": False,
Expand Down
20 changes: 8 additions & 12 deletions src/ai/backend/agent/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ async def create_kernels(
raw_kernel_ids: Sequence[str],
raw_configs: Sequence[dict],
raw_cluster_info: dict,
kernel_image_refs: dict[KernelId, ImageRef],
):
cluster_info = cast(ClusterInfo, raw_cluster_info)
session_id = SessionId(UUID(raw_session_id))
Expand All @@ -503,6 +504,7 @@ async def create_kernels(
self.agent.create_kernel(
session_id,
kernel_id,
kernel_image_refs[kernel_id],
kernel_config,
cluster_info,
throttle_sema=throttle_sema,
Expand Down Expand Up @@ -583,12 +585,14 @@ async def restart_kernel(
self,
session_id: str,
kernel_id: str,
kernel_image: ImageRef,
updated_config: dict,
) -> dict[str, Any]:
log.info("rpc::restart_kernel(s:{0}, k:{1})", session_id, kernel_id)
return await self.agent.restart_kernel(
SessionId(UUID(session_id)),
KernelId(UUID(kernel_id)),
kernel_image,
cast(KernelCreationConfig, updated_config),
)

Expand Down Expand Up @@ -704,30 +708,22 @@ async def _commit(reporter: ProgressReporter) -> None:
@collect_error
async def push_image(
self,
canonical: str,
architecture: str,
image_ref: ImageRef,
registry_conf: ImageRegistry,
*,
is_local: bool = False,
) -> dict[str, Any]:
log.info("rpc::push_image(c:{})", canonical)
log.info("rpc::push_image(c:{})", image_ref.canonical)
bgtask_mgr = self.agent.background_task_manager

async def _push_image(reporter: ProgressReporter) -> None:
await self.agent.push_image(
ImageRef(
canonical,
known_registries=["*"],
is_local=is_local,
architecture=architecture,
),
image_ref,
registry_conf,
)

task_id = await bgtask_mgr.start(_push_image)
return {
"bgtask_id": str(task_id),
"canonical": canonical,
"canonical": image_ref.canonical,
}

@rpc_function
Expand Down
Loading
Loading