diff --git a/CHANGELOG.md b/CHANGELOG.md index 14d5a70a0e..42230b9e85 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,13 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm +## 0.5.14 (2024-04-24) + +### Improvements + +- Implemented real-time entity deletion exclusively for instances that haven't matched any selectors. +- Change the JQ calculation to process only identifier and blueprint for raw entities not selected during real-time events to only get the required data for the delete. + ## 0.5.13 (2024-04-17) ### Features diff --git a/port_ocean/clients/port/mixins/entities.py b/port_ocean/clients/port/mixins/entities.py index 634770d5e1..1a672ea8a0 100644 --- a/port_ocean/clients/port/mixins/entities.py +++ b/port_ocean/clients/port/mixins/entities.py @@ -173,27 +173,33 @@ async def search_entities( handle_status_code(response) return [Entity.parse_obj(result) for result in response.json()["entities"]] - async def does_integration_has_ownership_over_entity( - self, entity: Entity, user_agent_type: UserAgentType - ) -> bool: - logger.info(f"Validating ownership on entity {entity.identifier}") - found_entities: list[Entity] = await self.search_entities( + async def search_batch_entities( + self, user_agent_type: UserAgentType, entities_to_search: list[Entity] + ) -> list[Entity]: + search_rules = [] + for entity in entities_to_search: + search_rules.append( + { + "combinator": "and", + "rules": [ + { + "property": "$identifier", + "operator": "=", + "value": entity.identifier, + }, + { + "property": "$blueprint", + "operator": "=", + "value": entity.blueprint, + }, + ], + } + ) + + return await self.search_entities( user_agent_type, { "combinator": "and", - "rules": [ - { - "property": "$identifier", - "operator": "contains", - "value": entity.identifier, - }, - { - "property": "$blueprint", - "operator": "contains", - "value": entity.blueprint, - }, - ], + "rules": [{"combinator": "or", "rules": search_rules}], }, ) - - return len(found_entities) > 0 diff --git a/port_ocean/core/handlers/entity_processor/base.py b/port_ocean/core/handlers/entity_processor/base.py index 368cb9ada9..95164d60e3 100644 --- a/port_ocean/core/handlers/entity_processor/base.py +++ b/port_ocean/core/handlers/entity_processor/base.py @@ -7,7 +7,7 @@ from port_ocean.core.handlers.port_app_config.models import ResourceConfig from port_ocean.core.models import Entity from port_ocean.core.ocean_types import ( - RawEntity, + RAW_ITEM, EntitySelectorDiff, ) @@ -38,7 +38,7 @@ class BaseEntityProcessor(BaseHandler): async def _parse_items( self, mapping: ResourceConfig, - raw_data: list[RawEntity], + raw_data: list[RAW_ITEM], parse_all: bool = False, ) -> EntitySelectorDiff: pass @@ -46,7 +46,7 @@ async def _parse_items( async def parse_items( self, mapping: ResourceConfig, - raw_data: list[RawEntity], + raw_data: list[RAW_ITEM], parse_all: bool = False, ) -> EntitySelectorDiff: """Public method to parse raw entity data and map it to an EntityDiff. diff --git a/port_ocean/core/handlers/entity_processor/jq_entity_processor.py b/port_ocean/core/handlers/entity_processor/jq_entity_processor.py index feee1d0588..a775bd86aa 100644 --- a/port_ocean/core/handlers/entity_processor/jq_entity_processor.py +++ b/port_ocean/core/handlers/entity_processor/jq_entity_processor.py @@ -10,7 +10,7 @@ from port_ocean.core.handlers.port_app_config.models import ResourceConfig from port_ocean.core.models import Entity from port_ocean.core.ocean_types import ( - RawEntity, + RAW_ITEM, EntitySelectorDiff, ) from port_ocean.exceptions.core import EntityProcessorException @@ -122,7 +122,7 @@ async def _calculate_entity( async def _parse_items( self, mapping: ResourceConfig, - raw_results: list[RawEntity], + raw_results: list[RAW_ITEM], parse_all: bool = False, ) -> EntitySelectorDiff: raw_entity_mappings: dict[str, Any] = mapping.port.entity.mappings.dict( @@ -153,4 +153,4 @@ async def _parse_items( else: failed_entities.append(parsed_entity) - return {"passed": passed_entities, "failed": failed_entities} + return EntitySelectorDiff(passed=passed_entities, failed=failed_entities) diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index f29e6d2780..128114e10f 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -22,9 +22,8 @@ RAW_RESULT, RESYNC_RESULT, RawEntityDiff, - EntityDiff, ASYNC_GENERATOR_RESYNC_TYPE, - RawEntity, + RAW_ITEM, EntitySelectorDiff, ) from port_ocean.core.utils import zip_and_sum @@ -123,7 +122,7 @@ async def _execute_resync_tasks( async def _calculate_raw( self, - raw_diff: list[tuple[ResourceConfig, list[RawEntity]]], + raw_diff: list[tuple[ResourceConfig, list[RAW_ITEM]]], parse_all: bool = False, ) -> list[EntitySelectorDiff]: return await asyncio.gather( @@ -139,40 +138,23 @@ async def _register_resource_raw( results: list[dict[Any, Any]], user_agent_type: UserAgentType, parse_all: bool = False, - ) -> list[Entity]: + ) -> EntitySelectorDiff: objects_diff = await self._calculate_raw([(resource, results)], parse_all) + await self.entities_state_applier.upsert( + objects_diff[0].passed, user_agent_type + ) - entities_after: list[Entity] = objects_diff[0]["passed"] - await self.entities_state_applier.upsert(entities_after, user_agent_type) - - # If an entity didn't pass the JQ selector, we want to delete it if it exists in Port - for entity_to_delete in objects_diff[0]["failed"]: - is_owner = ( - await ocean.port_client.does_integration_has_ownership_over_entity( - entity_to_delete, user_agent_type - ) - ) - if not is_owner: - logger.info( - f"Skipping deletion of entity {entity_to_delete.identifier}, " - f"Couldn't find an entity that's related to the current integration." - ) - continue - await self.entities_state_applier.delete( - objects_diff[0]["failed"], user_agent_type - ) - - return entities_after + return objects_diff[0] async def _unregister_resource_raw( self, resource: ResourceConfig, - results: list[RawEntity], + results: list[RAW_ITEM], user_agent_type: UserAgentType, ) -> list[Entity]: objects_diff = await self._calculate_raw([(resource, results)]) - entities_after: list[Entity] = objects_diff[0]["passed"] + entities_after: list[Entity] = objects_diff[0].passed await self.entities_state_applier.delete(entities_after, user_agent_type) logger.info("Finished unregistering change") return entities_after @@ -189,17 +171,21 @@ async def _register_in_batches( else: async_generators.append(result) - entities = await self._register_resource_raw( - resource_config, raw_results, user_agent_type - ) + entities = ( + await self._register_resource_raw( + resource_config, raw_results, user_agent_type + ) + ).passed for generator in async_generators: try: async for items in generator: entities.extend( - await self._register_resource_raw( - resource_config, items, user_agent_type - ) + ( + await self._register_resource_raw( + resource_config, items, user_agent_type + ) + ).passed ) except* OceanAbortException as error: errors.append(error) @@ -233,13 +219,44 @@ async def register_raw( resource for resource in config.resources if resource.kind == kind ] - return await asyncio.gather( + diffs: list[EntitySelectorDiff] = await asyncio.gather( *( self._register_resource_raw(resource, results, user_agent_type, True) for resource in resource_mappings ) ) + registered_entities, entities_to_delete = zip_and_sum( + (entities_diff.passed, entities_diff.failed) for entities_diff in diffs + ) + + registered_entities_attributes = { + (entity.identifier, entity.blueprint) for entity in registered_entities + } + + filtered_entities_to_delete: list[Entity] = ( + await ocean.port_client.search_batch_entities( + user_agent_type, + [ + entity + for entity in entities_to_delete + if (entity.identifier, entity.blueprint) + not in registered_entities_attributes + ], + ) + ) + + if filtered_entities_to_delete: + logger.info( + f"Deleting {len(filtered_entities_to_delete)} entities that didn't pass any of the selectors" + ) + + await self.entities_state_applier.delete( + filtered_entities_to_delete, user_agent_type + ) + + return registered_entities + async def unregister_raw( self, kind: str, @@ -306,16 +323,13 @@ async def update_raw_diff( [(mapping, raw_desired_state["after"]) for mapping in resource_mappings] ) - entities_before_flatten = [ - item - for sublist in [d["passed"] for d in entities_before] - for item in sublist - ] - entities_after_flatten = [ - item - for sublist in [d["passed"] for d in entities_after] - for item in sublist - ] + entities_before_flatten: list[Entity] = sum( + (entities_diff.passed for entities_diff in entities_before), [] + ) + + entities_after_flatten: list[Entity] = sum( + (entities_diff.passed for entities_diff in entities_after), [] + ) await self.entities_state_applier.apply_diff( {"before": entities_before_flatten, "after": entities_after_flatten}, diff --git a/port_ocean/core/ocean_types.py b/port_ocean/core/ocean_types.py index 5862b118d8..67d358f0e6 100644 --- a/port_ocean/core/ocean_types.py +++ b/port_ocean/core/ocean_types.py @@ -1,14 +1,21 @@ -from typing import TypedDict, Any, AsyncIterator, Callable, Awaitable +from typing import TypedDict, Any, AsyncIterator, Callable, Awaitable, NamedTuple from port_ocean.core.models import Entity -RawEntity = dict[Any, Any] +RAW_ITEM = dict[Any, Any] +RAW_RESULT = list[RAW_ITEM] +ASYNC_GENERATOR_RESYNC_TYPE = AsyncIterator[RAW_RESULT] +RESYNC_RESULT = list[RAW_ITEM | ASYNC_GENERATOR_RESYNC_TYPE] + +LISTENER_RESULT = Awaitable[RAW_RESULT] | ASYNC_GENERATOR_RESYNC_TYPE +RESYNC_EVENT_LISTENER = Callable[[str], LISTENER_RESULT] +START_EVENT_LISTENER = Callable[[], Awaitable[None]] class RawEntityDiff(TypedDict): - before: list[RawEntity] - after: list[RawEntity] + before: list[RAW_ITEM] + after: list[RAW_ITEM] class EntityDiff(TypedDict): @@ -16,21 +23,11 @@ class EntityDiff(TypedDict): after: list[Entity] -class EntitySelectorDiff(TypedDict): +class EntitySelectorDiff(NamedTuple): passed: list[Entity] failed: list[Entity] -RAW_ITEM = dict[Any, Any] -RAW_RESULT = list[RAW_ITEM] -ASYNC_GENERATOR_RESYNC_TYPE = AsyncIterator[RAW_RESULT] -RESYNC_RESULT = list[RAW_ITEM | ASYNC_GENERATOR_RESYNC_TYPE] - -LISTENER_RESULT = Awaitable[RAW_RESULT] | ASYNC_GENERATOR_RESYNC_TYPE -RESYNC_EVENT_LISTENER = Callable[[str], LISTENER_RESULT] -START_EVENT_LISTENER = Callable[[], Awaitable[None]] - - class IntegrationEventsCallbacks(TypedDict): start: list[START_EVENT_LISTENER] resync: dict[str | None, list[RESYNC_EVENT_LISTENER]] diff --git a/pyproject.toml b/pyproject.toml index 143980cb3f..f23814824f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "port-ocean" -version = "0.5.13" +version = "0.5.14" description = "Port Ocean is a CLI tool for managing your Port projects." readme = "README.md" homepage = "https://app.getport.io"