Skip to content

Commit

Permalink
Add async functions to cnudie package
Browse files Browse the repository at this point in the history
  • Loading branch information
8R0WNI3 committed Oct 7, 2024
1 parent 272d80c commit e529ddd
Show file tree
Hide file tree
Showing 6 changed files with 1,289 additions and 2 deletions.
166 changes: 166 additions & 0 deletions cnudie/iter.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,3 +249,169 @@ def iter_resources(
component_filter=component_filter,
reftype_filter=reftype_filter,
)


async def iter_async(
component: ocm.Component,
lookup: cnudie.retrieve.ComponentDescriptorLookupById=None,
recursion_depth: int=-1,
prune_unique: bool=True,
node_filter: collections.abc.Callable[[Node], bool]=None,
ocm_repo: ocm.OcmRepository | str=None,
ctx_repo: ocm.OcmRepository | str=None, # deprecated, use `ocm_repo` instead
component_filter: collections.abc.Callable[[ocm.Component], bool]=None,
reftype_filter: collections.abc.Callable[[NodeReferenceType], bool]=None,
) -> collections.abc.AsyncGenerator[Node, None, None]:
'''
returns a generator yielding the transitive closure of nodes accessible from the given component.
See `cnudie.retrieve` for retrieving components/component descriptors.
@param component: root component for iteration
@param lookup: used to lookup referenced components descriptors
(thus abstracting from retrieval method)
optional iff recursion_depth is set to 0
@param recursion_depth: if set to a positive value, limit recursion for resolving component
dependencies; -1 will resolve w/o recursion limit, 0 will not resolve
component dependencies
@param prune_unique: if true, redundant component-versions will only be traversed once
@param node_filter: use to filter emitted nodes (see Filter for predefined filters)
@param ocm_repo: optional OCM Repository to be used to override in the lookup
@param ctx_repo: deprecated, use `ocm_repo` instead
@param component_filter: use to exclude components (and their references) from the iterator;
thereby `True` means the component should be filtered out
@param reftype_filter: use to exclude components (and their references) from the iterator if
they are of a certain reference type; thereby `True` means the component
should be filtered out
'''
if not ocm_repo and ctx_repo:
ocm_repo = ctx_repo

if isinstance(component, ocm.ComponentDescriptor):
component = component.component

seen_component_ids = set()

if not lookup and not recursion_depth == 0:
raise ValueError('lookup is required if recusion is not disabled (recursion_depth==0)')

# need to nest actual iterator to keep global state of seen component-IDs
async def inner_iter_async(
component: ocm.Component,
lookup: cnudie.retrieve.ComponentDescriptorLookupById,
recursion_depth,
path: tuple[NodePathEntry]=(),
reftype: NodeReferenceType=NodeReferenceType.COMPONENT_REFERENCE,
):
if component_filter and component_filter(component):
return

if reftype_filter and reftype_filter(reftype):
return

path = (*path, NodePathEntry(component, reftype))

yield ComponentNode(
path=path,
)

for resource in component.resources:
yield ResourceNode(
path=path,
resource=resource,
)

for source in component.sources:
yield SourceNode(
path=path,
source=source,
)

if recursion_depth == 0:
return # stop resolving referenced components
elif recursion_depth > 0:
recursion_depth -= 1

for cref in component.componentReferences:
cref_id = ocm.ComponentIdentity(
name=cref.componentName,
version=cref.version,
)

if ocm_repo:
referenced_component_descriptor = await lookup(cref_id, ocm_repo)
else:
referenced_component_descriptor = await lookup(cref_id)

async for node in inner_iter_async(
component=referenced_component_descriptor.component,
lookup=lookup,
recursion_depth=recursion_depth,
path=path,
):
yield node

if not (extra_crefs_label := component.find_label(
name=dso.labels.ExtraComponentReferencesLabel.name,
)):
return

extra_crefs_label: dso.labels.ExtraComponentReferencesLabel = dso.labels.deserialise_label(
label=extra_crefs_label,
)

for extra_cref in extra_crefs_label.value:
extra_cref_id = extra_cref.component_reference

if ocm_repo:
referenced_component_descriptor = await lookup(extra_cref_id, ocm_repo)
else:
referenced_component_descriptor = await lookup(extra_cref_id)

async for node in inner_iter_async(
component=referenced_component_descriptor.component,
lookup=lookup,
recursion_depth=recursion_depth,
path=path,
reftype=NodeReferenceType.EXTRA_COMPONENT_REFS_LABEL,
):
yield node

async for node in inner_iter_async(
component=component,
lookup=lookup,
recursion_depth=recursion_depth,
path=(),
):
if node_filter and not node_filter(node):
continue

if prune_unique and isinstance(node, ComponentNode):
if node.component.identity() in seen_component_ids:
continue
else:
seen_component_ids.add(node.component_id)

yield node


def iter_resources_async(
component: ocm.Component,
lookup: cnudie.retrieve.ComponentDescriptorLookupById=None,
recursion_depth: int=-1,
prune_unique: bool=True,
component_filter: collections.abc.Callable[[ocm.Component], bool]=None,
reftype_filter: collections.abc.Callable[[NodeReferenceType], bool]=None,
) -> collections.abc.AsyncGenerator[ResourceNode, None, None]:
'''
curried version of `iter_async` w/ node-filter preset to yield only resource-nodes
'''
return iter_async(
component=component,
lookup=lookup,
recursion_depth=recursion_depth,
prune_unique=prune_unique,
node_filter=Filter.resources,
component_filter=component_filter,
reftype_filter=reftype_filter,
)
179 changes: 179 additions & 0 deletions cnudie/purge.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,31 @@ def iter_componentversions_to_purge(
)


