Skip to content

Commit

Permalink
Make recursive components actually run recursively
Browse files Browse the repository at this point in the history
  • Loading branch information
whyitfor committed Dec 14, 2024
1 parent 35facfa commit 84d10f0
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 206 deletions.
2 changes: 1 addition & 1 deletion ofrak_core/ofrak/gui/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -546,7 +546,7 @@ async def identify_recursively(self, request: Request) -> Response:
await {resource}.identify_recursively()"""
await self.script_builder.add_action(resource, script_str, ActionType.MOD)
try:
result = await resource.auto_run_recursively(all_identifiers=True)
result = await resource.identify_recursively()
await self.script_builder.commit_to_script(resource)
except Exception as e:
await self.script_builder.clear_script_queue(resource)
Expand Down
101 changes: 47 additions & 54 deletions ofrak_core/ofrak/resource.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,6 +407,7 @@ async def auto_run(
self,
components: Iterable[Type[ComponentInterface]] = tuple(),
blacklisted_components: Iterable[Type[ComponentInterface]] = tuple(),
blacklisted_tags: Iterable[ResourceTag] = tuple(),
all_unpackers: bool = False,
all_identifiers: bool = False,
all_analyzers: bool = False,
Expand All @@ -433,6 +434,7 @@ async def auto_run(
self._resource.id,
components_allowed=tuple(c.get_id() for c in components),
components_disallowed=tuple(c.get_id() for c in blacklisted_components),
tags_ignored=tuple(blacklisted_tags),
all_unpackers=all_unpackers,
all_identifiers=all_identifiers,
all_analyzers=all_analyzers,
Expand All @@ -454,7 +456,19 @@ async def unpack(self) -> ComponentRunResult:
:return: A ComponentRunResult containing information on resources affected by the component
"""
return await self.auto_run(all_identifiers=True, all_unpackers=True)
return await self._unpack()

async def _unpack(
self,
blacklisted_components: Iterable[Type[ComponentInterface]] = tuple(),
do_not_unpack: Iterable[ResourceTag] = tuple(),
) -> ComponentRunResult:
return await self.auto_run(
all_identifiers=True,
all_unpackers=True,
blacklisted_components=blacklisted_components,
blacklisted_tags=do_not_unpack,
)

async def analyze(self, resource_attributes: Type[RA]) -> RA:
"""
Expand All @@ -477,6 +491,15 @@ async def identify(self) -> ComponentRunResult:
"""
return await self.auto_run(all_identifiers=True)

async def identify_recursively(self) -> ComponentRunResult:
component_result = await self.identify()
children_results = await asyncio.gather(
*(child.identify_recursively() for child in await self.get_children())
)
for child_result in children_results:
component_result.update(child_result)
return component_result

async def pack(self) -> ComponentRunResult:
"""
Pack the resource.
Expand All @@ -485,52 +508,6 @@ async def pack(self) -> ComponentRunResult:
"""
return await self.auto_run(all_packers=True)

    async def auto_run_recursively(
        self,
        components: Iterable[Type[ComponentInterface]] = tuple(),
        blacklisted_components: Iterable[Type[ComponentInterface]] = tuple(),
        blacklisted_tags: Iterable[ResourceTag] = tuple(),
        all_unpackers: bool = False,
        all_identifiers: bool = False,
        all_analyzers: bool = False,
    ) -> ComponentRunResult:
        """
        Automatically run multiple components which may run on this resource or its descendants.

        From an initial set of possible components to run, this set is searched for components
        for which the intersection of the component's targets and this resource's tags is not
        empty. Accepts several optional flags to expand or restrict the initial set of
        components.

        After each run, compatible components from the initial set are run on any resources which
        have had tags added (including newly created resources). This is repeated until no new
        tags are added.

        :param components: Components to explicitly add to the initial set of components
        :param blacklisted_components: Components to explicitly remove from the initial set of
        components
        :param blacklisted_tags: Resources with any of these tags are not targeted by the
        components run
        :param all_unpackers: If true, all Unpackers are added to the initial set of components
        :param all_identifiers: If true, all Identifiers are added to the initial set of components
        :param all_analyzers: If true, all Analyzers are added to the initial set of components

        :return: A ComponentRunResult containing information on resources affected by the component
        """
        # Delegate the whole recursive traversal to the job service in one request
        components_result = await self._job_service.run_components_recursively(
            JobMultiComponentRequest(
                self._job_id,
                self._resource.id,
                components_allowed=tuple(c.get_id() for c in components),
                components_disallowed=tuple(c.get_id() for c in blacklisted_components),
                all_unpackers=all_unpackers,
                all_identifiers=all_identifiers,
                all_analyzers=all_analyzers,
                tags_ignored=tuple(blacklisted_tags),
            )
        )
        # Re-fetch touched models and refresh views so local state reflects the run
        await self._fetch_resources(components_result.resources_modified)
        await self._update_views(
            components_result.resources_modified, components_result.resources_deleted
        )
        return components_result

