Skip to content

Commit

Permalink
refactor: Base ContainerRegistry's scan_tag and implement `MEDIA_TY…
Browse files Browse the repository at this point in the history
…PE_DOCKER_MANIFEST` type handling (#2620)
  • Loading branch information
jopemachine authored Sep 30, 2024
1 parent 7f968e7 commit a4b9b3c
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 69 deletions.
1 change: 1 addition & 0 deletions changes/2620.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Rafactor Base ContainerRegistry's `scan_tag` and implement `MEDIA_TYPE_DOCKER_MANIFEST` type handling.
217 changes: 160 additions & 57 deletions src/ai/backend/manager/container_registry/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from abc import ABCMeta, abstractmethod
from contextlib import asynccontextmanager as actxmgr
from contextvars import ContextVar
from typing import Any, AsyncIterator, Dict, Final, Optional, cast
from typing import Any, AsyncIterator, Dict, Final, Mapping, Optional, Sequence, cast

import aiohttp
import aiotools
Expand Down Expand Up @@ -268,7 +268,6 @@ async def _scan_tag(
image: str,
tag: str,
) -> None:
manifests = {}
async with concurrency_sema.get():
rqst_args["headers"]["Accept"] = self.MEDIA_TYPE_DOCKER_MANIFEST_LIST
async with sess.get(
Expand All @@ -281,62 +280,163 @@ async def _scan_tag(
content_type = resp.headers["Content-Type"]
resp.raise_for_status()
resp_json = await resp.json()
match content_type:
case self.MEDIA_TYPE_DOCKER_MANIFEST_LIST:
manifest_list = resp_json["manifests"]
request_type = self.MEDIA_TYPE_DOCKER_MANIFEST
case self.MEDIA_TYPE_OCI_INDEX:
manifest_list = [
item
for item in resp_json["manifests"]
if "annotations" not in item # skip attestation manifests
]
request_type = self.MEDIA_TYPE_OCI_MANIFEST
case _:
log.warning("Unknown content type: {}", content_type)
raise RuntimeError(
"The registry does not support the standard way of "
"listing multiarch images."
)
rqst_args["headers"]["Accept"] = request_type
for manifest in manifest_list:
platform_arg = (
f"{manifest["platform"]["os"]}/{manifest["platform"]["architecture"]}"
)
if variant := manifest["platform"].get("variant", None):
platform_arg += f"/{variant}"
architecture = manifest["platform"]["architecture"]
architecture = arch_name_aliases.get(architecture, architecture)
async with sess.get(
self.registry_url / f"v2/{image}/manifests/{manifest["digest"]}", **rqst_args
) as resp:
data = await resp.json()
config_digest = data["config"]["digest"]
size_bytes = sum(layer["size"] for layer in data["layers"]) + data["config"]["size"]
async with sess.get(
self.registry_url / f"v2/{image}/blobs/{config_digest}", **rqst_args
) as resp:
resp.raise_for_status()
data = json.loads(await resp.read())
labels = {}
# we should favor `config` instead of `container_config` since `config` can contain additional datas
# set when commiting image via `--change` flag
if _config_labels := data.get("config", {}).get("Labels"):
labels = _config_labels
elif _container_config_labels := data.get("container_config", {}).get("Labels"):
labels = _container_config_labels

if not labels:
log.warning(
"Labels section not found on image {}:{}/{}", image, tag, architecture
)

manifests[architecture] = {
"size": size_bytes,
"labels": labels,
"digest": config_digest,
}
await self._read_manifest(image, tag, manifests)
async with aiotools.TaskGroup() as tg:
match content_type:
case self.MEDIA_TYPE_DOCKER_MANIFEST:
await self._process_docker_v2_image(
tg, sess, rqst_args, image, tag, resp_json
)
case self.MEDIA_TYPE_DOCKER_MANIFEST_LIST:
await self._process_docker_v2_multiplatform_image(
tg, sess, rqst_args, image, tag, resp_json
)
case self.MEDIA_TYPE_OCI_INDEX:
await self._process_oci_index(
tg, sess, rqst_args, image, tag, resp_json
)
case _:
log.warn("Unknown content type: {}", content_type)
raise RuntimeError(
"The registry does not support the standard way of "
"listing multiarch images."
)

async def _read_manifest_list(
self,
sess: aiohttp.ClientSession,
manifest_list: Sequence[Any],
rqst_args: Mapping[str, Any],
image: str,
tag: str,
) -> None:
"""
Understands images defined under [OCI image manifest](https://github.com/opencontainers/image-spec/blob/main/manifest.md#example-image-manifest) or
[Docker image manifest list](https://github.com/openshift/docker-distribution/blob/master/docs/spec/manifest-v2-2.md#example-manifest-list)
and imports Backend.AI compatible images.
"""
manifests = {}
for manifest in manifest_list:
platform_arg = f"{manifest["platform"]["os"]}/{manifest["platform"]["architecture"]}"
if variant := manifest["platform"].get("variant", None):
platform_arg += f"/{variant}"
architecture = manifest["platform"]["architecture"]
architecture = arch_name_aliases.get(architecture, architecture)

async with sess.get(
self.registry_url / f"v2/{image}/manifests/{manifest["digest"]}",
**rqst_args,
) as resp:
manifest_info = await resp.json()

manifests[architecture] = await self._preprocess_manifest(
sess, manifest_info, rqst_args, image
)

if not manifests[architecture]["labels"]:
log.warning("Labels section not found on image {}:{}/{}", image, tag, architecture)

await self._read_manifest(image, tag, manifests)

async def _preprocess_manifest(
self,
sess: aiohttp.ClientSession,
manifest: Mapping[str, Any],
rqst_args: Mapping[str, Any],
image: str,
) -> dict[str, Any]:
"""
Extracts informations from
[Docker iamge manifest](https://github.com/openshift/docker-distribution/blob/master/docs/spec/manifest-v2-2.md#example-image-manifest)
required by Backend.AI.
"""
config_digest = manifest["config"]["digest"]
size_bytes = sum(layer["size"] for layer in manifest["layers"]) + manifest["config"]["size"]

async with sess.get(
self.registry_url / f"v2/{image}/blobs/{config_digest}", **rqst_args
) as resp:
resp.raise_for_status()
data = json.loads(await resp.read())
labels = {}

# we should favor `config` instead of `container_config` since `config` can contain additional datas
# set when commiting image via `--change` flag
if _config_labels := data.get("config", {}).get("Labels"):
labels = _config_labels
elif _container_config_labels := data.get("container_config", {}).get("Labels"):
labels = _container_config_labels

return {
"size": size_bytes,
"labels": labels,
"digest": config_digest,
}

async def _process_oci_index(
self,
tg: aiotools.TaskGroup,
sess: aiohttp.ClientSession,
rqst_args: Mapping[str, Any],
image: str,
tag: str,
image_info: Mapping[str, Any],
) -> None:
manifest_list = [
item
for item in image_info["manifests"]
if "annotations" not in item # skip attestation manifests
]
rqst_args["headers"]["Accept"] = self.MEDIA_TYPE_OCI_MANIFEST

await self._read_manifest_list(sess, manifest_list, rqst_args, image, tag)

async def _process_docker_v2_multiplatform_image(
self,
tg: aiotools.TaskGroup,
sess: aiohttp.ClientSession,
rqst_args: Mapping[str, Any],
image: str,
tag: str,
image_info: Mapping[str, Any],
) -> None:
manifest_list = image_info["manifests"]
rqst_args["headers"]["Accept"] = self.MEDIA_TYPE_DOCKER_MANIFEST

await self._read_manifest_list(
sess,
manifest_list,
rqst_args,
image,
tag,
)

async def _process_docker_v2_image(
self,
tg: aiotools.TaskGroup,
sess: aiohttp.ClientSession,
rqst_args: Mapping[str, Any],
image: str,
tag: str,
image_info: Mapping[str, Any],
) -> None:
config_digest = image_info["config"]["digest"]
rqst_args["headers"]["Accept"] = self.MEDIA_TYPE_DOCKER_MANIFEST

async with sess.get(
self.registry_url / f"v2/{image}/blobs/{config_digest}",
**rqst_args,
) as resp:
resp.raise_for_status()
blob_data = json.loads(await resp.read())

manifest_arch = blob_data["architecture"]
architecture = arch_name_aliases.get(manifest_arch, manifest_arch)

manifests = {
architecture: await self._preprocess_manifest(sess, image_info, rqst_args, image),
}
await self._read_manifest(image, tag, manifests)

async def _read_manifest(
self,
Expand All @@ -345,6 +445,9 @@ async def _read_manifest(
manifests: dict[str, dict],
skip_reason: Optional[str] = None,
) -> None:
"""
Detects if image is compatible with Backend.AI and injects the matadata to database if it complies.
"""
if not manifests:
if not skip_reason:
skip_reason = "missing/deleted"
Expand Down
28 changes: 16 additions & 12 deletions src/ai/backend/manager/container_registry/harbor.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,15 +263,15 @@ async def _scan_image(
match image_info["manifest_media_type"]:
case self.MEDIA_TYPE_OCI_INDEX:
await self._process_oci_index(
tg, sess, rqst_args, image, image_info
tg, sess, rqst_args, image, tag, image_info
)
case self.MEDIA_TYPE_DOCKER_MANIFEST_LIST:
await self._process_docker_v2_multiplatform_image(
tg, sess, rqst_args, image, image_info
tg, sess, rqst_args, image, tag, image_info
)
case self.MEDIA_TYPE_DOCKER_MANIFEST:
await self._process_docker_v2_image(
tg, sess, rqst_args, image, image_info
tg, sess, rqst_args, image, tag, image_info
)
case _ as media_type:
raise RuntimeError(
Expand Down Expand Up @@ -312,15 +312,19 @@ async def _scan_tag(
resp.raise_for_status()
resp_json = await resp.json()
async with aiotools.TaskGroup() as tg:
tag = resp_json["tags"][0]["name"]

match resp_json["manifest_media_type"]:
case self.MEDIA_TYPE_OCI_INDEX:
await self._process_oci_index(tg, sess, rqst_args, image, resp_json)
await self._process_oci_index(tg, sess, rqst_args, image, tag, resp_json)
case self.MEDIA_TYPE_DOCKER_MANIFEST_LIST:
await self._process_docker_v2_multiplatform_image(
tg, sess, rqst_args, image, resp_json
tg, sess, rqst_args, image, tag, resp_json
)
case self.MEDIA_TYPE_DOCKER_MANIFEST:
await self._process_docker_v2_image(tg, sess, rqst_args, image, resp_json)
await self._process_docker_v2_image(
tg, sess, rqst_args, image, tag, resp_json
)
case _ as media_type:
raise RuntimeError(f"Unsupported artifact media-type: {media_type}")

Expand All @@ -330,14 +334,14 @@ async def _process_oci_index(
sess: aiohttp.ClientSession,
_rqst_args: Mapping[str, Any],
image: str,
tag: str,
image_info: Mapping[str, Any],
) -> None:
rqst_args = dict(_rqst_args)
if not rqst_args.get("headers"):
rqst_args["headers"] = {}
rqst_args["headers"].update({"Accept": "application/vnd.oci.image.manifest.v1+json"})
digests: list[tuple[str, str]] = []
tag_name = image_info["tags"][0]["name"]
for reference in image_info["references"]:
if (
reference["platform"]["os"] == "unknown"
Expand All @@ -355,7 +359,7 @@ async def _process_oci_index(
rqst_args,
image,
digest=digest,
tag=tag_name,
tag=tag,
architecture=architecture,
)
)
Expand All @@ -366,6 +370,7 @@ async def _process_docker_v2_multiplatform_image(
sess: aiohttp.ClientSession,
_rqst_args: Mapping[str, Any],
image: str,
tag: str,
image_info: Mapping[str, Any],
) -> None:
rqst_args = dict(_rqst_args)
Expand All @@ -375,7 +380,6 @@ async def _process_docker_v2_multiplatform_image(
"Accept": "application/vnd.docker.distribution.manifest.v2+json"
})
digests: list[tuple[str, str]] = []
tag_name = image_info["tags"][0]["name"]
for reference in image_info["references"]:
if (
reference["platform"]["os"] == "unknown"
Expand All @@ -393,7 +397,7 @@ async def _process_docker_v2_multiplatform_image(
rqst_args,
image,
digest=digest,
tag=tag_name,
tag=tag,
architecture=architecture,
)
)
Expand All @@ -404,6 +408,7 @@ async def _process_docker_v2_image(
sess: aiohttp.ClientSession,
_rqst_args: Mapping[str, Any],
image: str,
tag: str,
image_info: Mapping[str, Any],
) -> None:
rqst_args = dict(_rqst_args)
Expand All @@ -414,14 +419,13 @@ async def _process_docker_v2_image(
})
if (reporter := progress_reporter.get()) is not None:
reporter.total_progress += 1
tag_name = image_info["tags"][0]["name"]
async with aiotools.TaskGroup() as tg:
tg.create_task(
self._harbor_scan_tag_single_arch(
sess,
rqst_args,
image,
tag=tag_name,
tag=tag,
)
)

Expand Down

0 comments on commit a4b9b3c

Please sign in to comment.