diff --git a/cnudie/iter.py b/cnudie/iter.py index f736c18aa3..cd0f114484 100644 --- a/cnudie/iter.py +++ b/cnudie/iter.py @@ -249,3 +249,169 @@ def iter_resources( component_filter=component_filter, reftype_filter=reftype_filter, ) + + +async def iter_async( + component: ocm.Component, + lookup: cnudie.retrieve.ComponentDescriptorLookupById=None, + recursion_depth: int=-1, + prune_unique: bool=True, + node_filter: collections.abc.Callable[[Node], bool]=None, + ocm_repo: ocm.OcmRepository | str=None, + ctx_repo: ocm.OcmRepository | str=None, # deprecated, use `ocm_repo` instead + component_filter: collections.abc.Callable[[ocm.Component], bool]=None, + reftype_filter: collections.abc.Callable[[NodeReferenceType], bool]=None, +) -> collections.abc.AsyncGenerator[Node, None, None]: + ''' + returns a generator yielding the transitive closure of nodes accessible from the given component. + + See `cnudie.retrieve` for retrieving components/component descriptors. + + @param component: root component for iteration + @param lookup: used to lookup referenced components descriptors + (thus abstracting from retrieval method) + optional iff recursion_depth is set to 0 + @param recursion_depth: if set to a positive value, limit recursion for resolving component + dependencies; -1 will resolve w/o recursion limit, 0 will not resolve + component dependencies + @param prune_unique: if true, redundant component-versions will only be traversed once + @param node_filter: use to filter emitted nodes (see Filter for predefined filters) + @param ocm_repo: optional OCM Repository to be used to override in the lookup + @param ctx_repo: deprecated, use `ocm_repo` instead + @param component_filter: use to exclude components (and their references) from the iterator; + thereby `True` means the component should be filtered out + @param reftype_filter: use to exclude components (and their references) from the iterator if + they are of a certain reference type; thereby `True` means the component 
+ should be filtered out + ''' + if not ocm_repo and ctx_repo: + ocm_repo = ctx_repo + + if isinstance(component, ocm.ComponentDescriptor): + component = component.component + + seen_component_ids = set() + + if not lookup and not recursion_depth == 0: + raise ValueError('lookup is required if recusion is not disabled (recursion_depth==0)') + + # need to nest actual iterator to keep global state of seen component-IDs + async def inner_iter_async( + component: ocm.Component, + lookup: cnudie.retrieve.ComponentDescriptorLookupById, + recursion_depth, + path: tuple[NodePathEntry]=(), + reftype: NodeReferenceType=NodeReferenceType.COMPONENT_REFERENCE, + ): + if component_filter and component_filter(component): + return + + if reftype_filter and reftype_filter(reftype): + return + + path = (*path, NodePathEntry(component, reftype)) + + yield ComponentNode( + path=path, + ) + + for resource in component.resources: + yield ResourceNode( + path=path, + resource=resource, + ) + + for source in component.sources: + yield SourceNode( + path=path, + source=source, + ) + + if recursion_depth == 0: + return # stop resolving referenced components + elif recursion_depth > 0: + recursion_depth -= 1 + + for cref in component.componentReferences: + cref_id = ocm.ComponentIdentity( + name=cref.componentName, + version=cref.version, + ) + + if ocm_repo: + referenced_component_descriptor = await lookup(cref_id, ocm_repo) + else: + referenced_component_descriptor = await lookup(cref_id) + + async for node in inner_iter_async( + component=referenced_component_descriptor.component, + lookup=lookup, + recursion_depth=recursion_depth, + path=path, + ): + yield node + + if not (extra_crefs_label := component.find_label( + name=dso.labels.ExtraComponentReferencesLabel.name, + )): + return + + extra_crefs_label: dso.labels.ExtraComponentReferencesLabel = dso.labels.deserialise_label( + label=extra_crefs_label, + ) + + for extra_cref in extra_crefs_label.value: + extra_cref_id = 
extra_cref.component_reference + + if ocm_repo: + referenced_component_descriptor = await lookup(extra_cref_id, ocm_repo) + else: + referenced_component_descriptor = await lookup(extra_cref_id) + + async for node in inner_iter_async( + component=referenced_component_descriptor.component, + lookup=lookup, + recursion_depth=recursion_depth, + path=path, + reftype=NodeReferenceType.EXTRA_COMPONENT_REFS_LABEL, + ): + yield node + + async for node in inner_iter_async( + component=component, + lookup=lookup, + recursion_depth=recursion_depth, + path=(), + ): + if node_filter and not node_filter(node): + continue + + if prune_unique and isinstance(node, ComponentNode): + if node.component.identity() in seen_component_ids: + continue + else: + seen_component_ids.add(node.component_id) + + yield node + + +def iter_resources_async( + component: ocm.Component, + lookup: cnudie.retrieve.ComponentDescriptorLookupById=None, + recursion_depth: int=-1, + prune_unique: bool=True, + component_filter: collections.abc.Callable[[ocm.Component], bool]=None, + reftype_filter: collections.abc.Callable[[NodeReferenceType], bool]=None, +) -> collections.abc.AsyncGenerator[ResourceNode, None, None]: + ''' + curried version of `iter_async` w/ node-filter preset to yield only resource-nodes + ''' + return iter_async( + component=component, + lookup=lookup, + recursion_depth=recursion_depth, + prune_unique=prune_unique, + node_filter=Filter.resources, + component_filter=component_filter, + reftype_filter=reftype_filter, + ) diff --git a/cnudie/purge.py b/cnudie/purge.py index f449369946..0748f16146 100644 --- a/cnudie/purge.py +++ b/cnudie/purge.py @@ -38,6 +38,31 @@ def iter_componentversions_to_purge( ) +async def iter_componentversions_to_purge_async( + component: ocm.Component | ocm.ComponentDescriptor, + policy: version.VersionRetentionPolicies, + oci_client: oc.AsyncClient=None, +) -> collections.abc.AsyncGenerator[ocm.ComponentIdentity, None, None]: + oci_ref = 
cnudie.util.oci_ref(component=component) + if isinstance(component, ocm.ComponentDescriptor): + component = component.component + + if not oci_client: + oci_client = ccc.oci.oci_client_async() + + versions = await oci_client.tags(oci_ref.ref_without_tag) + + for v in version.versions_to_purge( + versions=versions, + reference_version=component.version, + policy=policy, + ): + yield ocm.ComponentIdentity( + name=component.name, + version=v, + ) + + def remove_component_descriptor_and_referenced_artefacts( component: ocm.Component | ocm.ComponentDescriptor, oci_client: oc.Client=None, @@ -190,3 +215,157 @@ def iter_platform_refs(): ) return True + + +async def remove_component_descriptor_and_referenced_artefacts_async( + component: ocm.Component | ocm.ComponentDescriptor, + oci_client: oc.AsyncClient=None, + lookup: cnudie.retrieve.ComponentDescriptorLookupById=None, + recursive: bool=False, + on_error: str='abort', # todo: implement, e.g. patch-component-descriptor-and-abort +): + if isinstance(component, ocm.ComponentDescriptor): + component = component.component + + logger.info(f'will try to purge {component.name}:{component.version} including local resources') + + current_component = None + resources_with_removal_errors = [] + if not oci_client: + oci_client = ccc.oci.oci_client_async() + + async for node in cnudie.iter.iter_async( + component=component, + lookup=lookup, + recursion_depth=-1 if recursive else 0, + ): + # cnudie.iter.iter_async will return sequences of: + # - component-node (always exactly one per component) + # - resource-nodes (if any) + # - source-nodes (if any) + if isinstance(node, cnudie.iter.ComponentNode): + if current_component: # skip for first iteration + await _remove_component_descriptor_async( + component=current_component, + oci_client=oci_client, + ) + current_component = node.component + continue + + if isinstance(node, cnudie.iter.SourceNode): + continue # we ignore source-nodes for now + + if isinstance(node, 
cnudie.iter.ResourceNode): + if not node.resource.relation is ocm.ResourceRelation.LOCAL: + logger.debug(f'skipping non-local {node.resource.name=}') + continue + try: + did_remove = await _remove_resource_async( + node=node, + oci_client=oci_client, + ) + if not did_remove: + logger.info(f'do not know how to remove {node.resource=}') + except Exception as e: + logger.warning(f'error while trying to remove {node.resource=} - {e=}') + traceback.print_exc() + resources_with_removal_errors.append(node) + if on_error == 'abort': + logger.fatal('error encountered - aborting comoponent-descriptor-removal') + raise e + else: + raise ValueError(f'unknown value {on_error=}') + + # remove final component (the last component-descriptor would otherwise not be removed, + # as we remove component-descriptors only after (trying to) remove referenced resources). + if current_component: + await _remove_component_descriptor_async( + component=component, + oci_client=oci_client, + ) + + +async def _remove_component_descriptor_async( + component: ocm.Component, + oci_client: oc.AsyncClient, +): + oci_ref = cnudie.util.oci_ref( + component=component, + ) + + await oci_client.delete_manifest( + image_reference=oci_ref, + purge=True, + ) + + +async def _remove_resource_async( + node: cnudie.iter.ResourceNode, + oci_client: oc.AsyncClient, +) -> bool: + resource = node.resource + if not resource.type in (ocm.ArtefactType.OCI_IMAGE, 'ociImage'): + return False # we only support removal of oci-images for now + + if not resource.relation in (ocm.ResourceRelation.LOCAL, 'local'): + return False # external resources can never be removed (as we do not "own" them) + + if not isinstance(resource.access, ocm.OciAccess): + return False # similar to above: we only support removal of oci-images in oci-registries + + access: ocm.OciAccess = resource.access + image_reference = om.OciImageReference(access.imageReference) + + manifest = await oci_client.manifest( + image_reference=image_reference, +
absent_ok=True, + accept=om.MimeTypes.prefer_multiarch, + ) + + if not manifest: + return True # nothing to do if image does not exist + + if image_reference.has_symbolical_tag: + purge = True + elif image_reference.has_digest_tag: + purge = False # no need to "purge" if we were passed a digest-tag + else: + raise ValueError(f'cannot remove image w/o tag: {str(image_reference)}') + + await oci_client.delete_manifest( + image_reference=image_reference, + purge=purge, + accept=om.MimeTypes.prefer_multiarch, + ) + + if isinstance(manifest, om.OciImageManifest): + return True + + if not isinstance(manifest, om.OciImageManifestList): + raise ValueError(f'did not expect type {manifest=} {type(manifest)} - this is a bug') + + # multi-arch-case - try to guess other tags, and purge those + manifest: om.OciImageManifestList + + def iter_platform_refs(): + repository = image_reference.ref_without_tag + base_tag = image_reference.tag + + for submanifest in manifest.manifests: + p = submanifest.platform + yield f'{repository}:{base_tag}-{p.os}-{p.architecture}' + + for ref in iter_platform_refs(): + if not await oci_client.head_manifest( + image_reference=ref, + absent_ok=True, + ): + logger.warning(f'did not find {ref=} - ignoring') + continue + + await oci_client.delete_manifest( + image_reference=ref, + purge=True, + ) + + return True diff --git a/cnudie/retrieve.py b/cnudie/retrieve.py index cc199a6b7e..849381460f 100644 --- a/cnudie/retrieve.py +++ b/cnudie/retrieve.py @@ -9,6 +9,8 @@ import tarfile import tempfile +import aiohttp +import aiohttp.client_exceptions import requests import yaml @@ -193,6 +195,69 @@ def lookup( return lookup +def in_memory_cache_component_descriptor_lookup_async( + cache_ctor: cachetools.Cache=cachetools.LRUCache, + ocm_repository_lookup: OcmRepositoryLookup=None, + **cache_kwargs, +) -> ComponentDescriptorLookupById: + ''' + Used to lookup referenced component descriptors in the in-memory cache. 
+ In case of a cache miss, the required component descriptor can be added + to the cache by using the writeback function. + + @param cache_ctor: + specification of the cache implementation + @param ocm_repository_lookup: + lookup for OCM repositories + @param cache_kwargs: + further args used for cache initialization, maxsize is defaulted to 2048 + ''' + cache_kwargs['maxsize'] = cache_kwargs.get('maxsize', 2048) + cache = cache_ctor(**cache_kwargs) + + def writeback( + component_id: ocm.ComponentIdentity, + component_descriptor: ocm.ComponentDescriptor, + ): + if (ocm_repo := component_descriptor.component.current_ocm_repo): + cache.__setitem__((component_id, ocm_repo), component_descriptor) + else: + raise ValueError(ocm_repo) + + _writeback = WriteBack(writeback) + + async def lookup( + component_id: ocm.ComponentIdentity, + ctx_repo: ocm.OcmRepository=None, + ocm_repository_lookup=ocm_repository_lookup, + ): + if ctx_repo: + ocm_repos = (ctx_repo,) + + else: + ocm_repos = iter_ocm_repositories( + component_id, + ocm_repository_lookup, + ) + + for ocm_repo in ocm_repos: + if isinstance(ocm_repo, str): + ocm_repo = ocm.OciOcmRepository( + type=ocm.AccessType.OCI_REGISTRY, + baseUrl=ocm_repo, + ) + try: + if (component_descriptor := cache.get((component_id, ocm_repo))): + return component_descriptor + except KeyError: + pass + + # component descriptor not found in lookup + return _writeback + + return lookup + + def file_system_cache_component_descriptor_lookup( ocm_repository_lookup: OcmRepositoryLookup=None, cache_dir: str=None, @@ -288,6 +353,101 @@ def lookup( return lookup +def file_system_cache_component_descriptor_lookup_async( + ocm_repository_lookup: OcmRepositoryLookup=None, + cache_dir: str=None, +) -> ComponentDescriptorLookupById: + ''' + Used to lookup referenced component descriptors in the file-system cache. + In case of a cache miss, the required component descriptor can be added + to the cache by using the writeback function. 
If cache_dir is not specified, + a ValueError is raised. + + @param ocm_repository_lookup: + lookup for OCM repositories + @param cache_dir: + directory used for caching. Must exist, otherwise a ValueError is raised + ''' + if not cache_dir: + raise ValueError(cache_dir) + + def writeback( + component_id: ocm.ComponentIdentity, + component_descriptor: ocm.ComponentDescriptor, + ): + if not (ocm_repo := component_descriptor.component.current_ocm_repo): + raise ValueError(ocm_repo) + + try: + f = tempfile.NamedTemporaryFile(mode='w', delete=False) + # write to tempfile, followed by a mv to avoid collisions through concurrent + # processes or threads (assuming mv is an atomic operation) + yaml.dump( + data=dataclasses.asdict(component_descriptor), + Dumper=ocm.EnumValueYamlDumper, + stream=f.file, + ) + f.close() # need to close filehandle for NT + + descriptor_path = os.path.join( + cache_dir, + ocm_repo.oci_ref.replace('/', '-'), + f'{component_id.name}-{component_id.version}', + ) + if not os.path.isfile(descriptor_path): + base_dir = os.path.dirname(descriptor_path) + os.makedirs(name=base_dir, exist_ok=True) + shutil.move(f.name, descriptor_path) + except: + os.unlink(f.name) + raise + + _writeback = WriteBack(writeback) + + async def lookup( + component_id: cnudie.util.ComponentId, + ctx_repo: ocm.OcmRepository|str=None, + ocm_repository_lookup: OcmRepositoryLookup=ocm_repository_lookup, + ): + if ctx_repo: + ocm_repos = (ctx_repo, ) + else: + ocm_repos = iter_ocm_repositories( + component_id, + ocm_repository_lookup, + ) + + for ocm_repo in ocm_repos: + if not ocm_repo: + raise ValueError(ocm_repo) + + if isinstance(ocm_repo, str): + ocm_repo = ocm.OciOcmRepository( + type=ocm.AccessType.OCI_REGISTRY, + baseUrl=ocm_repo, + ) + + if not isinstance(ocm_repo, ocm.OciOcmRepository): + raise NotImplementedError(ocm_repo) + + component_id = cnudie.util.to_component_id(component_id) + + descriptor_path = os.path.join( + cache_dir, +
ocm_repo.oci_ref.replace('/', '-'), + f'{component_id.name}-{component_id.version}', + ) + if os.path.isfile(descriptor_path): + return ocm.ComponentDescriptor.from_dict( + ci.util.parse_yaml_file(descriptor_path) + ) + + # component descriptor not found in lookup + return _writeback + + return lookup + + def delivery_service_component_descriptor_lookup( ocm_repository_lookup: OcmRepositoryLookup=None, delivery_client=None, @@ -383,6 +543,101 @@ def lookup( return lookup +def delivery_service_component_descriptor_lookup_async( + ocm_repository_lookup: OcmRepositoryLookup=None, + delivery_client=None, + default_absent_ok: bool=True, + default_ignore_errors: tuple[Exception]=( + requests.exceptions.ConnectionError, + requests.exceptions.ReadTimeout, + ), + fallback_to_service_mapping: bool=True, +) -> ComponentDescriptorLookupById: + ''' + Used to lookup referenced component descriptors in the delivery-service. + + @param ocm_repository_lookup: + lookup for OCM repositories + @param delivery_client: + client to establish the connection to the delivery-service. If the client cannot be created, + a ValueError is raised + @param default_absent_ok: + sets the default behaviour in case of absent component descriptors for the returned lookup + function + @param default_ignore_errors: + collection of exceptions which should be ignored by default. 
In case of such an exception, + no component descriptor will be returned, so that a subsequent lookup can retry retrieving + it + @param fallback_to_service_mapping: + if set, it is tried to retrieve the requested component descriptor using the OCM repository + mapping of the delivery-service, in case it could not be retrieved using + `ocm_repository_lookup` + ''' + if not delivery_client: + import ccc.delivery + delivery_client = ccc.delivery.default_client_if_available() + if not delivery_client: + raise ValueError(delivery_client) + + async def lookup( + component_id: ocm.ComponentIdentity, + ctx_repo: ocm.OcmRepository=None, + ocm_repository_lookup: OcmRepositoryLookup=ocm_repository_lookup, + absent_ok: bool=default_absent_ok, + ignore_errors: tuple[Exception]=default_ignore_errors, + ): + component_id = cnudie.util.to_component_id(component_id) + if ctx_repo: + ocm_repos = (ctx_repo, ) + else: + ocm_repos = iter_ocm_repositories( + component_id, + ocm_repository_lookup, + ) + + # if component descriptor is not found in `ocm_repos`, fallback to default ocm repo mapping + # defined in delivery service (i.e. 
specify no ocm repository) + if fallback_to_service_mapping: + ocm_repos = itertools.chain(ocm_repos, (None,)) + + for ocm_repo in ocm_repos: + if isinstance(ocm_repo, str): + ocm_repo = ocm.OciOcmRepository( + type=ocm.AccessType.OCI_REGISTRY, + baseUrl=ocm_repo, + ) + + if ocm_repo and not isinstance(ocm_repo, ocm.OciOcmRepository): + raise NotImplementedError(ocm_repo) + + try: + component_descriptor = delivery_client.component_descriptor( + name=component_id.name, + version=component_id.version, + ocm_repo_url=ocm_repo.oci_ref if ocm_repo else None, + ) + + if component_descriptor: + return component_descriptor + except requests.exceptions.HTTPError as e: + if e.response.status_code == 404: + continue + elif e.response.status_code >= 500: + # in case delivery-service is not reachable, fallback to next lookup (if any) + return None + raise + except ignore_errors: + # already return here to not use unintended "fallback" ocm repositories + return None + + # component descriptor not found in lookup + if absent_ok: + return None + raise om.OciImageNotFoundException + + return lookup + + def raw_component_descriptor_from_oci( component_id: ocm.ComponentIdentity, ocm_repos: collections.abc.Iterable[ocm.OciOcmRepository | str], @@ -462,6 +717,86 @@ def raw_component_descriptor_from_oci( ).content +async def raw_component_descriptor_from_oci_async( + component_id: ocm.ComponentIdentity, + ocm_repos: collections.abc.Iterable[ocm.OciOcmRepository | str], + oci_client: oc.AsyncClient=None, + absent_ok: bool=False, +) -> bytes: + if not oci_client: + import ccc.oci + if not (oci_client := ccc.oci.oci_client_async()): + raise ValueError('oci_client must not be empty') + + for ocm_repo in ocm_repos: + if isinstance(ocm_repo, str): + ocm_repo = ocm.OciOcmRepository( + type=ocm.OciAccess, + baseUrl=ocm_repo, + ) + + if not isinstance(ocm_repo, ocm.OciOcmRepository): + raise NotImplementedError(ocm_repo) + + target_ref = ci.util.urljoin( + ocm_repo.oci_ref, + 
'component-descriptors', + f'{component_id.name.lower()}:{component_id.version}', # oci-spec allows only lowercase + ) + + manifest = await oci_client.manifest( + image_reference=target_ref, + absent_ok=True, + ) + + if manifest: + break + else: + manifest = None + + if not manifest and absent_ok: + return None + elif not manifest: + raise om.OciImageNotFoundException + + try: + cfg_blob = await oci_client.blob( + image_reference=target_ref, + digest=manifest.config.digest, + ) + cfg_dict = await cfg_blob.json( + content_type='application/octet-stream' + ) + cfg = dacite.from_dict( + data_class=gci.oci.ComponentDescriptorOciCfg, + data=cfg_dict, + ) + layer_digest = cfg.componentDescriptorLayer.digest + layer_mimetype = cfg.componentDescriptorLayer.mediaType + except Exception as e: + logger.warning( + f'Failed to parse or retrieve component-descriptor-cfg: {e=}. ' + 'falling back to single-layer' + ) + + # by contract, there must be exactly one layer (tar w/ component-descriptor) + if not (layers_count := len(manifest.layers) == 1): + logger.warning(f'XXX unexpected amount of {layers_count=}') + + layer_digest = manifest.layers[0].digest + layer_mimetype = manifest.layers[0].mediaType + + if not layer_mimetype in gci.oci.component_descriptor_mimetypes: + logger.warning(f'{target_ref=} {layer_mimetype=} was unexpected') + # XXX: check for non-tar-variant + + blob = await oci_client.blob( + image_reference=target_ref, + digest=layer_digest, + ) + return await blob.content.read() + + def oci_component_descriptor_lookup( ocm_repository_lookup: OcmRepositoryLookup=None, oci_client: oc.Client | collections.abc.Callable[[], oc.Client]=None, @@ -485,7 +820,80 @@ def oci_component_descriptor_lookup( if not oci_client: raise ValueError(oci_client) - def lookup( + def lookup( + component_id: ocm.ComponentIdentity, + ctx_repo: ocm.OcmRepository=None, + ocm_repository_lookup: OcmRepositoryLookup=ocm_repository_lookup, + absent_ok=default_absent_ok, + ): + component_id = 
cnudie.util.to_component_id(component_id) + + if isinstance(oci_client, collections.abc.Callable): + local_oci_client = oci_client() + else: + local_oci_client = oci_client + + if ctx_repo: + ocm_repos = (ctx_repo,) + else: + if not ocm_repository_lookup: + raise ValueError('either ctx_repo, or ocm_repository_lookup must be passed') + + ocm_repos = iter_ocm_repositories( + component_id, + ocm_repository_lookup, + ) + + raw = raw_component_descriptor_from_oci( + component_id=component_id, + ocm_repos=ocm_repos, + oci_client=local_oci_client, + absent_ok=absent_ok, + ) + if not raw and absent_ok: + return + elif not raw and not absent_ok: + raise om.OciImageNotFoundException(component_id) + + # wrap in fobj + blob_fobj = io.BytesIO(raw) + try: + component_descriptor = gci.oci.component_descriptor_from_tarfileobj( + fileobj=blob_fobj, + ) + except tarfile.ReadError as tre: + tre.add_note(f'{component_id=}') + raise tre + + return component_descriptor + + return lookup + + +def oci_component_descriptor_lookup_async( + ocm_repository_lookup: OcmRepositoryLookup=None, + oci_client: oc.AsyncClient | collections.abc.Callable[[], oc.AsyncClient]=None, + default_absent_ok=True, +) -> ComponentDescriptorLookupById: + ''' + Used to lookup referenced component descriptors in the oci-registry. + + @param ocm_repository_lookup: + lookup for OCM repositories + @param oci_client: + client to establish the connection to the oci-registry. 
If the client cannot be created, a + ValueError is raised + @param default_absent_ok: + sets the default behaviour in case of absent component descriptors for the returned lookup + function + ''' + if not oci_client: + import ccc.oci + oci_client = ccc.oci.oci_client_async() + if not oci_client: + raise ValueError(oci_client) + + async def lookup( component_id: ocm.ComponentIdentity, ctx_repo: ocm.OcmRepository=None, ocm_repository_lookup: OcmRepositoryLookup=ocm_repository_lookup, @@ -509,7 +917,7 @@ def lookup( ocm_repository_lookup, ) - raw = raw_component_descriptor_from_oci( + raw = await raw_component_descriptor_from_oci_async( component_id=component_id, ocm_repos=ocm_repos, oci_client=local_oci_client, @@ -614,6 +1022,67 @@ def lookup( return lookup +def version_lookup_async( + ocm_repository_lookup: OcmRepositoryLookup=None, + oci_client: oc.AsyncClient=None, + default_absent_ok: bool=True, +) -> VersionLookupByComponent: + if not oci_client: + import ccc.oci + oci_client = ccc.oci.oci_client_async() + if not oci_client: + raise ValueError(oci_client) + + async def lookup( + component_id: ComponentName, + ctx_repo: ocm.OcmRepository=None, + ocm_repository_lookup: OcmRepositoryLookup=ocm_repository_lookup, + absent_ok: bool=default_absent_ok, + ): + component_name = cnudie.util.to_component_name(component_id) + if ctx_repo: + ocm_repos = (ctx_repo, ) + else: + ocm_repos = iter_ocm_repositories( + component_name, + ocm_repository_lookup, + ) + + versions = set() + for ocm_repo in ocm_repos: + if isinstance(ocm_repo, str): + ocm_repo = ocm.OciOcmRepository( + type=ocm.OciAccess, + baseUrl=ocm_repo, + ) + if not isinstance(ocm_repo, ocm.OciOcmRepository): + raise NotImplementedError(ocm_repo) + + try: + for version_tag in await component_versions_async( + component_name=component_name, + ctx_repo=ocm_repo, + oci_client=oci_client, + ): + versions.add(version_tag) + except aiohttp.client_exceptions.ClientResponseError as e: + if (error_code := e.status) == 404: 
+ continue + + image_reference = ocm_repo.component_oci_ref(component_name) + if error_code == error_code_indicating_not_found(image_reference=image_reference): + continue + + raise + + if not versions and not absent_ok: + raise om.OciImageNotFoundException() + + return versions + + return lookup + + def composite_component_descriptor_lookup( lookups: tuple[ComponentDescriptorLookupById, ...], ocm_repository_lookup: OcmRepositoryLookup | None=None, @@ -702,6 +1171,94 @@ def to_repo_url(ocm_repo): return lookup +def composite_component_descriptor_lookup_async( + lookups: tuple[ComponentDescriptorLookupById, ...], + ocm_repository_lookup: OcmRepositoryLookup | None=None, + default_absent_ok=True, +) -> ComponentDescriptorLookupById: + ''' + Used to combine multiple ComponentDescriptorLookupByIds. The single lookups are used in + the order they are specified. If the required component descriptor is found, it is + written back to the prior lookups (if they have a WriteBack defined). + + @param lookups: + a tuple of ComponentDescriptorLookupByIds which should be combined + @param ocm_repository_lookup: + ocm_repository_lookup to be used if none is specified in the lookup function + @param default_absent_ok: + sets the default behaviour in case of absent component descriptors for the returned lookup + function + ''' + async def lookup( + component_id: ocm.ComponentIdentity, + /, + ctx_repo: ocm.OciOcmRepository|str=None, + ocm_repository_lookup=ocm_repository_lookup, + absent_ok=default_absent_ok, + ): + component_id = cnudie.util.to_component_id(component_id) + writebacks = [] + for lookup in lookups: + res = None + try: + if ctx_repo: + res = await lookup( + component_id, + ctx_repo=ctx_repo, + ) + else: + res = await lookup(component_id) + except om.OciImageNotFoundException: + pass + except dacite.DaciteError as ce: + ce.add_note(f'{component_id=}') + raise ce + except requests.exceptions.HTTPError as he: + if he.response.status_code != 500: + raise + 
logger.warning(f'caught error {he} in {lookup=}, will try next lookup if any') + + if isinstance(res, ocm.ComponentDescriptor): + for wb in writebacks: wb(component_id, res) + return res + elif res is None: continue + elif isinstance(res, WriteBack): writebacks.append(res) + + # component descriptor not found in lookup + if absent_ok: + return + + if isinstance(ctx_repo, str): + ctx_repo = ocm.OciOcmRepository( + type=ocm.OciAccess, + baseUrl=ctx_repo, + ) + + if ctx_repo: + error = ctx_repo.component_version_oci_ref(component_id) + elif ocm_repository_lookup: + def to_repo_url(ocm_repo): + if isinstance(ocm_repo, str): + return ocm_repo + else: + return ocm_repo.oci_ref + + ocm_repository_urls = '\n'.join( + to_repo_url(ocm_repository) for ocm_repository + in ocm_repository_lookup(component_id) + ) + error = f'Did not find {component_id=} in any of the following\n' + error += f'ocm-repositories:\n{ocm_repository_urls}:\n{str(component_id)}' + else: + error = f': {str(component_id)}' + + raise om.OciImageNotFoundException( + error, + ) + + return lookup + + def create_default_component_descriptor_lookup( ocm_repository_lookup: OcmRepositoryLookup=None, cache_dir: str | None=None, @@ -780,6 +1337,86 @@ def create_default_component_descriptor_lookup( ) +def create_default_component_descriptor_lookup_async( + ocm_repository_lookup: OcmRepositoryLookup=None, + cache_dir: str | None=None, + oci_client: oc.AsyncClient | collections.abc.Callable[[], oc.AsyncClient]=None, + delivery_client=None, + default_absent_ok: bool=False, + fallback_to_service_mapping: bool=True, +) -> ComponentDescriptorLookupById: + ''' + This is a convenience function combining commonly used/recommended lookups, using global + configuration if available. It combines (in this order) an in-memory cache, file-system cache, + delivery-service based, and oci-registry based lookup. + + @param ocm_repository_lookup: + lookup for OCM repositories + @param cache_dir: + directory used for caching. 
If cache_dir is not specified, the filesystem cache lookup is + not included in the returned lookup + @param oci_client: + client to establish the connection to the oci-registry. If the client cannot be created, a + ValueError is raised + @param delivery_client: + client to establish the connection to the delivery-service. If the client cannot be created, + the delivery-service based lookup is not included in the returned lookup + @param default_absent_ok: + sets the default behaviour in case of absent component descriptors for the returned lookup + function + @param fallback_to_service_mapping: + if set, it is tried to retrieve the requested component descriptor using the OCM repository + mapping of the delivery-service, in case it could not be retrieved using + `ocm_repository_lookup` + ''' + if not ocm_repository_lookup: + import ctx + ocm_repository_lookup = ctx.cfg.ctx.ocm_repository_lookup + + lookups = [ + in_memory_cache_component_descriptor_lookup_async( + ocm_repository_lookup=ocm_repository_lookup, + ) + ] + if not cache_dir: + import ctx + if ctx.cfg: + cache_dir = ctx.cfg.ctx.cache_dir + + if cache_dir: + lookups.append( + file_system_cache_component_descriptor_lookup_async( + cache_dir=cache_dir, + ocm_repository_lookup=ocm_repository_lookup, + ) + ) + + if not delivery_client: + import ccc.delivery + delivery_client = ccc.delivery.default_client_if_available() + if delivery_client: + lookups.append( + delivery_service_component_descriptor_lookup_async( + delivery_client=delivery_client, + ocm_repository_lookup=ocm_repository_lookup, + fallback_to_service_mapping=fallback_to_service_mapping, + ) + ) + + lookups.append( + oci_component_descriptor_lookup_async( + oci_client=oci_client, + ocm_repository_lookup=ocm_repository_lookup, + ), + ) + + return composite_component_descriptor_lookup_async( + lookups=tuple(lookups), + ocm_repository_lookup=ocm_repository_lookup, + default_absent_ok=default_absent_ok, + ) + + def component_diff( left_component:
ocm.Component | ocm.ComponentDescriptor, right_component: ocm.Component | ocm.ComponentDescriptor, @@ -816,6 +1453,43 @@ def component_diff( ) +async def component_diff_async( + left_component: ocm.Component | ocm.ComponentDescriptor, + right_component: ocm.Component | ocm.ComponentDescriptor, + ignore_component_names=(), + component_descriptor_lookup: ComponentDescriptorLookupById=None, +) -> cnudie.util.ComponentDiff: + import cnudie.iter as ci # late import to avoid cyclic dependencies + + left_component = cnudie.util.to_component(left_component) + right_component = cnudie.util.to_component(right_component) + + if not component_descriptor_lookup: + component_descriptor_lookup = create_default_component_descriptor_lookup_async() + + left_components = [ + component_node.component async for component_node in ci.iter_async( + component=left_component, + lookup=component_descriptor_lookup, + node_filter=ci.Filter.components, + ) if component_node.component.name not in ignore_component_names + ] + + right_components = [ + component_node.component async for component_node in ci.iter_async( + component=right_component, + lookup=component_descriptor_lookup, + node_filter=ci.Filter.components, + ) if component_node.component.name not in ignore_component_names + ] + + return cnudie.util.diff_components( + left_components=left_components, + right_components=right_components, + ignore_component_names=ignore_component_names, + ) + + def component_versions( component_name: str, ctx_repo: ocm.OcmRepository, @@ -832,3 +1506,21 @@ def component_versions( oci_ref = ctx_repo.component_oci_ref(component_name) return oci_client.tags(image_reference=oci_ref) + + +async def component_versions_async( + component_name: str, + ctx_repo: ocm.OcmRepository, + oci_client: oc.AsyncClient=None, +) -> collections.abc.Sequence[str]: + if not isinstance(ctx_repo, ocm.OciOcmRepository): + raise NotImplementedError(ctx_repo) + + if not oci_client: + import ccc.oci + oci_client = 
async def _oci_blob_ref_from_access_async(
    access: ocm.LocalBlobAccess,
    oci_client: oci.client.AsyncClient=None,
    oci_image_reference: om.OciImageReference=None,
) -> om.OciBlobRef:
    '''
    create OciBlobRef from LocalBlobAccess. If access has a globalAccess attribute, all
    values will be read from it and returned.
    Otherwise, the `size` attribute will be looked up using the (then required) oci_client. A
    blob with a digest matching the one from access.localReference (typically of form
    `sha256:<hex-digest>`) must be present at the given oci_image_reference.

    This function is useful for creating OCI Image Manifest layer entries from local blob access
    objects from Component Descriptors.

    @param access:              local blob access to convert
    @param oci_client:          only required if `access` has no globalAccess
    @param oci_image_reference: only required if `access` has no globalAccess
    @raises ValueError:
        - if there is no globalAccess and oci_client or oci_image_reference is missing
        - if the blob is not present (HTTP 404) at oci_image_reference
        - if the registry's response carries no content-length header
    '''
    if (global_access := access.globalAccess):
        return om.OciBlobRef(
            digest=global_access.digest,
            mediaType=global_access.mediaType,
            size=global_access.size,
        )

    if not oci_client or not oci_image_reference:
        raise ValueError(
            'oci_client and oci_image_reference must both be set if no globalAccess is present',
            access,
        )

    res = await oci_client.head_blob(
        image_reference=oci_image_reference,
        digest=access.localReference,
        absent_ok=True,
    )

    if not res.ok:
        if res.status == 404:
            raise ValueError(
                f'{access.localReference=} not present at {oci_image_reference=}',
                access,
            )
        # any other error is reported through the response's own exception
        res.raise_for_status()

    # fail with an explicit error rather than `int(None)`'s TypeError if the header is missing
    content_length = res.headers.get('content-length')
    if content_length is None:
        raise ValueError(
            f'no content-length received for {access.localReference=} at {oci_image_reference=}',
            access,
        )

    return om.OciBlobRef(
        digest=access.localReference,
        mediaType=access.mediaType,
        size=int(content_length),
    )


async def _iter_oci_blob_refs_async(
    component: ocm.Component,
    oci_client: oci.client.AsyncClient=None,
    oci_image_reference: om.OciImageReference=None,
) -> collections.abc.AsyncGenerator[om.OciBlobRef, None]:
    '''
    yields an OciBlobRef for each artefact of the given component whose access is a
    `ocm.LocalBlobAccess`; artefacts with other access types are skipped.

    `oci_client` and `oci_image_reference` are passed on to `_oci_blob_ref_from_access_async`
    '''
    for artefact in component.iter_artefacts():
        access = artefact.access
        if not isinstance(access, ocm.LocalBlobAccess):
            continue

        yield await _oci_blob_ref_from_access_async(
            access=access,
            oci_client=oci_client,
            oci_image_reference=oci_image_reference,
        )
async def upload_component_descriptor_async(
    component_descriptor: ocm.ComponentDescriptor | ocm.Component,
    on_exist: UploadMode | str=UploadMode.SKIP,
    ocm_repository: ocm.OciOcmRepository | str=None,
    oci_client: oci.client.AsyncClient=None,
) -> bytes | None:
    '''
    uploads the given component descriptor (component descriptor blob, OCI config blob, and
    OCI image manifest) to the OCI reference derived from its component.

    @param component_descriptor:
        component descriptor to upload. A bare `ocm.Component` is wrapped into a
        `ocm.ComponentDescriptor` with default metadata and no signatures
    @param on_exist:
        what to do if a manifest is already present at the target reference:
        SKIP returns without uploading, FAIL raises ValueError, OVERWRITE uploads regardless
    @param ocm_repository:
        optional target repository (base-url or `ocm.OciOcmRepository`). If it differs from the
        component's current OCM repository, it is appended to the component's
        repositoryContexts (i.e. the passed-in component is modified)
    @param oci_client:
        optional client; if not passed, `ccc.oci.oci_client_async()` is used
    @returns: the uploaded manifest as bytes, or None if the upload was skipped
    @raises ValueError:   if no oci_client was passed and `ccc.oci` cannot be imported, or if
                          on_exist is FAIL and the target already exists
    @raises RuntimeError: if the component descriptor's schema version is not V2
    @raises TypeError:    if ocm_repository is neither str nor `ocm.OciOcmRepository`
    '''
    if not oci_client:
        try:
            import ccc.oci
        except ImportError as ie:
            raise ValueError('must pass-in oci_client') from ie
        oci_client = ccc.oci.oci_client_async()

    on_exist = UploadMode(on_exist)

    if isinstance(component_descriptor, ocm.Component):
        component_descriptor = ocm.ComponentDescriptor(
            component=component_descriptor,
            meta=ocm.Metadata(),
            signatures=[],
        )

    component = component_descriptor.component

    schema_version = component_descriptor.meta.schemaVersion
    if isinstance(schema_version, enum.Enum):
        # hack: convert from gci.componentmodel
        schema_version = ocm.SchemaVersion(schema_version.value)
    else:
        schema_version = ocm.SchemaVersion(schema_version)

    if schema_version is not ocm.SchemaVersion.V2:
        raise RuntimeError(f'unsupported component-descriptor-version: {schema_version=}')

    if ocm_repository:
        if isinstance(ocm_repository, str):
            ocm_repository = ocm.OciOcmRepository(baseUrl=ocm_repository)
        elif not isinstance(ocm_repository, ocm.OciOcmRepository):
            raise TypeError(type(ocm_repository))

        if component.current_ocm_repo != ocm_repository:
            component.repositoryContexts.append(ocm_repository)

    target_ref = cnudie.util.oci_artefact_reference(component)

    if on_exist in (UploadMode.SKIP, UploadMode.FAIL):
        # check whether manifest exists (head_manifest does not return None)
        # NOTE(review): this relies on the truthiness of head_manifest's result being falsy for
        # absent manifests - confirm this holds for the async client's response type
        if await oci_client.head_manifest(image_reference=target_ref, absent_ok=True):
            if on_exist is UploadMode.SKIP:
                return None
            # XXX: we might still ignore it, if the to-be-uploaded CD is equal to the existing
            # one
            raise ValueError(f'{target_ref=} already existed')
    elif on_exist is UploadMode.OVERWRITE:
        pass
    else:
        raise NotImplementedError(on_exist)

    # calculate digest and size of the serialised component descriptor chunk-wise, then rewind
    # so the same file object can be handed to put_blob
    raw_fobj = gci.oci.component_descriptor_to_tarfileobj(component_descriptor)
    cd_hash = hashlib.sha256()
    while (chunk := raw_fobj.read(4096)):
        cd_hash.update(chunk)

    cd_octets = raw_fobj.tell()
    cd_digest_with_alg = f'sha256:{cd_hash.hexdigest()}'
    raw_fobj.seek(0)

    await oci_client.put_blob(
        image_reference=target_ref,
        digest=cd_digest_with_alg,
        octets_count=cd_octets,
        data=raw_fobj,
    )

    cfg = gci.oci.ComponentDescriptorOciCfg(
        componentDescriptorLayer=gci.oci.ComponentDescriptorOciBlobRef(
            digest=cd_digest_with_alg,
            size=cd_octets,
        )
    )
    cfg_raw = json.dumps(dataclasses.asdict(cfg)).encode('utf-8')
    cfg_octets = len(cfg_raw)
    cfg_digest_with_alg = f'sha256:{hashlib.sha256(cfg_raw).hexdigest()}'

    await oci_client.put_blob(
        image_reference=target_ref,
        digest=cfg_digest_with_alg,
        octets_count=cfg_octets,
        data=cfg_raw,
    )

    # deduplicate using dict-keys (instead of a set) so layers keep the order in which the
    # artefacts were encountered; this makes the resulting manifest deterministic
    local_blob_layers = {
        blob_ref: None async for blob_ref in
        _iter_oci_blob_refs_async(
            component=component,
            oci_client=oci_client,
            oci_image_reference=target_ref,
        )
    }

    manifest = om.OciImageManifest(
        config=gci.oci.ComponentDescriptorOciCfgBlobRef(
            digest=cfg_digest_with_alg,
            size=cfg_octets,
        ),
        layers=[
            gci.oci.ComponentDescriptorOciBlobRef(
                digest=cd_digest_with_alg,
                size=cd_octets,
            ),
        ] + list(local_blob_layers),
    )

    manifest_bytes = json.dumps(manifest.as_dict()).encode('utf-8')

    await oci_client.put_manifest(
        image_reference=target_ref,
        manifest=manifest_bytes,
    )

    return manifest_bytes
async def validate_resource_node_async(
    node: ci.ResourceNode,
    oci_client=None,
) -> ValidationError | None:
    '''
    validates the given resource node. Only resources of type OCI_IMAGE are checked; for those,
    the image reference must be parseable, must carry a tag, and the referenced manifest must
    exist.

    @param node:       resource node to validate
    @param oci_client: optional async OCI client; if not passed, `ccc.oci.oci_client_async()`
                       is used (only once the image reference was found to be wellformed)
    @returns: a ValidationError describing the first violation found, or None if there is none
    '''
    resource = node.resource
    if resource.type != ocm.ArtefactType.OCI_IMAGE:
        return None

    access: ocm.OciAccess = resource.access
    image_reference = access.imageReference

    try:
        image_reference = oci.model.OciImageReference.to_image_ref(image_reference)
        if not image_reference.has_tag:
            return ValidationError(
                node=node,
                error=f'Invalid ImageReference (missing tag): {image_reference}',
            )
    except ValueError:
        # cannot perform checks in image itself using invalid image-ref
        return ValidationError(
            node=node,
            error=f'Invalid ImageReference: {image_reference}',
        )

    if not oci_client:
        oci_client = ccc.oci.oci_client_async()

    if not await oci_client.head_manifest(
        image_reference=image_reference,
        absent_ok=True,
        accept=oci.model.MimeTypes.prefer_multiarch,
    ):
        return ValidationError(
            node=node,
            error=f'{image_reference=} does not exist',
        )

    return None


async def iter_violations_async(
    nodes: collections.abc.Iterable[ci.Node],
    oci_client=None,
) -> collections.abc.AsyncGenerator[ValidationError, None]:
    '''
    yields a ValidationError for each of the given nodes that violates a validation rule.
    Component nodes and source nodes are not validated, yet; resource nodes are checked using
    `validate_resource_node_async`.

    @param nodes:      nodes to validate
    @param oci_client: optional async OCI client, passed on to `validate_resource_node_async`
    @raises ValueError: if a node is neither a component, source, nor resource node
    '''
    for node in nodes:
        if isinstance(node, (ci.ComponentNode, ci.SourceNode)):
            continue # no validation, yet

        if not isinstance(node, ci.ResourceNode):
            raise ValueError(node)

        validation_error = await validate_resource_node_async(
            node=node,
            oci_client=oci_client,
        )
        if validation_error:
            yield validation_error