From c89b55097849c5f21b2019e38609f279aeeb3a58 Mon Sep 17 00:00:00 2001 From: Ivan Kalinovski Date: Sun, 15 Dec 2024 17:10:29 +0200 Subject: [PATCH] [Core] Add retry for failed upsertes and handle circular dependencies 1. Register callbacks of failed entities. 2. When done with upserts, try topological sort on failed entities. 3. On fail of retry because of topological sort - try unsorted upsert. 4. Update topological's sort tree creation so an entity cannot be it's own dependency. 5. Test upsert with dependencies, with self circular dependency and external entity dependency. --- CHANGELOG.md | 12 ++++ port_ocean/clients/port/mixins/entities.py | 11 ++- port_ocean/context/event.py | 37 ++++++++++ .../entities_state_applier/port/applier.py | 13 +++- .../port/order_by_entities_dependencies.py | 6 +- .../core/integrations/mixins/sync_raw.py | 4 ++ .../integrations/mixins/test_sync_raw.py | 70 +++++++++++++++++++ pyproject.toml | 2 +- 8 files changed, 149 insertions(+), 6 deletions(-) create mode 100644 port_ocean/tests/utils/integrations/mixins/test_sync_raw.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 71ec5e9d00..c8e215ea0e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,18 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm +## 0.15.3 (2024-12-15) + + +### Bug Fixes + +- On fail of retry because of topological sort - try unsorted upsert. +- Register callbacks of failed entities. +- Test upsert with dependencies, with self circular dependency and external entity dependency. +- Update topologicals sort tree creation so an entity cannot be its own dependency. +- When done with upserts, try topological sort on failed entities. + + ## 0.14.7 (2024-12-09) diff --git a/port_ocean/clients/port/mixins/entities.py b/port_ocean/clients/port/mixins/entities.py index 80f8178849..8f628fe7e0 100644 --- a/port_ocean/clients/port/mixins/entities.py +++ b/port_ocean/clients/port/mixins/entities.py @@ -1,5 +1,5 @@ import asyncio -from typing import Any +from typing import Any, Literal from urllib.parse import quote_plus import httpx @@ -29,7 +29,7 @@ async def upsert_entity( request_options: RequestOptions, user_agent_type: UserAgentType | None = None, should_raise: bool = True, - ) -> Entity | None: + ) -> Entity | None | Literal[False]: validation_only = request_options["validation_only"] async with self.semaphore: logger.debug( @@ -57,6 +57,13 @@ async def upsert_entity( f"entity: {entity.identifier} of " f"blueprint: {entity.blueprint}" ) + result = response.json() + if ( + response.status_code == 404 + and result.get("ok") is False + and result.get("error") == "not_found" + ): + return False handle_status_code(response, should_raise) result = response.json() diff --git a/port_ocean/context/event.py b/port_ocean/context/event.py index 601995709f..c13f8f5022 100644 --- a/port_ocean/context/event.py +++ b/port_ocean/context/event.py @@ -10,6 +10,8 @@ Callable, Awaitable, Union, + Tuple, + Coroutine, ) from uuid import uuid4 @@ -23,6 +25,10 @@ ResourceContextNotFoundError, ) from port_ocean.utils.misc import get_time +from port_ocean.core.handlers.entities_state_applier.port.order_by_entities_dependencies import ( + order_by_entities_dependencies, +) +from port_ocean.core.models import Entity if TYPE_CHECKING: from port_ocean.core.handlers.port_app_config.models import ( @@ -50,6 +56,37 @@ class EventContext: _parent_event: Optional["EventContext"] = None _event_id: str = field(default_factory=lambda: str(uuid4())) _on_abort_callbacks: list[AbortCallbackFunction] = field(default_factory=list) + _failed_entity_callback_list: list[ + Tuple[Entity, Callable[[], Coroutine[Any, Any, Entity | Literal[False] | None]]] + ] = field(default_factory=list) + + def register_failed_upsert_call_arguments( + self, + entity: Entity, + func: Callable[[], Coroutine[Any, Any, Entity | Literal[False] | None]], + ) -> None: + self._failed_entity_callback_list.append((entity, func)) + + async def handle_failed(self) -> None: + entity_map: dict[ + str, Callable[[], Coroutine[Any, Any, Entity | Literal[False] | None]] + ] = { + f"{obj.identifier}-{obj.blueprint}": func + for obj, func in self._failed_entity_callback_list + } + entity_list: list[Entity] = [ + obj for obj, func in self._failed_entity_callback_list + ] + + sorted_and_mapped = order_by_entities_dependencies(entity_list) + for obj in sorted_and_mapped: + func = entity_map.get(f"{obj.identifier}-{obj.blueprint}") + if func is not None: + await func() + + async def handle_failed_no_sort(self) -> None: + for obj, func in self._failed_entity_callback_list: + await func() def on_abort(self, func: AbortCallbackFunction) -> None: self._on_abort_callbacks.append(func) diff --git a/port_ocean/core/handlers/entities_state_applier/port/applier.py b/port_ocean/core/handlers/entities_state_applier/port/applier.py index c1ab47dd75..68454ceb90 100644 --- a/port_ocean/core/handlers/entities_state_applier/port/applier.py +++ b/port_ocean/core/handlers/entities_state_applier/port/applier.py @@ -115,8 +115,7 @@ async def upsert( entities_without_search_identifier.append(entity) ordered_created_entities = reversed( - entities_with_search_identifier - + order_by_entities_dependencies(entities_without_search_identifier) + entities_with_search_identifier + entities_without_search_identifier ) for entity in ordered_created_entities: upsertedEntity = await self.context.port_client.upsert_entity( @@ -127,6 +126,16 @@ async def upsert( ) if upsertedEntity: modified_entities.append(upsertedEntity) + if upsertedEntity is False: + event.register_failed_upsert_call_arguments( + entity, + lambda: self.context.port_client.upsert_entity( + entity, + event.port_app_config.get_port_request_options(), + user_agent_type, + should_raise=False, + ), + ) return modified_entities async def delete( diff --git a/port_ocean/core/handlers/entities_state_applier/port/order_by_entities_dependencies.py b/port_ocean/core/handlers/entities_state_applier/port/order_by_entities_dependencies.py index cb42d07029..5698fc8379 100644 --- a/port_ocean/core/handlers/entities_state_applier/port/order_by_entities_dependencies.py +++ b/port_ocean/core/handlers/entities_state_applier/port/order_by_entities_dependencies.py @@ -33,7 +33,11 @@ def order_by_entities_dependencies(entities: list[Entity]) -> list[Entity]: ] for related_entity in related_entities: - nodes[node(entity)].add(node(related_entity)) + if ( + entity.blueprint is not related_entity.blueprint + or entity.identifier is not related_entity.identifier + ): + nodes[node(entity)].add(node(related_entity)) sort_op = TopologicalSorter(nodes) try: diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index 87fd2a8faf..a0f86ee5cf 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -455,6 +455,10 @@ async def sync_raw_all( event.on_abort(lambda: task.cancel()) creation_results.append(await task) + try: + await event.handle_failed() + except: + await event.handle_failed_no_sort() except asyncio.CancelledError as e: logger.warning("Resync aborted successfully, skipping delete phase. This leads to an incomplete state") raise diff --git a/port_ocean/tests/utils/integrations/mixins/test_sync_raw.py b/port_ocean/tests/utils/integrations/mixins/test_sync_raw.py new file mode 100644 index 0000000000..01573a4dad --- /dev/null +++ b/port_ocean/tests/utils/integrations/mixins/test_sync_raw.py @@ -0,0 +1,70 @@ +import pytest +from unittest.mock import MagicMock +from port_ocean.context.event import event, event_context, EventType +from port_ocean.exceptions.core import ( + OceanAbortException, +) +from typing import Any + +def create_entity(identifier:str,buleprint:str, dependencies:dict[str,str]=None): + entity = MagicMock() + entity.identifier = identifier + entity.blueprint = buleprint + entity.relations = dependencies or {} + return entity + +async def mock_activate(processed_order:list[str],entity:MagicMock): + processed_order.append(f"{entity.identifier}-{entity.blueprint}") + return True + +@pytest.mark.asyncio +async def test_handle_failed_with_dependencies(): + processed_order:list[str] = [] + entity_a = create_entity("entity_a", "buleprint_a",) # No dependencies + entity_b = create_entity("entity_b", "buleprint_a", {"dep_name_1":"entity_a"}) # Depends on entity_a + entity_c = create_entity("entity_c", "buleprint_b", {"dep_name_2":"entity_b"}) # Depends on entity_b + + + async with event_context(EventType.RESYNC, "manual"): + # Register fails with unsorted order + event.register_failed_upsert_call_arguments(entity_c, lambda: mock_activate(processed_order,entity_c)) + event.register_failed_upsert_call_arguments(entity_a, lambda: mock_activate(processed_order,entity_a)) + event.register_failed_upsert_call_arguments(entity_b, lambda: mock_activate(processed_order,entity_b)) + + await event.handle_failed() + + assert processed_order == ["entity_a-buleprint_a", "entity_b-buleprint_a", "entity_c-buleprint_b"], f"Processed order: {processed_order}" + +@pytest.mark.asyncio +async def test_handle_failed_with_self_dependencies(): + processed_order:list[str] = [] + entity_a = create_entity("entity_a", "buleprint_a",{"dep_name_1":"entity_a"}) # Self dependency + entity_b = create_entity("entity_b", "buleprint_a", {"dep_name_1":"entity_a"}) # Depends on entity_a + entity_c = create_entity("entity_c", "buleprint_b", {"dep_name_2":"entity_b"}) # Depends on entity_b + + + async with event_context(EventType.RESYNC, "manual"): + # Register fails with unsorted order + event.register_failed_upsert_call_arguments(entity_c, lambda: mock_activate(processed_order,entity_c)) + event.register_failed_upsert_call_arguments(entity_a, lambda: mock_activate(processed_order,entity_a)) + event.register_failed_upsert_call_arguments(entity_b, lambda: mock_activate(processed_order,entity_b)) + + await event.handle_failed() + + assert processed_order == ["entity_a-buleprint_a", "entity_b-buleprint_a", "entity_c-buleprint_b"], f"Processed order: {processed_order}" + +@pytest.mark.asyncio +async def test_handle_failed_with_circular_dependencies(): + processed_order:list[str] = [] + entity_a = create_entity("entity_a", "buleprint_a",{"dep_name_1":"entity_b"}) # Self dependency + entity_b = create_entity("entity_b", "buleprint_a", {"dep_name_1":"entity_a"}) # Depends on entity_a + + try: + async with event_context(EventType.RESYNC, "manual"): + # Register fails with unsorted order + event.register_failed_upsert_call_arguments(entity_a, lambda: mock_activate(processed_order,entity_a)) + event.register_failed_upsert_call_arguments(entity_b, lambda: mock_activate(processed_order,entity_b)) + await event.handle_failed() + except OceanAbortException as e: + assert isinstance(e,OceanAbortException) + assert e.args[0] == "Cannot order entities due to cyclic dependencies. \nIf you do want to have cyclic dependencies, please make sure to set the keys 'createMissingRelatedEntities' and 'deleteDependentEntities' in the integration config in Port." diff --git a/pyproject.toml b/pyproject.toml index bb799d82da..2a57a18ccc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "port-ocean" -version = "0.14.7" +version = "0.15.3" description = "Port Ocean is a CLI tool for managing your Port projects." readme = "README.md" homepage = "https://app.getport.io"