async def unpack_recursively(
self,
blacklisted_components: Iterable[Type[ComponentInterface]] = tuple(),
Expand All @@ -551,21 +528,37 @@ async def unpack_recursively(
:return: A ComponentRunResult containing information on resources affected by the component
"""
return await self.auto_run_recursively(
all_identifiers=True,
all_unpackers=True,
blacklisted_components=blacklisted_components,
blacklisted_tags=do_not_unpack,
component_result = await self._unpack(blacklisted_components, do_not_unpack)
component_results = await asyncio.gather(
*(
child.unpack_recursively(blacklisted_components, do_not_unpack)
for child in await self.get_children()
)
)
for child_result in component_results:
component_result.update(child_result)
return component_result

async def analyze_recursively(self) -> ComponentRunResult:
return await self.auto_run_recursively(all_analyzers=True)
component_result = await self.auto_run(all_analyzers=True)
children_results = await asyncio.gather(
*(child.analyze_recursively() for child in await self.get_children())
)
for child_result in children_results:
component_result.update(child_result)
return component_result

async def pack_recursively(self) -> ComponentRunResult:
"""
Recursively pack the resource, starting with its descendants.
"""
return await self._job_service.pack_recursively(self._job_id, self._resource.id)
children_results = await asyncio.gather(
*(child.pack_recursively() for child in await self.get_children())
)
component_result = await self.pack()
for child_result in children_results:
component_result.update(child_result)
return component_result

async def write_to(self, destination: BinaryIO, pack: bool = True):
"""
Expand Down
110 changes: 0 additions & 110 deletions ofrak_core/ofrak/service/job_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -292,116 +292,6 @@ async def run_components(

return components_result

    async def run_components_recursively(
        self, request: JobMultiComponentRequest
    ) -> ComponentRunResult:
        """
        Repeatedly auto-run filtered components, starting from the request's resource and
        re-running on any resource that gained tags in the previous iteration, until an
        iteration adds no new tags (a fixed point).

        :param request: job ID, starting resource ID, and component filters

        :return: cumulative result of every component run across all iterations
        """
        components_result = ComponentRunResult()
        component_filter = _build_auto_run_filter(request)

        initial_target_resource_models = await self._get_initial_recursive_target_resources(
            request.resource_id, component_filter
        )

        # Create a mock context to match all existing tags
        previous_job_context: JobRunContext = self._job_context_factory.create()
        for existing_resource_model in initial_target_resource_models:
            previous_job_context.trackers[existing_resource_model.id].tags_added.update(
                existing_resource_model.tags
            )
        iterations = 0
        tags_added_count = 1  # initialize just so loop starts

        while tags_added_count > 0:
            job_context = self._job_context_factory.create()
            _run_components_requests = []
            # Only run components that target tags added in the previous pass, so each
            # iteration touches just the resources that actually changed.
            for resource_id, previous_tracker in previous_job_context.trackers.items():
                final_filter = ComponentAndMetaFilter(
                    component_filter,
                    _build_tag_filter(tuple(previous_tracker.tags_added)),
                )
                _run_components_requests.append(
                    _ComponentAutoRunRequest(
                        resource_id,
                        final_filter,
                    )
                )

            iteration_components_result = await self._auto_run_components(
                _run_components_requests,
                request.job_id,
                job_context,
            )
            components_result.update(iteration_components_result)

            # Count tags added during this iteration; zero terminates the loop.
            tags_added_count = 0
            for resource_id, tracker in job_context.trackers.items():
                if len(tracker.tags_added) > 0:
                    tags_added_count += len(tracker.tags_added)
            previous_job_context = job_context
            LOGGER.info(
                f"Completed iteration {iterations} of run_components_recursively on "
                f"{request.resource_id.hex()}. {len(components_result.resources_modified)} "
                f"resources modified and {tags_added_count} tags added."
            )
            iterations += 1
        return components_result

async def pack_recursively(
self,
job_id: bytes,
resource_id: bytes,
) -> ComponentRunResult:
packer_filter = PACKERS_FILTER
target_cache = self._build_target_cache(packer_filter)
all_components_result = ComponentRunResult()
if len(target_cache) == 0:
return all_components_result
resources = await self._resource_service.get_descendants_by_id(
resource_id,
r_filter=ResourceFilter(
include_self=True,
tags=tuple(target_cache.keys()),
tags_condition=ResourceFilterCondition.OR,
),
)
resources = list(resources) # we'll need that Iterable more than once
job_context = self._job_context_factory.create()

# We want to start with the deepest packers. Packers at the same levels can run
# concurrently. So we first ask for the relative depth of each returned resource.
resource_depths = await self._resource_service.get_depths(
[resource.id for resource in resources]
)

resources_by_depth = defaultdict(list)
for resource, depth in zip(resources, resource_depths):
resources_by_depth[depth].append(resource)

for depth in sorted(resources_by_depth.keys(), reverse=True):
for resource in resources_by_depth[depth]:
component_filter: ComponentFilter = ComponentAndMetaFilter(
PACKERS_FILTER,
_build_tag_filter(tuple(resource.get_tags())),
)

request = _ComponentAutoRunRequest(
resource.id,
component_filter,
)

component_result = await self._auto_run_components(
[request],
job_id,
job_context,
)
n_packers_run = len(component_result.components_run)
if n_packers_run == 0:
all_components_result.update(component_result)
break
if n_packers_run > 1:
raise ValueError(f"Multiple packers are targeting resource {resource.id.hex()}")
return all_components_result

async def _get_initial_recursive_target_resources(
self, resource_id: bytes, component_filter: ComponentFilter
):
Expand Down
41 changes: 0 additions & 41 deletions ofrak_core/ofrak/service/job_service_i.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,47 +83,6 @@ async def run_components(
:raises NoMatchingComponentException: if no components match the filters for the resource.
"""

    @abstractmethod
    async def run_components_recursively(
        self,
        request: JobMultiComponentRequest,
    ) -> ComponentRunResult:
        """
        Start from a resource and run components on it and then on any resources which have tags
        added as a result of that initial run, then run components on any resources with new tags
        from those subsequent runs, until an iteration of component runs results in no new tags
        being added. The component(s) run on each resource are chosen according to the provided
        filters and which tags were added to that resource in the previous iteration. That is,
        the filters are applied to the set of components which target those new tags.

        :param request: Data structure containing the ID of the job to run the components in,
        the ID of the resource to start running recursively from, and filters for the components
        to run.

        :return: A data structure describing the components run and resources
        modified/created/deleted.

        :raises ComponentAutoRunFailure: if one of the automatically chosen components raises an
        error while running.
        """

    @abstractmethod
    async def pack_recursively(
        self,
        job_id: bytes,
        resource_id: bytes,
    ) -> ComponentRunResult:
        """
        Call Packer components on the deepest descendants of a resource (the root of this
        search), then Packers on the next level up, etc. until the search root resource.

        :param job_id: Job to run the component in.
        :param resource_id: ID of the search root resource.

        :return: A data structure describing the components run and resources
        modified/created/deleted.
        """


class ComponentAutoRunFailure(Exception):
def __init__(
Expand Down

0 comments on commit 84d10f0

Please sign in to comment.