From b5690b8f19c573b2a5b51ac7c03a53755a26f620 Mon Sep 17 00:00:00 2001 From: Ivan <62664893+ivankalinovski@users.noreply.github.com> Date: Tue, 24 Dec 2024 16:51:21 +0200 Subject: [PATCH] [Core] Add retry for failed upserts and handle circular dependencies (#1241) # Description What - 1. Add `EntityTopologicalSorter` class to register entities and perform topological sort. 2. Register entities that failed upsert because of an unmet dependency. 3. Sort entities topologically and try upserting them. 4. If topological sort failed because of a cyclical dependency - try unsorted upsert. 5. Update the topological sort's tree creation so an entity cannot be its own dependency. 6. Add tests for - upsert with dependencies, with self circular dependency and circular dependency. Why - Users are experiencing circular dependency errors, and the affected entities are not upserted. ## Type of change Please leave one option from the following and delete the rest: - [x] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] New Integration (non-breaking change which adds a new integration) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [ ] Non-breaking change (fix of existing functionality that will not change current behavior) - [ ] Documentation (added/updated documentation)

All tests should be run against the Port production environment (using a testing org).

### Core testing checklist - [x] Integration able to create all default resources from scratch - [x] Resync finishes successfully - [ ] Resync able to create entities - [ ] Resync able to update entities - [ ] Resync able to detect and delete entities - [ ] Scheduled resync able to abort existing resync and start a new one - [ ] Tested with at least 2 integrations from scratch - [ ] Tested with Kafka and Polling event listeners - [ ] Tested deletion of entities that don't pass the selector ### Integration testing checklist - [ ] Integration able to create all default resources from scratch - [ ] Resync able to create entities - [ ] Resync able to update entities - [ ] Resync able to detect and delete entities - [ ] Resync finishes successfully - [ ] If new resource kind is added or updated in the integration, add example raw data, mapping and expected result to the `examples` folder in the integration directory. - [ ] If resource kind is updated, run the integration with the example data and check if the expected result is achieved - [ ] If new resource kind is added or updated, validate that live-events for that resource are working as expected - [ ] Docs PR link [here](#) ### Preflight checklist - [ ] Handled rate limiting - [ ] Handled pagination - [ ] Implemented the code in async - [ ] Support Multi account ## Screenshots Include screenshots from your environment showing how the resources of the integration will look. ## API Documentation Provide links to the API documentation used for this integration. 
--------- Co-authored-by: Ivan Kalinovski Co-authored-by: Ivan Kalinovski --- CHANGELOG.md | 15 + .../sonarqube/examples/project.entity.json | 22 +- port_ocean/clients/port/mixins/entities.py | 37 +- port_ocean/context/event.py | 11 + port_ocean/core/defaults/initialize.py | 2 +- .../entities_state_applier/port/applier.py | 26 +- .../port/order_by_entities_dependencies.py | 7 +- .../entity_processor/jq_entity_processor.py | 5 +- .../core/integrations/mixins/sync_raw.py | 21 +- port_ocean/core/integrations/mixins/utils.py | 2 +- port_ocean/core/models.py | 4 + .../core/utils/entity_topological_sorter.py | 90 ++++ port_ocean/core/{ => utils}/utils.py | 0 port_ocean/run.py | 2 +- .../core/handlers/mixins/test_sync_raw.py | 400 ++++++++++++++++++ port_ocean/tests/core/test_utils.py | 2 +- .../utils/test_entity_topological_sorter.py | 99 +++++ pyproject.toml | 2 +- 18 files changed, 705 insertions(+), 42 deletions(-) create mode 100644 port_ocean/core/utils/entity_topological_sorter.py rename port_ocean/core/{ => utils}/utils.py (100%) create mode 100644 port_ocean/tests/core/handlers/mixins/test_sync_raw.py create mode 100644 port_ocean/tests/core/utils/test_entity_topological_sorter.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 3d745fb814..0bd446d4f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,21 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## 0.16.0 (2024-12-24) + + +### Improvements + +- When `createMissingRelatedEntities` is set to `false` and upserting entity failed on not existing entity, the entity will be gathered to the end of the resync and will try sorting all + the failed entities through a topological sort and upsert them as well +- Test upsert with dependencies, with self circular dependency and external entity dependency. 
+ +### Bug Fixes + +- When experiencing cyclic error on topological sort try unsorted upsert of the entities +- Fix topologicals sort tree creation so an entity cannot be its own dependency + + ## 0.15.3 (2024-12-22) ### Bug Fixes diff --git a/integrations/sonarqube/examples/project.entity.json b/integrations/sonarqube/examples/project.entity.json index 52ae086867..cc24ab6056 100644 --- a/integrations/sonarqube/examples/project.entity.json +++ b/integrations/sonarqube/examples/project.entity.json @@ -3,17 +3,17 @@ "title": "Port API", "team": [], "properties": { - "organization": "port-labs", - "link": "https://sonarcloud.io/project/overview?id=port-labs_Port_port-api", - "qualityGateStatus": "ERROR", - "numberOfBugs": 20, - "numberOfCodeSmells": 262, - "numberOfVulnerabilities": 0, - "numberOfHotSpots": 3, - "numberOfDuplications": 18, - "coverage": 0, - "mainBranch": "main" + "organization": "port-labs", + "link": "https://sonarcloud.io/project/overview?id=port-labs_Port_port-api", + "qualityGateStatus": "ERROR", + "numberOfBugs": 20, + "numberOfCodeSmells": 262, + "numberOfVulnerabilities": 0, + "numberOfHotSpots": 3, + "numberOfDuplications": 18, + "coverage": 0, + "mainBranch": "main" }, "relations": {}, "icon": "sonarqube" - } +} diff --git a/port_ocean/clients/port/mixins/entities.py b/port_ocean/clients/port/mixins/entities.py index 80f8178849..f33ab737da 100644 --- a/port_ocean/clients/port/mixins/entities.py +++ b/port_ocean/clients/port/mixins/entities.py @@ -1,5 +1,5 @@ import asyncio -from typing import Any +from typing import Any, Literal from urllib.parse import quote_plus import httpx @@ -11,7 +11,8 @@ handle_status_code, PORT_HTTP_MAX_CONNECTIONS_LIMIT, ) -from port_ocean.core.models import Entity +from port_ocean.core.models import Entity, PortAPIErrorMessage +from starlette import status class EntityClientMixin: @@ -29,7 +30,27 @@ async def upsert_entity( request_options: RequestOptions, user_agent_type: UserAgentType | None = None, should_raise: 
bool = True, - ) -> Entity | None: + ) -> Entity | None | Literal[False]: + """ + This function upserts an entity into Port. + + Usage: + ```python + upsertedEntity = await self.context.port_client.upsert_entity( + entity, + event.port_app_config.get_port_request_options(), + user_agent_type, + should_raise=False, + ) + ``` + :param entity: An Entity to be upserted + :param request_options: A dictionary specifying how to upsert the entity + :param user_agent_type: a UserAgentType specifying who is preforming the action + :param should_raise: A boolean specifying whether the error should be raised or handled silently + :return: [Entity] if the upsert occured successfully + :return: [None] will be returned if entity is using search identifier + :return: [False] will be returned if upsert failed because of unmet dependency + """ validation_only = request_options["validation_only"] async with self.semaphore: logger.debug( @@ -50,13 +71,21 @@ async def upsert_entity( }, extensions={"retryable": True}, ) - if response.is_error: logger.error( f"Error {'Validating' if validation_only else 'Upserting'} " f"entity: {entity.identifier} of " f"blueprint: {entity.blueprint}" ) + result = response.json() + + if ( + response.status_code == status.HTTP_404_NOT_FOUND + and not result.get("ok") + and result.get("error") == PortAPIErrorMessage.NOT_FOUND.value + ): + # Return false to differentiate from `result_entity.is_using_search_identifier` + return False handle_status_code(response, should_raise) result = response.json() diff --git a/port_ocean/context/event.py b/port_ocean/context/event.py index 601995709f..5f8b2b37d5 100644 --- a/port_ocean/context/event.py +++ b/port_ocean/context/event.py @@ -14,6 +14,7 @@ from uuid import uuid4 from loguru import logger +from port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter from pydispatch import dispatcher # type: ignore from werkzeug.local import LocalStack, LocalProxy @@ -24,6 +25,7 @@ ) from 
port_ocean.utils.misc import get_time + if TYPE_CHECKING: from port_ocean.core.handlers.port_app_config.models import ( ResourceConfig, @@ -50,6 +52,9 @@ class EventContext: _parent_event: Optional["EventContext"] = None _event_id: str = field(default_factory=lambda: str(uuid4())) _on_abort_callbacks: list[AbortCallbackFunction] = field(default_factory=list) + entity_topological_sorter: EntityTopologicalSorter = field( + default_factory=EntityTopologicalSorter + ) def on_abort(self, func: AbortCallbackFunction) -> None: self._on_abort_callbacks.append(func) @@ -129,6 +134,11 @@ async def event_context( ) -> AsyncIterator[EventContext]: parent = parent_override or _event_context_stack.top parent_attributes = parent.attributes if parent else {} + entity_topological_sorter = ( + parent.entity_topological_sorter + if parent and parent.entity_topological_sorter + else EntityTopologicalSorter() + ) attributes = {**parent_attributes, **(attributes or {})} new_event = EventContext( @@ -138,6 +148,7 @@ async def event_context( _parent_event=parent, # inherit port app config from parent event, so it can be used in nested events _port_app_config=parent.port_app_config if parent else None, + entity_topological_sorter=entity_topological_sorter, ) _event_context_stack.push(new_event) diff --git a/port_ocean/core/defaults/initialize.py b/port_ocean/core/defaults/initialize.py index c6d8d88ed1..1ae1a49cee 100644 --- a/port_ocean/core/defaults/initialize.py +++ b/port_ocean/core/defaults/initialize.py @@ -14,7 +14,7 @@ ) from port_ocean.core.handlers.port_app_config.models import PortAppConfig from port_ocean.core.models import Blueprint -from port_ocean.core.utils import gather_and_split_errors_from_results +from port_ocean.core.utils.utils import gather_and_split_errors_from_results from port_ocean.exceptions.port_defaults import ( AbortDefaultCreationError, ) diff --git a/port_ocean/core/handlers/entities_state_applier/port/applier.py 
b/port_ocean/core/handlers/entities_state_applier/port/applier.py index c1ab47dd75..7d24f17597 100644 --- a/port_ocean/core/handlers/entities_state_applier/port/applier.py +++ b/port_ocean/core/handlers/entities_state_applier/port/applier.py @@ -8,12 +8,11 @@ from port_ocean.core.handlers.entities_state_applier.port.get_related_entities import ( get_related_entities, ) -from port_ocean.core.handlers.entities_state_applier.port.order_by_entities_dependencies import ( - order_by_entities_dependencies, -) + from port_ocean.core.models import Entity from port_ocean.core.ocean_types import EntityDiff -from port_ocean.core.utils import is_same_entity, get_port_diff +from port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter +from port_ocean.core.utils.utils import is_same_entity, get_port_diff class HttpEntitiesStateApplier(BaseEntitiesStateApplier): @@ -106,19 +105,7 @@ async def upsert( should_raise=False, ) else: - entities_with_search_identifier: list[Entity] = [] - entities_without_search_identifier: list[Entity] = [] for entity in entities: - if entity.is_using_search_identifier: - entities_with_search_identifier.append(entity) - else: - entities_without_search_identifier.append(entity) - - ordered_created_entities = reversed( - entities_with_search_identifier - + order_by_entities_dependencies(entities_without_search_identifier) - ) - for entity in ordered_created_entities: upsertedEntity = await self.context.port_client.upsert_entity( entity, event.port_app_config.get_port_request_options(), @@ -127,6 +114,9 @@ async def upsert( ) if upsertedEntity: modified_entities.append(upsertedEntity) + # condition to false to differentiate from `result_entity.is_using_search_identifier` + if upsertedEntity is False: + event.entity_topological_sorter.register_entity(entity) return modified_entities async def delete( @@ -141,7 +131,9 @@ async def delete( should_raise=False, ) else: - ordered_deleted_entities = order_by_entities_dependencies(entities) 
+ ordered_deleted_entities = ( + EntityTopologicalSorter.order_by_entities_dependencies(entities) + ) for entity in ordered_deleted_entities: await self.context.port_client.delete_entity( diff --git a/port_ocean/core/handlers/entities_state_applier/port/order_by_entities_dependencies.py b/port_ocean/core/handlers/entities_state_applier/port/order_by_entities_dependencies.py index cb42d07029..251c747887 100644 --- a/port_ocean/core/handlers/entities_state_applier/port/order_by_entities_dependencies.py +++ b/port_ocean/core/handlers/entities_state_applier/port/order_by_entities_dependencies.py @@ -14,7 +14,6 @@ def node(entity: Entity) -> Node: def order_by_entities_dependencies(entities: list[Entity]) -> list[Entity]: nodes: dict[Node, Set[Node]] = {} entities_map = {} - for entity in entities: nodes[node(entity)] = set() entities_map[node(entity)] = entity @@ -33,7 +32,11 @@ def order_by_entities_dependencies(entities: list[Entity]) -> list[Entity]: ] for related_entity in related_entities: - nodes[node(entity)].add(node(related_entity)) + if ( + entity.blueprint is not related_entity.blueprint + or entity.identifier is not related_entity.identifier + ): + nodes[node(entity)].add(node(related_entity)) sort_op = TopologicalSorter(nodes) try: diff --git a/port_ocean/core/handlers/entity_processor/jq_entity_processor.py b/port_ocean/core/handlers/entity_processor/jq_entity_processor.py index 17ec9c49b6..3384e32413 100644 --- a/port_ocean/core/handlers/entity_processor/jq_entity_processor.py +++ b/port_ocean/core/handlers/entity_processor/jq_entity_processor.py @@ -16,7 +16,10 @@ EntitySelectorDiff, CalculationResult, ) -from port_ocean.core.utils import gather_and_split_errors_from_results, zip_and_sum +from port_ocean.core.utils.utils import ( + gather_and_split_errors_from_results, + zip_and_sum, +) from port_ocean.exceptions.core import EntityProcessorException from port_ocean.utils.queue_utils import process_in_queue diff --git 
a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index 87fd2a8faf..ecd94794f2 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -1,4 +1,5 @@ import asyncio +from graphlib import CycleError import inspect import typing from typing import Callable, Awaitable, Any @@ -27,7 +28,7 @@ RAW_ITEM, CalculationResult, ) -from port_ocean.core.utils import zip_and_sum, gather_and_split_errors_from_results +from port_ocean.core.utils.utils import zip_and_sum, gather_and_split_errors_from_results from port_ocean.exceptions.core import OceanAbortException SEND_RAW_DATA_EXAMPLES_AMOUNT = 5 @@ -396,7 +397,20 @@ async def update_raw_diff( {"before": entities_before_flatten, "after": entities_after_flatten}, user_agent_type, ) - + async def sort_and_upsert_failed_entities(self,user_agent_type: UserAgentType)->None: + try: + if not event.entity_topological_sorter.should_execute(): + return None + logger.info(f"Executings topological sort of {event.entity_topological_sorter.get_entities_count()} entities failed to upsert.",failed_toupsert_entities_count=event.entity_topological_sorter.get_entities_count()) + + for entity in event.entity_topological_sorter.get_entities(): + await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False) + + except OceanAbortException as ocean_abort: + logger.info(f"Failed topological sort of failed to upsert entites - trying to upsert unordered {event.entity_topological_sorter.get_entities_count()} entities.",failed_topological_sort_entities_count=event.entity_topological_sorter.get_entities_count() ) + if isinstance(ocean_abort.__cause__,CycleError): + for entity in event.entity_topological_sorter.get_entities(False): + await 
self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False) async def sync_raw_all( self, _: dict[Any, Any] | None = None, @@ -426,6 +440,7 @@ async def sync_raw_all( use_cache=False ) logger.info(f"Resync will use the following mappings: {app_config.dict()}") + try: did_fetched_current_state = True entities_at_port = await ocean.port_client.search_entities( @@ -455,6 +470,8 @@ async def sync_raw_all( event.on_abort(lambda: task.cancel()) creation_results.append(await task) + + await self.sort_and_upsert_failed_entities(user_agent_type) except asyncio.CancelledError as e: logger.warning("Resync aborted successfully, skipping delete phase. This leads to an incomplete state") raise diff --git a/port_ocean/core/integrations/mixins/utils.py b/port_ocean/core/integrations/mixins/utils.py index 91a0623956..8819481cae 100644 --- a/port_ocean/core/integrations/mixins/utils.py +++ b/port_ocean/core/integrations/mixins/utils.py @@ -9,7 +9,7 @@ RESYNC_EVENT_LISTENER, RESYNC_RESULT, ) -from port_ocean.core.utils import validate_result +from port_ocean.core.utils.utils import validate_result from port_ocean.exceptions.core import ( RawObjectValidationException, OceanAbortException, diff --git a/port_ocean/core/models.py b/port_ocean/core/models.py index c5906f41aa..7a89411a4f 100644 --- a/port_ocean/core/models.py +++ b/port_ocean/core/models.py @@ -27,6 +27,10 @@ def is_installation_type_compatible(self, installation_type: str) -> bool: ) or installation_type == self.value +class PortAPIErrorMessage(Enum): + NOT_FOUND = "not_found" + + class Entity(BaseModel): identifier: Any blueprint: Any diff --git a/port_ocean/core/utils/entity_topological_sorter.py b/port_ocean/core/utils/entity_topological_sorter.py new file mode 100644 index 0000000000..7c8d828ea0 --- /dev/null +++ b/port_ocean/core/utils/entity_topological_sorter.py @@ -0,0 +1,90 @@ +from typing import Any, Generator +from 
port_ocean.context import event +from port_ocean.core.models import Entity + +from loguru import logger + +from graphlib import TopologicalSorter, CycleError +from typing import Set + +from port_ocean.exceptions.core import OceanAbortException + +Node = tuple[str, str] + + +class EntityTopologicalSorter: + def __init__(self) -> None: + self.entities: list[Entity] = [] + + def register_entity( + self, + entity: Entity, + ) -> None: + logger.debug( + f"Will retry upserting entity - {entity.identifier} at the end of resync" + ) + self.entities.append(entity) + + def should_execute(self) -> int: + return not event.event.port_app_config.create_missing_related_entities + + def get_entities_count(self) -> int: + return len(self.entities) + + def get_entities(self, sorted: bool = True) -> Generator[Entity, Any, None]: + if not sorted: + for entity in self.entities: + yield entity + return + + sorted_and_mapped = EntityTopologicalSorter.order_by_entities_dependencies( + self.entities + ) + for entity in sorted_and_mapped: + yield entity + + @staticmethod + def node(entity: Entity) -> Node: + return entity.identifier, entity.blueprint + + @staticmethod + def order_by_entities_dependencies(entities: list[Entity]) -> list[Entity]: + nodes: dict[Node, Set[Node]] = {} + entities_map = {} + for entity in entities: + nodes[EntityTopologicalSorter.node(entity)] = set() + entities_map[EntityTopologicalSorter.node(entity)] = entity + + for entity in entities: + relation_target_ids: list[str] = sum( + [ + identifiers if isinstance(identifiers, list) else [identifiers] + for identifiers in entity.relations.values() + if identifiers is not None + ], + [], + ) + related_entities = [ + related + for related in entities + if related.identifier in relation_target_ids + ] + + for related_entity in related_entities: + if ( + entity.blueprint is not related_entity.blueprint + or entity.identifier is not related_entity.identifier + ): + nodes[EntityTopologicalSorter.node(entity)].add( + 
EntityTopologicalSorter.node(related_entity) + ) + + sort_op = TopologicalSorter(nodes) + try: + return [entities_map[item] for item in sort_op.static_order()] + except CycleError as ex: + raise OceanAbortException( + "Cannot order entities due to cyclic dependencies. \n" + "If you do want to have cyclic dependencies, please make sure to set the keys" + " 'createMissingRelatedEntities' and 'deleteDependentEntities' in the integration config in Port." + ) from ex diff --git a/port_ocean/core/utils.py b/port_ocean/core/utils/utils.py similarity index 100% rename from port_ocean/core/utils.py rename to port_ocean/core/utils/utils.py diff --git a/port_ocean/run.py b/port_ocean/run.py index 3ec4262f3f..b90f27f816 100644 --- a/port_ocean/run.py +++ b/port_ocean/run.py @@ -9,7 +9,7 @@ from port_ocean.config.dynamic import default_config_factory from port_ocean.config.settings import ApplicationSettings, LogLevelType from port_ocean.core.defaults.initialize import initialize_defaults -from port_ocean.core.utils import validate_integration_runtime +from port_ocean.core.utils.utils import validate_integration_runtime from port_ocean.log.logger_setup import setup_logger from port_ocean.ocean import Ocean from port_ocean.utils.misc import get_spec_file, load_module diff --git a/port_ocean/tests/core/handlers/mixins/test_sync_raw.py b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py new file mode 100644 index 0000000000..7101e18a2d --- /dev/null +++ b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py @@ -0,0 +1,400 @@ +from contextlib import asynccontextmanager +from graphlib import CycleError +from typing import Any, AsyncGenerator + +from httpx import Response +from port_ocean.clients.port.client import PortClient +from port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter +from port_ocean.exceptions.core import OceanAbortException +import pytest +from unittest.mock import MagicMock, AsyncMock, patch +from port_ocean.ocean import Ocean 
+from port_ocean.context.ocean import PortOceanContext +from port_ocean.core.handlers.port_app_config.models import ( + EntityMapping, + MappingsConfig, + PortAppConfig, + PortResourceConfig, + ResourceConfig, + Selector, +) +from port_ocean.core.integrations.mixins import SyncRawMixin +from port_ocean.core.handlers.entities_state_applier.port.applier import ( + HttpEntitiesStateApplier, +) +from port_ocean.core.handlers.entity_processor.jq_entity_processor import ( + JQEntityProcessor, +) +from port_ocean.core.models import Entity +from port_ocean.context.event import EventContext, event_context, EventType +from port_ocean.clients.port.types import UserAgentType +from port_ocean.context.ocean import ocean + + +@pytest.fixture +def mock_port_client(mock_http_client: MagicMock) -> PortClient: + mock_port_client = PortClient( + MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock() + ) + mock_port_client.auth = AsyncMock() + mock_port_client.auth.headers = AsyncMock( + return_value={ + "Authorization": "test", + "User-Agent": "test", + } + ) + + mock_port_client.search_entities = AsyncMock(return_value=[]) # type: ignore + mock_port_client.client = mock_http_client + return mock_port_client + + +@pytest.fixture +def mock_http_client() -> MagicMock: + mock_http_client = MagicMock() + mock_upserted_entities = [] + + async def post(url: str, *args: Any, **kwargs: Any) -> Response: + entity = kwargs.get("json", {}) + if entity.get("properties", {}).get("mock_is_to_fail", {}): + return Response( + 404, headers=MagicMock(), json={"ok": False, "error": "not_found"} + ) + + mock_upserted_entities.append( + f"{entity.get('identifier')}-{entity.get('blueprint')}" + ) + return Response( + 200, + json={ + "entity": { + "identifier": entity.get("identifier"), + "blueprint": entity.get("blueprint"), + } + }, + ) + + mock_http_client.post = AsyncMock(side_effect=post) + return mock_http_client + + +@pytest.fixture +def mock_ocean(mock_port_client: PortClient) 
-> Ocean: + with patch("port_ocean.ocean.Ocean.__init__", return_value=None): + ocean_mock = Ocean( + MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock() + ) + ocean_mock.config = MagicMock() + ocean_mock.config.port = MagicMock() + ocean_mock.config.port.port_app_config_cache_ttl = 60 + ocean_mock.port_client = mock_port_client + + return ocean_mock + + +@pytest.fixture +def mock_context(mock_ocean: Ocean) -> PortOceanContext: + context = PortOceanContext(mock_ocean) + ocean._app = context.app + return context + + +@pytest.fixture +def mock_port_app_config() -> PortAppConfig: + return PortAppConfig( + enable_merge_entity=True, + delete_dependent_entities=True, + create_missing_related_entities=False, + resources=[ + ResourceConfig( + kind="project", + selector=Selector(query="true"), + port=PortResourceConfig( + entity=MappingsConfig( + mappings=EntityMapping( + identifier=".id | tostring", + title=".name", + blueprint='"service"', + properties={"url": ".web_url"}, + relations={}, + ) + ) + ), + ) + ], + ) + + +@pytest.fixture +def mock_port_app_config_handler(mock_port_app_config: PortAppConfig) -> MagicMock: + handler = MagicMock() + + async def get_config(use_cache: bool = True) -> Any: + return mock_port_app_config + + handler.get_port_app_config = get_config + return handler + + +@pytest.fixture +def mock_entity_processor(mock_context: PortOceanContext) -> JQEntityProcessor: + return JQEntityProcessor(mock_context) + + +@pytest.fixture +def mock_entities_state_applier( + mock_context: PortOceanContext, +) -> HttpEntitiesStateApplier: + return HttpEntitiesStateApplier(mock_context) + + +@pytest.fixture +def mock_sync_raw_mixin( + mock_entity_processor: JQEntityProcessor, + mock_entities_state_applier: HttpEntitiesStateApplier, + mock_port_app_config_handler: MagicMock, +) -> SyncRawMixin: + sync_raw_mixin = SyncRawMixin() + sync_raw_mixin._entity_processor = mock_entity_processor + sync_raw_mixin._entities_state_applier = 
mock_entities_state_applier + sync_raw_mixin._port_app_config_handler = mock_port_app_config_handler + sync_raw_mixin._get_resource_raw_results = AsyncMock(return_value=([{}], [])) # type: ignore + sync_raw_mixin._entity_processor.parse_items = AsyncMock(return_value=MagicMock()) # type: ignore + + return sync_raw_mixin + + +@asynccontextmanager +async def no_op_event_context( + existing_event: EventContext, +) -> AsyncGenerator[EventContext, None]: + yield existing_event + + +def create_entity( + id: str, blueprint: str, relation: dict[str, str], is_to_fail: bool +) -> Entity: + entity = Entity(identifier=id, blueprint=blueprint) + entity.relations = relation + entity.properties = {"mock_is_to_fail": is_to_fail} + return entity + + +@pytest.mark.asyncio +async def test_sync_raw_mixin_self_dependency( + mock_sync_raw_mixin: SyncRawMixin, +) -> None: + entities_params = [ + ("entity_1", "service", {"service": "entity_1"}, True), + ("entity_2", "service", {"service": "entity_2"}, False), + ] + entities = [create_entity(*entity_param) for entity_param in entities_params] + + calc_result_mock = MagicMock() + calc_result_mock.entity_selector_diff.passed = entities + calc_result_mock.errors = [] + + mock_sync_raw_mixin.entity_processor.parse_items = AsyncMock(return_value=calc_result_mock) # type: ignore + + mock_order_by_entities_dependencies = MagicMock( + side_effect=EntityTopologicalSorter.order_by_entities_dependencies + ) + async with event_context(EventType.RESYNC, trigger_type="machine") as event: + app_config = ( + await mock_sync_raw_mixin.port_app_config_handler.get_port_app_config( + use_cache=False + ) + ) + event.port_app_config = app_config + event.entity_topological_sorter.register_entity = MagicMock(side_effect=event.entity_topological_sorter.register_entity) # type: ignore + event.entity_topological_sorter.get_entities = MagicMock(side_effect=event.entity_topological_sorter.get_entities) # type: ignore + + with patch( + 
"port_ocean.core.integrations.mixins.sync_raw.event_context", + lambda *args, **kwargs: no_op_event_context(event), + ): + with patch( + "port_ocean.core.utils.entity_topological_sorter.EntityTopologicalSorter.order_by_entities_dependencies", + mock_order_by_entities_dependencies, + ): + + await mock_sync_raw_mixin.sync_raw_all( + trigger_type="machine", user_agent_type=UserAgentType.exporter + ) + + assert ( + len(event.entity_topological_sorter.entities) == 1 + ), "Expected one failed entity callback due to retry logic" + assert event.entity_topological_sorter.register_entity.call_count == 1 + assert event.entity_topological_sorter.get_entities.call_count == 1 + + assert mock_order_by_entities_dependencies.call_count == 1 + assert [ + call[0][0][0] + for call in mock_order_by_entities_dependencies.call_args_list + ] == [entity for entity in entities if entity.identifier == "entity_1"] + + +@pytest.mark.asyncio +async def test_sync_raw_mixin_circular_dependency( + mock_sync_raw_mixin: SyncRawMixin, mock_ocean: Ocean +) -> None: + entities_params = [ + ("entity_1", "service", {"service": "entity_2"}, True), + ("entity_2", "service", {"service": "entity_1"}, True), + ] + entities = [create_entity(*entity_param) for entity_param in entities_params] + + calc_result_mock = MagicMock() + calc_result_mock.entity_selector_diff.passed = entities + calc_result_mock.errors = [] + + mock_sync_raw_mixin.entity_processor.parse_items = AsyncMock(return_value=calc_result_mock) # type: ignore + + mock_order_by_entities_dependencies = MagicMock( + side_effect=EntityTopologicalSorter.order_by_entities_dependencies + ) + async with event_context(EventType.RESYNC, trigger_type="machine") as event: + app_config = ( + await mock_sync_raw_mixin.port_app_config_handler.get_port_app_config( + use_cache=False + ) + ) + event.port_app_config = app_config + org = event.entity_topological_sorter.register_entity + + def mock_register_entity(*args: Any, **kwargs: Any) -> Any: + entity = args[0] 
+ entity.properties["mock_is_to_fail"] = False + return org(*args, **kwargs) + + event.entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore + raiesed_error_handle_failed = [] + org_get_entities = event.entity_topological_sorter.get_entities + + def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: + try: + return list(org_get_entities(*args, **kwargs)) + except Exception as e: + raiesed_error_handle_failed.append(e) + raise e + + event.entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: handle_failed_wrapper(*args, **kwargs)) # type: ignore + + with patch( + "port_ocean.core.integrations.mixins.sync_raw.event_context", + lambda *args, **kwargs: no_op_event_context(event), + ): + with patch( + "port_ocean.core.utils.entity_topological_sorter.EntityTopologicalSorter.order_by_entities_dependencies", + mock_order_by_entities_dependencies, + ): + + await mock_sync_raw_mixin.sync_raw_all( + trigger_type="machine", user_agent_type=UserAgentType.exporter + ) + + assert ( + len(event.entity_topological_sorter.entities) == 2 + ), "Expected one failed entity callback due to retry logic" + assert event.entity_topological_sorter.register_entity.call_count == 2 + assert event.entity_topological_sorter.get_entities.call_count == 2 + assert [ + call[0] + for call in event.entity_topological_sorter.get_entities.call_args_list + ] == [(), (False,)] + assert len(raiesed_error_handle_failed) == 1 + assert isinstance(raiesed_error_handle_failed[0], OceanAbortException) + assert isinstance(raiesed_error_handle_failed[0].__cause__, CycleError) + assert ( + len(mock_ocean.port_client.client.post.call_args_list) # type: ignore + / len(entities) + == 2 + ) + + +@pytest.mark.asyncio +async def test_sync_raw_mixin_dependency( + mock_sync_raw_mixin: SyncRawMixin, mock_ocean: Ocean +) -> None: + entities_params = [ + ("entity_1", "service", {"service": "entity_3"}, True), + ("entity_2", "service", 
{"service": "entity_4"}, True), + ("entity_3", "service", {"service": ""}, True), + ("entity_4", "service", {"service": "entity_3"}, True), + ("entity_5", "service", {"service": "entity_1"}, True), + ] + entities = [create_entity(*entity_param) for entity_param in entities_params] + + calc_result_mock = MagicMock() + calc_result_mock.entity_selector_diff.passed = entities + calc_result_mock.errors = [] + + mock_sync_raw_mixin.entity_processor.parse_items = AsyncMock(return_value=calc_result_mock) # type: ignore + + mock_order_by_entities_dependencies = MagicMock( + side_effect=EntityTopologicalSorter.order_by_entities_dependencies + ) + async with event_context(EventType.RESYNC, trigger_type="machine") as event: + app_config = ( + await mock_sync_raw_mixin.port_app_config_handler.get_port_app_config( + use_cache=False + ) + ) + event.port_app_config = app_config + org = event.entity_topological_sorter.register_entity + + def mock_register_entity(*args: Any, **kwargs: Any) -> None: + entity = args[0] + entity.properties["mock_is_to_fail"] = False + return org(*args, **kwargs) + + event.entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore + raiesed_error_handle_failed = [] + org_event_get_entities = event.entity_topological_sorter.get_entities + + def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any: + try: + return org_event_get_entities(*args, **kwargs) + except Exception as e: + raiesed_error_handle_failed.append(e) + raise e + + event.entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: get_entities_wrapper(*args, **kwargs)) # type: ignore + + with patch( + "port_ocean.core.integrations.mixins.sync_raw.event_context", + lambda *args, **kwargs: no_op_event_context(event), + ): + with patch( + "port_ocean.core.utils.entity_topological_sorter.EntityTopologicalSorter.order_by_entities_dependencies", + mock_order_by_entities_dependencies, + ): + + await 
mock_sync_raw_mixin.sync_raw_all( + trigger_type="machine", user_agent_type=UserAgentType.exporter + ) + + assert event.entity_topological_sorter.register_entity.call_count == 5 + assert ( + len(event.entity_topological_sorter.entities) == 5 + ), "Expected one failed entity callback due to retry logic" + assert event.entity_topological_sorter.get_entities.call_count == 1 + assert len(raiesed_error_handle_failed) == 0 + assert mock_ocean.port_client.client.post.call_count == 10 # type: ignore + assert mock_order_by_entities_dependencies.call_count == 1 + + first = mock_ocean.port_client.client.post.call_args_list[0:5] # type: ignore + second = mock_ocean.port_client.client.post.call_args_list[5:10] # type: ignore + + assert "-".join( + [call[1].get("json").get("identifier") for call in first] + ) == "-".join([entity.identifier for entity in entities]) + assert "-".join( + [call[1].get("json").get("identifier") for call in second] + ) in ( + "entity_3-entity_4-entity_1-entity_2-entity_5", + "entity_3-entity_4-entity_1-entity_5-entity_2", + "entity_3-entity_1-entity_4-entity_2-entity_5", + "entity_3-entity_1-entity_4-entity_5-entity_2", + ) diff --git a/port_ocean/tests/core/test_utils.py b/port_ocean/tests/core/test_utils.py index c291778c0c..021a820671 100644 --- a/port_ocean/tests/core/test_utils.py +++ b/port_ocean/tests/core/test_utils.py @@ -2,7 +2,7 @@ import pytest -from port_ocean.core.utils import validate_integration_runtime +from port_ocean.core.utils.utils import validate_integration_runtime from port_ocean.clients.port.client import PortClient from port_ocean.core.models import Runtime from port_ocean.tests.helpers.port_client import get_port_client_for_integration diff --git a/port_ocean/tests/core/utils/test_entity_topological_sorter.py b/port_ocean/tests/core/utils/test_entity_topological_sorter.py new file mode 100644 index 0000000000..4fa4f5c2f7 --- /dev/null +++ b/port_ocean/tests/core/utils/test_entity_topological_sorter.py @@ -0,0 +1,99 @@ +from 
port_ocean.core.models import Entity
+from port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter
+from typing import Optional
+from unittest.mock import MagicMock
+
+import pytest
+
+from port_ocean.exceptions.core import (
+    OceanAbortException,
+)
+
+
+def create_entity(
+    identifier: str, buleprint: str, dependencies: Optional[dict[str, str]] = None
+) -> Entity:
+    """Build a ``MagicMock`` stand-in carrying the attributes these tests set.
+
+    Args:
+        identifier: Value stored on ``entity.identifier``.
+        buleprint: Value stored on ``entity.blueprint`` (parameter spelling is
+            kept as-is so existing keyword callers keep working).
+        dependencies: Relation name -> target identifier. ``None`` or an empty
+            mapping results in ``entity.relations == {}``.
+
+    Returns:
+        The configured mock, typed as ``Entity`` for the sorter's benefit.
+    """
+    entity = MagicMock()
+    entity.identifier = identifier
+    entity.blueprint = buleprint
+    entity.relations = dependencies or {}
+    return entity
+
+
+def _sorted_keys(sorter: EntityTopologicalSorter) -> list[str]:
+    """Return ``"<identifier>-<blueprint>"`` for each entity the sorter yields."""
+    return [
+        f"{entity.identifier}-{entity.blueprint}" for entity in sorter.get_entities()
+    ]
+
+
+def test_handle_failed_with_dependencies() -> None:
+    """A chain c -> b -> a registered out of order comes back as a, b, c."""
+    entity_a = create_entity("entity_a", "buleprint_a")  # No dependencies
+    entity_b = create_entity(
+        "entity_b", "buleprint_a", {"dep_name_1": "entity_a"}
+    )  # Depends on entity_a
+    entity_c = create_entity(
+        "entity_c", "buleprint_b", {"dep_name_2": "entity_b"}
+    )  # Depends on entity_b
+
+    entity_topological_sort = EntityTopologicalSorter()
+    # Register in an order that does not already satisfy the dependencies.
+    entity_topological_sort.register_entity(entity_c)
+    entity_topological_sort.register_entity(entity_a)
+    entity_topological_sort.register_entity(entity_b)
+
+    processed_order = _sorted_keys(entity_topological_sort)
+    assert processed_order == [
+        "entity_a-buleprint_a",
+        "entity_b-buleprint_a",
+        "entity_c-buleprint_b",
+    ], f"Processed order: {processed_order}"
+
+
+def test_handle_failed_with_self_dependencies() -> None:
+    """An entity relating to itself still sorts; the order stays a, b, c."""
+    entity_a = create_entity(
+        "entity_a", "buleprint_a", {"dep_name_1": "entity_a"}
+    )  # Self dependency
+    entity_b = create_entity(
+        "entity_b", "buleprint_a", {"dep_name_1": "entity_a"}
+    )  # Depends on entity_a
+    entity_c = create_entity(
+        "entity_c", "buleprint_b", {"dep_name_2": "entity_b"}
+    )  # Depends on entity_b
+
+    entity_topological_sort = EntityTopologicalSorter()
+    # Register in an order that does not already satisfy the dependencies.
+    entity_topological_sort.register_entity(entity_c)
+    entity_topological_sort.register_entity(entity_a)
+    entity_topological_sort.register_entity(entity_b)
+
+    processed_order = _sorted_keys(entity_topological_sort)
+    assert processed_order == [
+        "entity_a-buleprint_a",
+        "entity_b-buleprint_a",
+        "entity_c-buleprint_b",
+    ], f"Processed order: {processed_order}"
+
+
+def test_handle_failed_with_circular_dependencies() -> None:
+    """Two entities depending on each other make sorted retrieval abort."""
+    entity_a = create_entity(
+        "entity_a", "buleprint_a", {"dep_name_1": "entity_b"}
+    )  # Depends on entity_b
+    entity_b = create_entity(
+        "entity_b", "buleprint_a", {"dep_name_1": "entity_a"}
+    )  # Depends on entity_a
+
+    entity_topological_sort = EntityTopologicalSorter()
+    entity_topological_sort.register_entity(entity_a)
+    entity_topological_sort.register_entity(entity_b)
+
+    # ``pytest.raises`` fails the test when nothing is raised, and ``list``
+    # consumes the result so a lazily produced sequence is actually evaluated.
+    with pytest.raises(OceanAbortException) as exc_info:
+        list(entity_topological_sort.get_entities())
+
+    assert (
+        exc_info.value.args[0]
+        == "Cannot order entities due to cyclic dependencies. \nIf you do want to have cyclic dependencies, please make sure to set the keys 'createMissingRelatedEntities' and 'deleteDependentEntities' in the integration config in Port."
+    )
diff --git a/pyproject.toml b/pyproject.toml index 2a57a18ccc..26005a99c5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "port-ocean" -version = "0.15.3" +version = "0.16.0" description = "Port Ocean is a CLI tool for managing your Port projects." readme = "README.md" homepage = "https://app.getport.io"