async def iter_componentversions_to_purge_async(
component: ocm.Component | ocm.ComponentDescriptor,
policy: version.VersionRetentionPolicies,
oci_client: oc.AsyncClient=None,
) -> collections.abc.AsyncGenerator[ocm.ComponentIdentity, None, None]:
oci_ref = cnudie.util.oci_ref(component=component)
if isinstance(component, ocm.ComponentDescriptor):
component = component.component

if not oci_client:
oci_client = ccc.oci.oci_client_async()

versions = await oci_client.tags(oci_ref.ref_without_tag)

for v in version.versions_to_purge(
versions=versions,
reference_version=component.version,
policy=policy,
):
yield ocm.ComponentIdentity(
name=component.name,
version=v,
)


def remove_component_descriptor_and_referenced_artefacts(
component: ocm.Component | ocm.ComponentDescriptor,
oci_client: oc.Client=None,
Expand Down Expand Up @@ -190,3 +215,157 @@ def iter_platform_refs():
)

return True


async def remove_component_descriptor_and_referenced_artefacts_async(
component: ocm.Component | ocm.ComponentDescriptor,
oci_client: oc.AsyncClient=None,
lookup: cnudie.retrieve.ComponentDescriptorLookupById=None,
recursive: bool=False,
on_error: str='abort', # todo: implement, e.g. patch-component-descriptor-and-abort
):
if isinstance(component, ocm.ComponentDescriptor):
component = component.component

logger.info(f'will try to purge {component.name}:{component.version} including local resources')

current_component = None
resources_with_removal_errors = []
if not oci_client:
oci_client = ccc.oci.oci_client_async()

async for node in cnudie.iter.iter_async(
component=component,
lookup=lookup,
recursion_depth=-1 if recursive else 0,
):
# cnudie.iter.iter_async will return sequences of:
# - component-node (always exactly one per component)
# - resource-nodes (if any)
# - source-nodes (if any)
if isinstance(node, cnudie.iter.ComponentNode):
if current_component: # skip for first iteration
await _remove_component_descriptor_async(
component=current_component,
oci_client=oci_client,
)
current_component = node.component
continue

if isinstance(node, cnudie.iter.SourceNode):
continue # we ignore source-nodes for now

if isinstance(node, cnudie.iter.ResourceNode):
if not node.resource.relation is ocm.ResourceRelation.LOCAL:
logger.debug(f'skipping non-local {node.resource.name=}')
continue
try:
did_remove = await _remove_resource_async(
node=node,
oci_client=oci_client,
)
if not did_remove:
logger.info(f'do not know how to remove {node.resource=}')
except Exception as e:
logger.warning(f'error while trying to remove {node.resource=} - {e=}')
traceback.print_exc()
resources_with_removal_errors.append(node)
if on_error == 'abort':
logger.fatal('error encountered - aborting comoponent-descriptor-removal')
raise e
else:
raise ValueError(f'unknown value {on_error=}')

# remove final component (last component-component-descriptor would otherwise not be removed,
# as we remove component-descriptors only after (trying to) remove referenced resources.
if current_component:
await _remove_component_descriptor_async(
component=component,
oci_client=oci_client,
)


async def _remove_component_descriptor_async(
component: ocm.Component,
oci_client: oc.AsyncClient,
):
oci_ref = cnudie.util.oci_ref(
component=component,
)

await oci_client.delete_manifest(
image_reference=oci_ref,
purge=True,
)


async def _remove_resource_async(
node: cnudie.iter.ResourceNode,
oci_client: oc.AsyncClient,
) -> bool:
resource = node.resource
if not resource.type in (ocm.ArtefactType.OCI_IMAGE, 'ociImage'):
return False # we only support removal of oci-images for now

if not resource.relation in (ocm.ResourceRelation.LOCAL, 'local'):
return False # external resources can never be removed (as we do not "own" them)

if not isinstance(resource.access, ocm.OciAccess):
return False # similar to above: we only support removal of oci-images in oci-registries

access: ocm.OciAccess = resource.access
image_reference = om.OciImageReference(access.imageReference)

manifest = await oci_client.manifest(
image_reference=image_reference,
absent_ok=True,
accept=om.MimeTypes.prefer_multiarch,
)

if not manifest:
return True # nothing to do if image does not exist

if image_reference.has_symbolical_tag:
purge = True
elif image_reference.has_digest_tag:
purge = False # no need to "purge" if we were passed a digest-tag
else:
raise ValueError(f'cannot remove image w/o tag: {str(image_reference)}')

await oci_client.delete_manifest(
image_reference=image_reference,
purge=purge,
accept=om.MimeTypes.prefer_multiarch,
)

if isinstance(manifest, om.OciImageManifest):
return True

if not isinstance(manifest, om.OciImageManifestList):
raise ValueError(f'did not expect type {manifest=} {type(manifest)} - this is a bug')

# multi-arch-case - try to guess other tags, and purge those
manifest: om.OciImageManifestList

def iter_platform_refs():
repository = image_reference.ref_without_tag
base_tag = image_reference.tag

for submanifest in manifest.manifests:
p = submanifest.platform
yield f'{repository}:{base_tag}-{p.os}-{p.architecture}'

for ref in iter_platform_refs():
if not await oci_client.head_manifest(
image_reference=ref,
absent_ok=True,
):
logger.warning(f'did not find {ref=} - ignoring')
continue

await oci_client.delete_manifest(
image_reference=ref,
purge=True,
)

return True
Loading

0 comments on commit e529ddd

Please sign in to comment.