From 8c4d8c04c48a4e4410ee0ac35deaab96459c4cff Mon Sep 17 00:00:00 2001 From: Gyubong Lee Date: Sat, 3 Aug 2024 01:53:09 +0000 Subject: [PATCH] refactor: Base ContainerRegistry's `scan_tag` and implement `MEDIA_TYPE_DOCKER_MANIFEST` type handling --- .../manager/container_registry/base.py | 217 +++++++++++++----- .../manager/container_registry/harbor.py | 28 ++- 2 files changed, 176 insertions(+), 69 deletions(-) diff --git a/src/ai/backend/manager/container_registry/base.py b/src/ai/backend/manager/container_registry/base.py index 785012b92cd..f52caa88b0e 100644 --- a/src/ai/backend/manager/container_registry/base.py +++ b/src/ai/backend/manager/container_registry/base.py @@ -6,7 +6,7 @@ from abc import ABCMeta, abstractmethod from contextlib import asynccontextmanager as actxmgr from contextvars import ContextVar -from typing import Any, AsyncIterator, Dict, Final, Optional, cast +from typing import Any, AsyncIterator, Dict, Final, Mapping, Optional, Sequence, cast import aiohttp import aiotools @@ -233,7 +233,6 @@ async def _scan_tag( image: str, tag: str, ) -> None: - manifests = {} async with concurrency_sema.get(): rqst_args["headers"]["Accept"] = self.MEDIA_TYPE_DOCKER_MANIFEST_LIST async with sess.get( @@ -246,62 +245,163 @@ async def _scan_tag( content_type = resp.headers["Content-Type"] resp.raise_for_status() resp_json = await resp.json() - match content_type: - case self.MEDIA_TYPE_DOCKER_MANIFEST_LIST: - manifest_list = resp_json["manifests"] - request_type = self.MEDIA_TYPE_DOCKER_MANIFEST - case self.MEDIA_TYPE_OCI_INDEX: - manifest_list = [ - item - for item in resp_json["manifests"] - if "annotations" not in item # skip attestation manifests - ] - request_type = self.MEDIA_TYPE_OCI_MANIFEST - case _: - log.warn("Unknown content type: {}", content_type) - raise RuntimeError( - "The registry does not support the standard way of " - "listing multiarch images." 
- ) - rqst_args["headers"]["Accept"] = request_type - for manifest in manifest_list: - platform_arg = ( - f"{manifest['platform']['os']}/{manifest['platform']['architecture']}" - ) - if variant := manifest["platform"].get("variant", None): - platform_arg += f"/{variant}" - architecture = manifest["platform"]["architecture"] - architecture = arch_name_aliases.get(architecture, architecture) - async with sess.get( - self.registry_url / f"v2/{image}/manifests/{manifest['digest']}", **rqst_args - ) as resp: - data = await resp.json() - config_digest = data["config"]["digest"] - size_bytes = sum(layer["size"] for layer in data["layers"]) + data["config"]["size"] - async with sess.get( - self.registry_url / f"v2/{image}/blobs/{config_digest}", **rqst_args - ) as resp: - resp.raise_for_status() - data = json.loads(await resp.read()) - labels = {} - # we should favor `config` instead of `container_config` since `config` can contain additional datas - # set when commiting image via `--change` flag - if _config_labels := data.get("config", {}).get("Labels"): - labels = _config_labels - elif _container_config_labels := data.get("container_config", {}).get("Labels"): - labels = _container_config_labels - - if not labels: - log.warning( - "Labels section not found on image {}:{}/{}", image, tag, architecture - ) - manifests[architecture] = { - "size": size_bytes, - "labels": labels, - "digest": config_digest, - } - await self._read_manifest(image, tag, manifests) + async with aiotools.TaskGroup() as tg: + match content_type: + case self.MEDIA_TYPE_DOCKER_MANIFEST: + await self._process_docker_v2_image( + tg, sess, rqst_args, image, tag, resp_json + ) + case self.MEDIA_TYPE_DOCKER_MANIFEST_LIST: + await self._process_docker_v2_multiplatform_image( + tg, sess, rqst_args, image, tag, resp_json + ) + case self.MEDIA_TYPE_OCI_INDEX: + await self._process_oci_index( + tg, sess, rqst_args, image, tag, resp_json + ) + case _: + log.warn("Unknown content type: {}", content_type) + raise 
RuntimeError( + "The registry does not support the standard way of " + "listing multiarch images." + ) + + async def _read_manifest_list( + self, + sess: aiohttp.ClientSession, + manifest_list: Sequence[Any], + rqst_args: Mapping[str, Any], + image: str, + tag: str, + ) -> None: + """ + Understands images defined under [OCI image manifest](https://github.com/opencontainers/image-spec/blob/main/manifest.md#example-image-manifest) or + [Docker image manifest list](https://github.com/openshift/docker-distribution/blob/master/docs/spec/manifest-v2-2.md#example-manifest-list) + and imports Backend.AI compatible images. + """ + manifests = {} + for manifest in manifest_list: + platform_arg = f"{manifest['platform']['os']}/{manifest['platform']['architecture']}" + if variant := manifest["platform"].get("variant", None): + platform_arg += f"/{variant}" + architecture = manifest["platform"]["architecture"] + architecture = arch_name_aliases.get(architecture, architecture) + + async with sess.get( + self.registry_url / f"v2/{image}/manifests/{manifest['digest']}", + **rqst_args, + ) as resp: + manifest_info = await resp.json() + + manifests[architecture] = await self._preprocess_manifest( + sess, manifest_info, rqst_args, image + ) + + if not manifests[architecture]["labels"]: + log.warning("Labels section not found on image {}:{}/{}", image, tag, architecture) + + await self._read_manifest(image, tag, manifests) + + async def _preprocess_manifest( + self, + sess: aiohttp.ClientSession, + manifest: Mapping[str, Any], + rqst_args: Mapping[str, Any], + image: str, + ) -> dict[str, Any]: + """ + Extracts information from + [Docker image manifest](https://github.com/openshift/docker-distribution/blob/master/docs/spec/manifest-v2-2.md#example-image-manifest) + required by Backend.AI.
+ """ + config_digest = manifest["config"]["digest"] + size_bytes = sum(layer["size"] for layer in manifest["layers"]) + manifest["config"]["size"] + + async with sess.get( + self.registry_url / f"v2/{image}/blobs/{config_digest}", **rqst_args + ) as resp: + resp.raise_for_status() + data = json.loads(await resp.read()) + labels = {} + + # we should favor `config` instead of `container_config` since `config` can contain additional data + # set when committing image via `--change` flag + if _config_labels := data.get("config", {}).get("Labels"): + labels = _config_labels + elif _container_config_labels := data.get("container_config", {}).get("Labels"): + labels = _container_config_labels + + return { + "size": size_bytes, + "labels": labels, + "digest": config_digest, + } + + async def _process_oci_index( + self, + tg: aiotools.TaskGroup, + sess: aiohttp.ClientSession, + rqst_args: Mapping[str, Any], + image: str, + tag: str, + image_info: Mapping[str, Any], + ) -> None: + manifest_list = [ + item + for item in image_info["manifests"] + if "annotations" not in item # skip attestation manifests + ] + rqst_args["headers"]["Accept"] = self.MEDIA_TYPE_OCI_MANIFEST + + await self._read_manifest_list(sess, manifest_list, rqst_args, image, tag) + + async def _process_docker_v2_multiplatform_image( + self, + tg: aiotools.TaskGroup, + sess: aiohttp.ClientSession, + rqst_args: Mapping[str, Any], + image: str, + tag: str, + image_info: Mapping[str, Any], + ) -> None: + manifest_list = image_info["manifests"] + rqst_args["headers"]["Accept"] = self.MEDIA_TYPE_DOCKER_MANIFEST + + await self._read_manifest_list( + sess, + manifest_list, + rqst_args, + image, + tag, + ) + + async def _process_docker_v2_image( + self, + tg: aiotools.TaskGroup, + sess: aiohttp.ClientSession, + rqst_args: Mapping[str, Any], + image: str, + tag: str, + image_info: Mapping[str, Any], + ) -> None: + config_digest = image_info["config"]["digest"] + rqst_args["headers"]["Accept"] =
self.MEDIA_TYPE_DOCKER_MANIFEST + + async with sess.get( + self.registry_url / f"v2/{image}/blobs/{config_digest}", + **rqst_args, + ) as resp: + resp.raise_for_status() + blob_data = json.loads(await resp.read()) + + manifest_arch = blob_data["architecture"] + architecture = arch_name_aliases.get(manifest_arch, manifest_arch) + + manifests = { + architecture: await self._preprocess_manifest(sess, image_info, rqst_args, image), + } + await self._read_manifest(image, tag, manifests) async def _read_manifest( self, @@ -310,6 +410,9 @@ async def _read_manifest( manifests: dict[str, dict], skip_reason: Optional[str] = None, ) -> None: + """ + Detects if image is compatible with Backend.AI and injects the metadata to database if it complies. + """ if not manifests: if not skip_reason: skip_reason = "missing/deleted" diff --git a/src/ai/backend/manager/container_registry/harbor.py b/src/ai/backend/manager/container_registry/harbor.py index fa4bd9a4085..3eb4ba82b58 100644 --- a/src/ai/backend/manager/container_registry/harbor.py +++ b/src/ai/backend/manager/container_registry/harbor.py @@ -263,15 +263,15 @@ async def _scan_image( match image_info["manifest_media_type"]: case self.MEDIA_TYPE_OCI_INDEX: await self._process_oci_index( - tg, sess, rqst_args, image, image_info + tg, sess, rqst_args, image, tag, image_info ) case self.MEDIA_TYPE_DOCKER_MANIFEST_LIST: await self._process_docker_v2_multiplatform_image( - tg, sess, rqst_args, image, image_info + tg, sess, rqst_args, image, tag, image_info ) case self.MEDIA_TYPE_DOCKER_MANIFEST: await self._process_docker_v2_image( - tg, sess, rqst_args, image, image_info + tg, sess, rqst_args, image, tag, image_info ) case _ as media_type: raise RuntimeError( @@ -312,15 +312,19 @@ async def _scan_tag( resp.raise_for_status() resp_json = await resp.json() async with aiotools.TaskGroup() as tg: + tag = resp_json["tags"][0]["name"] + match resp_json["manifest_media_type"]: case self.MEDIA_TYPE_OCI_INDEX: - await
self._process_oci_index(tg, sess, rqst_args, image, resp_json) + await self._process_oci_index(tg, sess, rqst_args, image, tag, resp_json) case self.MEDIA_TYPE_DOCKER_MANIFEST_LIST: await self._process_docker_v2_multiplatform_image( - tg, sess, rqst_args, image, resp_json + tg, sess, rqst_args, image, tag, resp_json ) case self.MEDIA_TYPE_DOCKER_MANIFEST: - await self._process_docker_v2_image(tg, sess, rqst_args, image, resp_json) + await self._process_docker_v2_image( + tg, sess, rqst_args, image, tag, resp_json + ) case _ as media_type: raise RuntimeError(f"Unsupported artifact media-type: {media_type}") @@ -330,6 +334,7 @@ async def _process_oci_index( sess: aiohttp.ClientSession, _rqst_args: Mapping[str, Any], image: str, + tag: str, image_info: Mapping[str, Any], ) -> None: rqst_args = dict(_rqst_args) @@ -337,7 +342,6 @@ async def _process_oci_index( rqst_args["headers"] = {} rqst_args["headers"].update({"Accept": "application/vnd.oci.image.manifest.v1+json"}) digests: list[tuple[str, str]] = [] - tag_name = image_info["tags"][0]["name"] for reference in image_info["references"]: if ( reference["platform"]["os"] == "unknown" @@ -355,7 +359,7 @@ async def _process_oci_index( rqst_args, image, digest=digest, - tag=tag_name, + tag=tag, architecture=architecture, ) ) @@ -366,6 +370,7 @@ async def _process_docker_v2_multiplatform_image( sess: aiohttp.ClientSession, _rqst_args: Mapping[str, Any], image: str, + tag: str, image_info: Mapping[str, Any], ) -> None: rqst_args = dict(_rqst_args) @@ -375,7 +380,6 @@ async def _process_docker_v2_multiplatform_image( "Accept": "application/vnd.docker.distribution.manifest.v2+json" }) digests: list[tuple[str, str]] = [] - tag_name = image_info["tags"][0]["name"] for reference in image_info["references"]: if ( reference["platform"]["os"] == "unknown" @@ -393,7 +397,7 @@ async def _process_docker_v2_multiplatform_image( rqst_args, image, digest=digest, - tag=tag_name, + tag=tag, architecture=architecture, ) ) @@ -404,6 +408,7 
@@ async def _process_docker_v2_image( sess: aiohttp.ClientSession, _rqst_args: Mapping[str, Any], image: str, + tag: str, image_info: Mapping[str, Any], ) -> None: rqst_args = dict(_rqst_args) @@ -414,14 +419,13 @@ async def _process_docker_v2_image( }) if (reporter := progress_reporter.get()) is not None: reporter.total_progress += 1 - tag_name = image_info["tags"][0]["name"] async with aiotools.TaskGroup() as tg: tg.create_task( self._harbor_scan_tag_single_arch( sess, rqst_args, image, - tag=tag_name, + tag=tag, ) )