From c89b55097849c5f21b2019e38609f279aeeb3a58 Mon Sep 17 00:00:00 2001 From: Ivan Kalinovski Date: Sun, 15 Dec 2024 17:10:29 +0200 Subject: [PATCH 01/11] [Core] Add retry for failed upsertes and handle circular dependencies 1. Register callbacks of failed entities. 2. When done with upserts, try topological sort on failed entities. 3. If the retry fails because of the topological sort, try an unsorted upsert. 4. Update the topological sort's tree creation so an entity cannot be its own dependency. 5. Test upsert with dependencies, with self circular dependency and external entity dependency. --- CHANGELOG.md | 12 ++++ port_ocean/clients/port/mixins/entities.py | 11 ++- port_ocean/context/event.py | 37 ++++++++++ .../entities_state_applier/port/applier.py | 13 +++- .../port/order_by_entities_dependencies.py | 6 +- .../core/integrations/mixins/sync_raw.py | 4 ++ .../integrations/mixins/test_sync_raw.py | 70 +++++++++++++++++++ pyproject.toml | 2 +- 8 files changed, 149 insertions(+), 6 deletions(-) create mode 100644 port_ocean/tests/utils/integrations/mixins/test_sync_raw.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 71ec5e9d00..c8e215ea0e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,18 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm +## 0.15.3 (2024-12-15) + + +### Bug Fixes + +- On fail of retry because of topological sort - try unsorted upsert. +- Register callbacks of failed entities. +- Test upsert with dependencies, with self circular dependency and external entity dependency. +- Update topologicals sort tree creation so an entity cannot be its own dependency. +- When done with upserts, try topological sort on failed entities.
+ + ## 0.14.7 (2024-12-09) diff --git a/port_ocean/clients/port/mixins/entities.py b/port_ocean/clients/port/mixins/entities.py index 80f8178849..8f628fe7e0 100644 --- a/port_ocean/clients/port/mixins/entities.py +++ b/port_ocean/clients/port/mixins/entities.py @@ -1,5 +1,5 @@ import asyncio -from typing import Any +from typing import Any, Literal from urllib.parse import quote_plus import httpx @@ -29,7 +29,7 @@ async def upsert_entity( request_options: RequestOptions, user_agent_type: UserAgentType | None = None, should_raise: bool = True, - ) -> Entity | None: + ) -> Entity | None | Literal[False]: validation_only = request_options["validation_only"] async with self.semaphore: logger.debug( @@ -57,6 +57,13 @@ async def upsert_entity( f"entity: {entity.identifier} of " f"blueprint: {entity.blueprint}" ) + result = response.json() + if ( + response.status_code == 404 + and result.get("ok") is False + and result.get("error") == "not_found" + ): + return False handle_status_code(response, should_raise) result = response.json() diff --git a/port_ocean/context/event.py b/port_ocean/context/event.py index 601995709f..c13f8f5022 100644 --- a/port_ocean/context/event.py +++ b/port_ocean/context/event.py @@ -10,6 +10,8 @@ Callable, Awaitable, Union, + Tuple, + Coroutine, ) from uuid import uuid4 @@ -23,6 +25,10 @@ ResourceContextNotFoundError, ) from port_ocean.utils.misc import get_time +from port_ocean.core.handlers.entities_state_applier.port.order_by_entities_dependencies import ( + order_by_entities_dependencies, +) +from port_ocean.core.models import Entity if TYPE_CHECKING: from port_ocean.core.handlers.port_app_config.models import ( @@ -50,6 +56,37 @@ class EventContext: _parent_event: Optional["EventContext"] = None _event_id: str = field(default_factory=lambda: str(uuid4())) _on_abort_callbacks: list[AbortCallbackFunction] = field(default_factory=list) + _failed_entity_callback_list: list[ + Tuple[Entity, Callable[[], Coroutine[Any, Any, Entity | Literal[False] 
| None]]] + ] = field(default_factory=list) + + def register_failed_upsert_call_arguments( + self, + entity: Entity, + func: Callable[[], Coroutine[Any, Any, Entity | Literal[False] | None]], + ) -> None: + self._failed_entity_callback_list.append((entity, func)) + + async def handle_failed(self) -> None: + entity_map: dict[ + str, Callable[[], Coroutine[Any, Any, Entity | Literal[False] | None]] + ] = { + f"{obj.identifier}-{obj.blueprint}": func + for obj, func in self._failed_entity_callback_list + } + entity_list: list[Entity] = [ + obj for obj, func in self._failed_entity_callback_list + ] + + sorted_and_mapped = order_by_entities_dependencies(entity_list) + for obj in sorted_and_mapped: + func = entity_map.get(f"{obj.identifier}-{obj.blueprint}") + if func is not None: + await func() + + async def handle_failed_no_sort(self) -> None: + for obj, func in self._failed_entity_callback_list: + await func() def on_abort(self, func: AbortCallbackFunction) -> None: self._on_abort_callbacks.append(func) diff --git a/port_ocean/core/handlers/entities_state_applier/port/applier.py b/port_ocean/core/handlers/entities_state_applier/port/applier.py index c1ab47dd75..68454ceb90 100644 --- a/port_ocean/core/handlers/entities_state_applier/port/applier.py +++ b/port_ocean/core/handlers/entities_state_applier/port/applier.py @@ -115,8 +115,7 @@ async def upsert( entities_without_search_identifier.append(entity) ordered_created_entities = reversed( - entities_with_search_identifier - + order_by_entities_dependencies(entities_without_search_identifier) + entities_with_search_identifier + entities_without_search_identifier ) for entity in ordered_created_entities: upsertedEntity = await self.context.port_client.upsert_entity( @@ -127,6 +126,16 @@ async def upsert( ) if upsertedEntity: modified_entities.append(upsertedEntity) + if upsertedEntity is False: + event.register_failed_upsert_call_arguments( + entity, + lambda: self.context.port_client.upsert_entity( + entity, + 
event.port_app_config.get_port_request_options(), + user_agent_type, + should_raise=False, + ), + ) return modified_entities async def delete( diff --git a/port_ocean/core/handlers/entities_state_applier/port/order_by_entities_dependencies.py b/port_ocean/core/handlers/entities_state_applier/port/order_by_entities_dependencies.py index cb42d07029..5698fc8379 100644 --- a/port_ocean/core/handlers/entities_state_applier/port/order_by_entities_dependencies.py +++ b/port_ocean/core/handlers/entities_state_applier/port/order_by_entities_dependencies.py @@ -33,7 +33,11 @@ def order_by_entities_dependencies(entities: list[Entity]) -> list[Entity]: ] for related_entity in related_entities: - nodes[node(entity)].add(node(related_entity)) + if ( + entity.blueprint is not related_entity.blueprint + or entity.identifier is not related_entity.identifier + ): + nodes[node(entity)].add(node(related_entity)) sort_op = TopologicalSorter(nodes) try: diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index 87fd2a8faf..a0f86ee5cf 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -455,6 +455,10 @@ async def sync_raw_all( event.on_abort(lambda: task.cancel()) creation_results.append(await task) + try: + await event.handle_failed() + except: + await event.handle_failed_no_sort() except asyncio.CancelledError as e: logger.warning("Resync aborted successfully, skipping delete phase. 
This leads to an incomplete state") raise diff --git a/port_ocean/tests/utils/integrations/mixins/test_sync_raw.py b/port_ocean/tests/utils/integrations/mixins/test_sync_raw.py new file mode 100644 index 0000000000..01573a4dad --- /dev/null +++ b/port_ocean/tests/utils/integrations/mixins/test_sync_raw.py @@ -0,0 +1,70 @@ +import pytest +from unittest.mock import MagicMock +from port_ocean.context.event import event, event_context, EventType +from port_ocean.exceptions.core import ( + OceanAbortException, +) +from typing import Any + +def create_entity(identifier:str,buleprint:str, dependencies:dict[str,str]=None): + entity = MagicMock() + entity.identifier = identifier + entity.blueprint = buleprint + entity.relations = dependencies or {} + return entity + +async def mock_activate(processed_order:list[str],entity:MagicMock): + processed_order.append(f"{entity.identifier}-{entity.blueprint}") + return True + +@pytest.mark.asyncio +async def test_handle_failed_with_dependencies(): + processed_order:list[str] = [] + entity_a = create_entity("entity_a", "buleprint_a",) # No dependencies + entity_b = create_entity("entity_b", "buleprint_a", {"dep_name_1":"entity_a"}) # Depends on entity_a + entity_c = create_entity("entity_c", "buleprint_b", {"dep_name_2":"entity_b"}) # Depends on entity_b + + + async with event_context(EventType.RESYNC, "manual"): + # Register fails with unsorted order + event.register_failed_upsert_call_arguments(entity_c, lambda: mock_activate(processed_order,entity_c)) + event.register_failed_upsert_call_arguments(entity_a, lambda: mock_activate(processed_order,entity_a)) + event.register_failed_upsert_call_arguments(entity_b, lambda: mock_activate(processed_order,entity_b)) + + await event.handle_failed() + + assert processed_order == ["entity_a-buleprint_a", "entity_b-buleprint_a", "entity_c-buleprint_b"], f"Processed order: {processed_order}" + +@pytest.mark.asyncio +async def test_handle_failed_with_self_dependencies(): + 
processed_order:list[str] = [] + entity_a = create_entity("entity_a", "buleprint_a",{"dep_name_1":"entity_a"}) # Self dependency + entity_b = create_entity("entity_b", "buleprint_a", {"dep_name_1":"entity_a"}) # Depends on entity_a + entity_c = create_entity("entity_c", "buleprint_b", {"dep_name_2":"entity_b"}) # Depends on entity_b + + + async with event_context(EventType.RESYNC, "manual"): + # Register fails with unsorted order + event.register_failed_upsert_call_arguments(entity_c, lambda: mock_activate(processed_order,entity_c)) + event.register_failed_upsert_call_arguments(entity_a, lambda: mock_activate(processed_order,entity_a)) + event.register_failed_upsert_call_arguments(entity_b, lambda: mock_activate(processed_order,entity_b)) + + await event.handle_failed() + + assert processed_order == ["entity_a-buleprint_a", "entity_b-buleprint_a", "entity_c-buleprint_b"], f"Processed order: {processed_order}" + +@pytest.mark.asyncio +async def test_handle_failed_with_circular_dependencies(): + processed_order:list[str] = [] + entity_a = create_entity("entity_a", "buleprint_a",{"dep_name_1":"entity_b"}) # Self dependency + entity_b = create_entity("entity_b", "buleprint_a", {"dep_name_1":"entity_a"}) # Depends on entity_a + + try: + async with event_context(EventType.RESYNC, "manual"): + # Register fails with unsorted order + event.register_failed_upsert_call_arguments(entity_a, lambda: mock_activate(processed_order,entity_a)) + event.register_failed_upsert_call_arguments(entity_b, lambda: mock_activate(processed_order,entity_b)) + await event.handle_failed() + except OceanAbortException as e: + assert isinstance(e,OceanAbortException) + assert e.args[0] == "Cannot order entities due to cyclic dependencies. \nIf you do want to have cyclic dependencies, please make sure to set the keys 'createMissingRelatedEntities' and 'deleteDependentEntities' in the integration config in Port." 
diff --git a/pyproject.toml b/pyproject.toml index bb799d82da..2a57a18ccc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "port-ocean" -version = "0.14.7" +version = "0.15.3" description = "Port Ocean is a CLI tool for managing your Port projects." readme = "README.md" homepage = "https://app.getport.io" From b7b88ffcf519d3a26b013cd6d0606ce1750f81ec Mon Sep 17 00:00:00 2001 From: Ivan Kalinovski Date: Thu, 19 Dec 2024 16:28:47 +0200 Subject: [PATCH 02/11] [Core] Add retry for failed upsertes and handle circular dependencies 1. Add robust tests. 2. Change the implementation of registering failed entities. --- CHANGELOG.md | 8 +- port_ocean/clients/port/mixins/entities.py | 2 +- port_ocean/context/event.py | 41 +- .../entities_state_applier/port/applier.py | 11 +- .../port/order_by_entities_dependencies.py | 1 - .../core/integrations/mixins/sync_raw.py | 7 +- .../core/handlers/mixins/test_sync_raw_v2.py | 425 ++++++++++++++++++ .../tests/utils/failed_entity_handler.py | 121 +++++ .../integrations/mixins/test_sync_raw.py | 70 --- port_ocean/utils/failed_entity_handler.py | 67 +++ 10 files changed, 630 insertions(+), 123 deletions(-) create mode 100644 port_ocean/tests/core/handlers/mixins/test_sync_raw_v2.py create mode 100644 port_ocean/tests/utils/failed_entity_handler.py delete mode 100644 port_ocean/tests/utils/integrations/mixins/test_sync_raw.py create mode 100644 port_ocean/utils/failed_entity_handler.py diff --git a/CHANGELOG.md b/CHANGELOG.md index c8e215ea0e..8b959b6342 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,11 +12,11 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ### Bug Fixes -- On fail of retry because of topological sort - try unsorted upsert. -- Register callbacks of failed entities. +- When experiencing cyclic error on topological sort try unsorted upsert of the entities - Test upsert with dependencies, with self circular dependency and external entity dependency. 
-- Update topologicals sort tree creation so an entity cannot be its own dependency. -- When done with upserts, try topological sort on failed entities. +- Fix topologicals sort tree creation so an entity cannot be its own dependency +- When `createMissingRelatedEntities` is set to `false` and upserting entity failed on not existing entity, the entity will be gathered to the end of the resync and will try sorting all + the failed entities through a topological sort and upsert them as well ## 0.14.7 (2024-12-09) diff --git a/port_ocean/clients/port/mixins/entities.py b/port_ocean/clients/port/mixins/entities.py index 8f628fe7e0..60cc2e26ca 100644 --- a/port_ocean/clients/port/mixins/entities.py +++ b/port_ocean/clients/port/mixins/entities.py @@ -60,7 +60,7 @@ async def upsert_entity( result = response.json() if ( response.status_code == 404 - and result.get("ok") is False + and not result.get("ok") and result.get("error") == "not_found" ): return False diff --git a/port_ocean/context/event.py b/port_ocean/context/event.py index c13f8f5022..94cc39e088 100644 --- a/port_ocean/context/event.py +++ b/port_ocean/context/event.py @@ -10,12 +10,11 @@ Callable, Awaitable, Union, - Tuple, - Coroutine, ) from uuid import uuid4 from loguru import logger +from port_ocean.utils.failed_entity_handler import FailedEntityHandler from pydispatch import dispatcher # type: ignore from werkzeug.local import LocalStack, LocalProxy @@ -25,10 +24,7 @@ ResourceContextNotFoundError, ) from port_ocean.utils.misc import get_time -from port_ocean.core.handlers.entities_state_applier.port.order_by_entities_dependencies import ( - order_by_entities_dependencies, -) -from port_ocean.core.models import Entity + if TYPE_CHECKING: from port_ocean.core.handlers.port_app_config.models import ( @@ -56,37 +52,8 @@ class EventContext: _parent_event: Optional["EventContext"] = None _event_id: str = field(default_factory=lambda: str(uuid4())) _on_abort_callbacks: list[AbortCallbackFunction] = 
field(default_factory=list) - _failed_entity_callback_list: list[ - Tuple[Entity, Callable[[], Coroutine[Any, Any, Entity | Literal[False] | None]]] - ] = field(default_factory=list) - - def register_failed_upsert_call_arguments( - self, - entity: Entity, - func: Callable[[], Coroutine[Any, Any, Entity | Literal[False] | None]], - ) -> None: - self._failed_entity_callback_list.append((entity, func)) - - async def handle_failed(self) -> None: - entity_map: dict[ - str, Callable[[], Coroutine[Any, Any, Entity | Literal[False] | None]] - ] = { - f"{obj.identifier}-{obj.blueprint}": func - for obj, func in self._failed_entity_callback_list - } - entity_list: list[Entity] = [ - obj for obj, func in self._failed_entity_callback_list - ] - - sorted_and_mapped = order_by_entities_dependencies(entity_list) - for obj in sorted_and_mapped: - func = entity_map.get(f"{obj.identifier}-{obj.blueprint}") - if func is not None: - await func() - - async def handle_failed_no_sort(self) -> None: - for obj, func in self._failed_entity_callback_list: - await func() + + failed_entity_handler = FailedEntityHandler() def on_abort(self, func: AbortCallbackFunction) -> None: self._on_abort_callbacks.append(func) diff --git a/port_ocean/core/handlers/entities_state_applier/port/applier.py b/port_ocean/core/handlers/entities_state_applier/port/applier.py index 68454ceb90..d7dd022e5b 100644 --- a/port_ocean/core/handlers/entities_state_applier/port/applier.py +++ b/port_ocean/core/handlers/entities_state_applier/port/applier.py @@ -127,14 +127,11 @@ async def upsert( if upsertedEntity: modified_entities.append(upsertedEntity) if upsertedEntity is False: - event.register_failed_upsert_call_arguments( + event.failed_entity_handler.register_failed_upsert_call_arguments( entity, - lambda: self.context.port_client.upsert_entity( - entity, - event.port_app_config.get_port_request_options(), - user_agent_type, - should_raise=False, - ), + event.port_app_config.get_port_request_options(), + 
user_agent_type, + self.context.port_client.upsert_entity, ) return modified_entities diff --git a/port_ocean/core/handlers/entities_state_applier/port/order_by_entities_dependencies.py b/port_ocean/core/handlers/entities_state_applier/port/order_by_entities_dependencies.py index 5698fc8379..251c747887 100644 --- a/port_ocean/core/handlers/entities_state_applier/port/order_by_entities_dependencies.py +++ b/port_ocean/core/handlers/entities_state_applier/port/order_by_entities_dependencies.py @@ -14,7 +14,6 @@ def node(entity: Entity) -> Node: def order_by_entities_dependencies(entities: list[Entity]) -> list[Entity]: nodes: dict[Node, Set[Node]] = {} entities_map = {} - for entity in entities: nodes[node(entity)] = set() entities_map[node(entity)] = entity diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index a0f86ee5cf..f9819f6cf9 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -29,7 +29,7 @@ ) from port_ocean.core.utils import zip_and_sum, gather_and_split_errors_from_results from port_ocean.exceptions.core import OceanAbortException - +import json SEND_RAW_DATA_EXAMPLES_AMOUNT = 5 @@ -426,6 +426,7 @@ async def sync_raw_all( use_cache=False ) logger.info(f"Resync will use the following mappings: {app_config.dict()}") + try: did_fetched_current_state = True entities_at_port = await ocean.port_client.search_entities( @@ -456,9 +457,9 @@ async def sync_raw_all( creation_results.append(await task) try: - await event.handle_failed() + await event.failed_entity_handler.handle_failed() except: - await event.handle_failed_no_sort() + await event.failed_entity_handler.handle_failed_no_sort() except asyncio.CancelledError as e: logger.warning("Resync aborted successfully, skipping delete phase. 
This leads to an incomplete state") raise diff --git a/port_ocean/tests/core/handlers/mixins/test_sync_raw_v2.py b/port_ocean/tests/core/handlers/mixins/test_sync_raw_v2.py new file mode 100644 index 0000000000..3b15308fb8 --- /dev/null +++ b/port_ocean/tests/core/handlers/mixins/test_sync_raw_v2.py @@ -0,0 +1,425 @@ +from contextlib import asynccontextmanager +from graphlib import CycleError +from typing import Any, AsyncGenerator + +from httpx import Response +from port_ocean.clients.port.client import PortClient +from port_ocean.exceptions.core import OceanAbortException +import pytest +from unittest.mock import MagicMock, AsyncMock, patch +from port_ocean.ocean import Ocean +from port_ocean.context.ocean import PortOceanContext +from port_ocean.core.handlers.port_app_config.models import ( + EntityMapping, + MappingsConfig, + PortAppConfig, + PortResourceConfig, + ResourceConfig, + Selector, +) +from port_ocean.core.integrations.mixins import SyncRawMixin +from port_ocean.core.handlers.entities_state_applier.port.applier import ( + HttpEntitiesStateApplier, +) +from port_ocean.core.handlers.entity_processor.jq_entity_processor import ( + JQEntityProcessor, +) +from port_ocean.core.models import Entity +from port_ocean.context.event import EventContext, event_context, EventType +from port_ocean.clients.port.types import UserAgentType +from port_ocean.context.ocean import ocean +from port_ocean.core.handlers.entities_state_applier.port.order_by_entities_dependencies import ( + order_by_entities_dependencies, +) + + +@pytest.fixture +def mock_port_client(mock_http_client: MagicMock) -> PortClient: + mock_port_client = PortClient( + MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock() + ) + mock_port_client.auth = AsyncMock() + mock_port_client.auth.headers = AsyncMock( + return_value={ + "Authorization": "test", + "User-Agent": "test", + } + ) + + mock_port_client.search_entities = AsyncMock(return_value=[]) # type: ignore + 
mock_port_client.client = mock_http_client + return mock_port_client + + +@pytest.fixture +def mock_http_client() -> MagicMock: + mock_http_client = MagicMock() + mock_upserted_entities = [] + + async def post(url: str, *args: Any, **kwargs: Any) -> Response: + entity = kwargs.get("json", {}) + if entity.get("properties", {}).get("mock_is_to_fail", {}): + return Response( + 404, headers=MagicMock(), json={"ok": False, "error": "not_found"} + ) + + mock_upserted_entities.append( + f"{entity.get('identifier')}-{entity.get('blueprint')}" + ) + return Response( + 200, + json={ + "entity": { + "identifier": entity.get("identifier"), + "blueprint": entity.get("blueprint"), + } + }, + ) + + mock_http_client.post = AsyncMock(side_effect=post) + return mock_http_client + + +@pytest.fixture +def mock_ocean(mock_port_client: PortClient) -> Ocean: + with patch("port_ocean.ocean.Ocean.__init__", return_value=None): + ocean_mock = Ocean( + MagicMock(), MagicMock(), MagicMock(), MagicMock(), MagicMock() + ) + ocean_mock.config = MagicMock() + ocean_mock.config.port = MagicMock() + ocean_mock.config.port.port_app_config_cache_ttl = 60 + ocean_mock.port_client = mock_port_client + + return ocean_mock + + +@pytest.fixture +def mock_context(mock_ocean: Ocean) -> PortOceanContext: + context = PortOceanContext(mock_ocean) + ocean._app = context.app + return context + + +@pytest.fixture +def mock_port_app_config() -> PortAppConfig: + return PortAppConfig( + enable_merge_entity=True, + delete_dependent_entities=True, + create_missing_related_entities=False, + resources=[ + ResourceConfig( + kind="project", + selector=Selector(query="true"), + port=PortResourceConfig( + entity=MappingsConfig( + mappings=EntityMapping( + identifier=".id | tostring", + title=".name", + blueprint='"service"', + properties={"url": ".web_url"}, + relations={}, + ) + ) + ), + ) + ], + ) + + +@pytest.fixture +def mock_port_app_config_handler(mock_port_app_config: PortAppConfig) -> MagicMock: + handler = 
MagicMock() + + async def get_config(use_cache: bool = True) -> Any: + return mock_port_app_config + + handler.get_port_app_config = get_config + return handler + + +@pytest.fixture +def mock_entity_processor(mock_context: PortOceanContext) -> JQEntityProcessor: + return JQEntityProcessor(mock_context) + + +@pytest.fixture +def mock_entities_state_applier( + mock_context: PortOceanContext, +) -> HttpEntitiesStateApplier: + return HttpEntitiesStateApplier(mock_context) + + +@pytest.fixture +def mock_sync_raw_mixin( + mock_entity_processor: JQEntityProcessor, + mock_entities_state_applier: HttpEntitiesStateApplier, + mock_port_app_config_handler: MagicMock, +) -> SyncRawMixin: + sync_raw_mixin = SyncRawMixin() + sync_raw_mixin._entity_processor = mock_entity_processor + sync_raw_mixin._entities_state_applier = mock_entities_state_applier + sync_raw_mixin._port_app_config_handler = mock_port_app_config_handler + sync_raw_mixin._get_resource_raw_results = AsyncMock(return_value=([{}], [])) # type: ignore + sync_raw_mixin._entity_processor.parse_items = AsyncMock(return_value=MagicMock()) # type: ignore + + return sync_raw_mixin + + +@asynccontextmanager +async def no_op_event_context( + existing_event: EventContext, +) -> AsyncGenerator[EventContext, None]: + yield existing_event + + +@pytest.mark.asyncio +async def test_sync_raw_mixin_self_dependency( + mock_sync_raw_mixin: SyncRawMixin, +) -> None: + entities_params = [ + ("entity_1", "service", {"service": "entity_1"}, True), + ("entity_2", "service", {"service": "entity_2"}, False), + ] + entities = [create_entity(*entity_param) for entity_param in entities_params] + + calc_result_mock = MagicMock() + calc_result_mock.entity_selector_diff.passed = entities + calc_result_mock.errors = [] + + mock_sync_raw_mixin.entity_processor.parse_items = AsyncMock(return_value=calc_result_mock) # type: ignore + + mock_order_by_entities_dependencies = MagicMock( + side_effect=order_by_entities_dependencies + ) + async with 
event_context(EventType.RESYNC, trigger_type="machine") as event: + app_config = ( + await mock_sync_raw_mixin.port_app_config_handler.get_port_app_config( + use_cache=False + ) + ) + event.port_app_config = app_config + event.failed_entity_handler.register_failed_upsert_call_arguments = MagicMock(side_effect=event.failed_entity_handler.register_failed_upsert_call_arguments) # type: ignore + event.failed_entity_handler.handle_failed = MagicMock(side_effect=event.failed_entity_handler.handle_failed) # type: ignore + event.failed_entity_handler.handle_failed_no_sort = MagicMock(side_effect=event.failed_entity_handler.handle_failed_no_sort) # type: ignore + + with patch( + "port_ocean.core.integrations.mixins.sync_raw.event_context", + lambda *args, **kwargs: no_op_event_context(event), + ): + with patch( + "port_ocean.utils.failed_entity_handler.order_by_entities_dependencies", + mock_order_by_entities_dependencies, + ): + + await mock_sync_raw_mixin.sync_raw_all( + trigger_type="machine", user_agent_type=UserAgentType.exporter + ) + + assert ( + len(event.failed_entity_handler._failed_entity_callback_list) == 1 + ), "Expected one failed entity callback due to retry logic" + assert ( + event.failed_entity_handler.register_failed_upsert_call_arguments.call_count + == 1 + ) + assert event.failed_entity_handler.handle_failed.call_count == 1 + + assert mock_order_by_entities_dependencies.call_count == 1 + assert [ + call[0][0][0] + for call in mock_order_by_entities_dependencies.call_args_list + ] == [entity for entity in entities if entity.identifier == "entity_1"] + + assert event.failed_entity_handler.handle_failed_no_sort.call_count == 0 + + +@pytest.mark.asyncio +async def test_sync_raw_mixin_circular_dependency( + mock_sync_raw_mixin: SyncRawMixin, mock_ocean: Ocean +) -> None: + entities_params = [ + ("entity_1", "service", {"service": "entity_2"}, True), + ("entity_2", "service", {"service": "entity_1"}, True), + ] + entities = [create_entity(*entity_param) for 
entity_param in entities_params] + + calc_result_mock = MagicMock() + calc_result_mock.entity_selector_diff.passed = entities + calc_result_mock.errors = [] + + mock_sync_raw_mixin.entity_processor.parse_items = AsyncMock(return_value=calc_result_mock) # type: ignore + + mock_order_by_entities_dependencies = MagicMock( + side_effect=order_by_entities_dependencies + ) + async with event_context(EventType.RESYNC, trigger_type="machine") as event: + app_config = ( + await mock_sync_raw_mixin.port_app_config_handler.get_port_app_config( + use_cache=False + ) + ) + event.port_app_config = app_config + org = event.failed_entity_handler.register_failed_upsert_call_arguments + + def mock_register_failed_upsert_call_arguments( + *args: Any, **kwargs: Any + ) -> Any: + entity = args[0] + entity.properties["mock_is_to_fail"] = False + return org(*args, **kwargs) + + event.failed_entity_handler.register_failed_upsert_call_arguments = MagicMock(side_effect=mock_register_failed_upsert_call_arguments) # type: ignore + raiesed_error_handle_failed = [] + org_event_handle_failed = event.failed_entity_handler.handle_failed + + async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: + try: + return await org_event_handle_failed(*args, **kwargs) + except Exception as e: + raiesed_error_handle_failed.append(e) + raise e + + event.failed_entity_handler.handle_failed = MagicMock(side_effect=lambda *args, **kwargs: handle_failed_wrapper(*args, **kwargs)) # type: ignore + event.failed_entity_handler.handle_failed_no_sort = MagicMock(side_effect=event.failed_entity_handler.handle_failed_no_sort) # type: ignore + + with patch( + "port_ocean.core.integrations.mixins.sync_raw.event_context", + lambda *args, **kwargs: no_op_event_context(event), + ): + with patch( + "port_ocean.utils.failed_entity_handler.order_by_entities_dependencies", + mock_order_by_entities_dependencies, + ): + + await mock_sync_raw_mixin.sync_raw_all( + trigger_type="machine", 
user_agent_type=UserAgentType.exporter + ) + + assert ( + len(event.failed_entity_handler._failed_entity_callback_list) == 2 + ), "Expected one failed entity callback due to retry logic" + assert ( + event.failed_entity_handler.register_failed_upsert_call_arguments.call_count + == 2 + ) + assert event.failed_entity_handler.handle_failed.call_count == 1 + assert len(raiesed_error_handle_failed) == 1 + assert isinstance(raiesed_error_handle_failed[0], OceanAbortException) + assert isinstance(raiesed_error_handle_failed[0].__cause__, CycleError) + assert event.failed_entity_handler.handle_failed_no_sort.call_count == 1 + assert ( + len(mock_ocean.port_client.client.post.call_args_list) # type: ignore + / len(entities) + == 2 + ) + + +def create_entity( + id: str, blueprint: str, relation: dict[str, str], is_to_fail: bool +) -> Entity: + entity = Entity(identifier=id, blueprint=blueprint) + entity.relations = relation + entity.properties = {"mock_is_to_fail": is_to_fail} + return entity + + +@pytest.mark.asyncio +async def test_sync_raw_mixin_dependency( + mock_sync_raw_mixin: SyncRawMixin, mock_ocean: Ocean +) -> None: + entities_params = [ + ("entity_1", "service", {"service": "entity_3"}, True), + ("entity_2", "service", {"service": "entity_4"}, True), + ("entity_3", "service", {"service": ""}, True), + ("entity_4", "service", {"service": "entity_3"}, True), + ("entity_5", "service", {"service": "entity_1"}, True), + ] + entities = [create_entity(*entity_param) for entity_param in entities_params] + + calc_result_mock = MagicMock() + calc_result_mock.entity_selector_diff.passed = entities + calc_result_mock.errors = [] + + mock_sync_raw_mixin.entity_processor.parse_items = AsyncMock(return_value=calc_result_mock) # type: ignore + + mock_order_by_entities_dependencies = MagicMock( + side_effect=order_by_entities_dependencies + ) + async with event_context(EventType.RESYNC, trigger_type="machine") as event: + app_config = ( + await 
mock_sync_raw_mixin.port_app_config_handler.get_port_app_config( + use_cache=False + ) + ) + event.port_app_config = app_config + org = event.failed_entity_handler.register_failed_upsert_call_arguments + + def mock_register_failed_upsert_call_arguments( + *args: Any, **kwargs: Any + ) -> None: + entity = args[0] + entity.properties["mock_is_to_fail"] = False + return org(*args, **kwargs) + + event.failed_entity_handler.register_failed_upsert_call_arguments = MagicMock(side_effect=mock_register_failed_upsert_call_arguments) # type: ignore + raiesed_error_handle_failed = [] + org_event_handle_failed = event.failed_entity_handler.handle_failed + + async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: + try: + return await org_event_handle_failed(*args, **kwargs) + except Exception as e: + raiesed_error_handle_failed.append(e) + raise e + + event.failed_entity_handler.handle_failed = MagicMock(side_effect=lambda *args, **kwargs: handle_failed_wrapper(*args, **kwargs)) # type: ignore + event.failed_entity_handler.handle_failed_no_sort = MagicMock(side_effect=event.failed_entity_handler.handle_failed_no_sort) # type: ignore + + with patch( + "port_ocean.core.integrations.mixins.sync_raw.event_context", + lambda *args, **kwargs: no_op_event_context(event), + ): + with patch( + "port_ocean.utils.failed_entity_handler.order_by_entities_dependencies", + mock_order_by_entities_dependencies, + ): + + await mock_sync_raw_mixin.sync_raw_all( + trigger_type="machine", user_agent_type=UserAgentType.exporter + ) + + assert ( + len(event.failed_entity_handler._failed_entity_callback_list) == 5 + ), "Expected one failed entity callback due to retry logic" + assert ( + event.failed_entity_handler.register_failed_upsert_call_arguments.call_count + == 5 + ) + assert event.failed_entity_handler.handle_failed.call_count == 1 + assert len(raiesed_error_handle_failed) == 0 + assert mock_ocean.port_client.client.post.call_count == 10 # type: ignore + assert 
mock_order_by_entities_dependencies.call_count == 1 + + first = mock_ocean.port_client.client.post.call_args_list[0:5] # type: ignore + second = mock_ocean.port_client.client.post.call_args_list[5:10] # type: ignore + + assert "-".join( + [call[1].get("json").get("identifier") for call in first] + ) == "-".join( + reversed( + [ + entity.identifier + for entity in calc_result_mock.entity_selector_diff.passed + ] + ) + ) + assert "-".join( + [call[1].get("json").get("identifier") for call in second] + ) in ( + "entity_3-entity_4-entity_1-entity_2-entity_5", + "entity_3-entity_4-entity_1-entity_5-entity_2", + "entity_3-entity_1-entity_4-entity_2-entity_5", + "entity_3-entity_1-entity_4-entity_5-entity_2", + ) + assert event.failed_entity_handler.handle_failed_no_sort.call_count == 0 diff --git a/port_ocean/tests/utils/failed_entity_handler.py b/port_ocean/tests/utils/failed_entity_handler.py new file mode 100644 index 0000000000..dbd88b4d29 --- /dev/null +++ b/port_ocean/tests/utils/failed_entity_handler.py @@ -0,0 +1,121 @@ +from typing import Any +from port_ocean.core.models import Entity +from port_ocean.utils.failed_entity_handler import FailedEntityHandler +import pytest +from unittest.mock import MagicMock +from port_ocean.exceptions.core import ( + OceanAbortException, +) + + +def create_entity( + identifier: str, buleprint: str, dependencies: dict[str, str] = {} +) -> Entity: + entity = MagicMock() + entity.identifier = identifier + entity.blueprint = buleprint + entity.relations = dependencies or {} + return entity + + +processed_order: list[str] = [] + + +async def mock_activate(entity: Any, _: Any, __: Any, **kwargs: Any) -> bool: + processed_order.append(f"{entity.identifier}-{entity.blueprint}") + return True + + +@pytest.mark.asyncio +async def test_handle_failed_with_dependencies() -> None: + # processed_order:list[str] = [] + entity_a = create_entity( + "entity_a", + "buleprint_a", + ) # No dependencies + entity_b = create_entity( + "entity_b", 
"buleprint_a", {"dep_name_1": "entity_a"} + ) # Depends on entity_a + entity_c = create_entity( + "entity_c", "buleprint_b", {"dep_name_2": "entity_b"} + ) # Depends on entity_b + + failed_entities_handler = FailedEntityHandler() + # Register fails with unsorted order + failed_entities_handler.register_failed_upsert_call_arguments( + entity_c, MagicMock(), MagicMock(), mock_activate + ) + failed_entities_handler.register_failed_upsert_call_arguments( + entity_a, MagicMock(), MagicMock(), mock_activate + ) + failed_entities_handler.register_failed_upsert_call_arguments( + entity_b, MagicMock(), MagicMock(), mock_activate + ) + + await failed_entities_handler.handle_failed() + assert processed_order == [ + "entity_a-buleprint_a", + "entity_b-buleprint_a", + "entity_c-buleprint_b", + ], f"Processed order: {processed_order}" + + +@pytest.mark.asyncio +async def test_handle_failed_with_self_dependencies() -> None: + entity_a = create_entity( + "entity_a", "buleprint_a", {"dep_name_1": "entity_a"} + ) # Self dependency + entity_b = create_entity( + "entity_b", "buleprint_a", {"dep_name_1": "entity_a"} + ) # Depends on entity_a + entity_c = create_entity( + "entity_c", "buleprint_b", {"dep_name_2": "entity_b"} + ) # Depends on entity_b + + failed_entities_handler = FailedEntityHandler() + + # Register fails with unsorted order + failed_entities_handler.register_failed_upsert_call_arguments( + entity_c, MagicMock(), MagicMock(), mock_activate + ) + failed_entities_handler.register_failed_upsert_call_arguments( + entity_a, MagicMock(), MagicMock(), mock_activate + ) + failed_entities_handler.register_failed_upsert_call_arguments( + entity_b, MagicMock(), MagicMock(), mock_activate + ) + + await failed_entities_handler.handle_failed() + assert processed_order == [ + "entity_a-buleprint_a", + "entity_b-buleprint_a", + "entity_c-buleprint_b", + ], f"Processed order: {processed_order}" + + +@pytest.mark.asyncio +async def test_handle_failed_with_circular_dependencies() -> None: 
+ # processed_order:list[str] = [] + entity_a = create_entity( + "entity_a", "buleprint_a", {"dep_name_1": "entity_b"} + ) # Self dependency + entity_b = create_entity( + "entity_b", "buleprint_a", {"dep_name_1": "entity_a"} + ) # Depends on entity_a + + failed_entities_handler = FailedEntityHandler() + try: + failed_entities_handler.register_failed_upsert_call_arguments( + entity_a, MagicMock(), MagicMock(), mock_activate + ) + failed_entities_handler.register_failed_upsert_call_arguments( + entity_b, MagicMock(), MagicMock(), mock_activate + ) + await failed_entities_handler.handle_failed() + + except OceanAbortException as e: + assert isinstance(e, OceanAbortException) + assert ( + e.args[0] + == "Cannot order entities due to cyclic dependencies. \nIf you do want to have cyclic dependencies, please make sure to set the keys 'createMissingRelatedEntities' and 'deleteDependentEntities' in the integration config in Port." + ) diff --git a/port_ocean/tests/utils/integrations/mixins/test_sync_raw.py b/port_ocean/tests/utils/integrations/mixins/test_sync_raw.py deleted file mode 100644 index 01573a4dad..0000000000 --- a/port_ocean/tests/utils/integrations/mixins/test_sync_raw.py +++ /dev/null @@ -1,70 +0,0 @@ -import pytest -from unittest.mock import MagicMock -from port_ocean.context.event import event, event_context, EventType -from port_ocean.exceptions.core import ( - OceanAbortException, -) -from typing import Any - -def create_entity(identifier:str,buleprint:str, dependencies:dict[str,str]=None): - entity = MagicMock() - entity.identifier = identifier - entity.blueprint = buleprint - entity.relations = dependencies or {} - return entity - -async def mock_activate(processed_order:list[str],entity:MagicMock): - processed_order.append(f"{entity.identifier}-{entity.blueprint}") - return True - -@pytest.mark.asyncio -async def test_handle_failed_with_dependencies(): - processed_order:list[str] = [] - entity_a = create_entity("entity_a", "buleprint_a",) # No 
dependencies - entity_b = create_entity("entity_b", "buleprint_a", {"dep_name_1":"entity_a"}) # Depends on entity_a - entity_c = create_entity("entity_c", "buleprint_b", {"dep_name_2":"entity_b"}) # Depends on entity_b - - - async with event_context(EventType.RESYNC, "manual"): - # Register fails with unsorted order - event.register_failed_upsert_call_arguments(entity_c, lambda: mock_activate(processed_order,entity_c)) - event.register_failed_upsert_call_arguments(entity_a, lambda: mock_activate(processed_order,entity_a)) - event.register_failed_upsert_call_arguments(entity_b, lambda: mock_activate(processed_order,entity_b)) - - await event.handle_failed() - - assert processed_order == ["entity_a-buleprint_a", "entity_b-buleprint_a", "entity_c-buleprint_b"], f"Processed order: {processed_order}" - -@pytest.mark.asyncio -async def test_handle_failed_with_self_dependencies(): - processed_order:list[str] = [] - entity_a = create_entity("entity_a", "buleprint_a",{"dep_name_1":"entity_a"}) # Self dependency - entity_b = create_entity("entity_b", "buleprint_a", {"dep_name_1":"entity_a"}) # Depends on entity_a - entity_c = create_entity("entity_c", "buleprint_b", {"dep_name_2":"entity_b"}) # Depends on entity_b - - - async with event_context(EventType.RESYNC, "manual"): - # Register fails with unsorted order - event.register_failed_upsert_call_arguments(entity_c, lambda: mock_activate(processed_order,entity_c)) - event.register_failed_upsert_call_arguments(entity_a, lambda: mock_activate(processed_order,entity_a)) - event.register_failed_upsert_call_arguments(entity_b, lambda: mock_activate(processed_order,entity_b)) - - await event.handle_failed() - - assert processed_order == ["entity_a-buleprint_a", "entity_b-buleprint_a", "entity_c-buleprint_b"], f"Processed order: {processed_order}" - -@pytest.mark.asyncio -async def test_handle_failed_with_circular_dependencies(): - processed_order:list[str] = [] - entity_a = create_entity("entity_a", 
"buleprint_a",{"dep_name_1":"entity_b"}) # Self dependency - entity_b = create_entity("entity_b", "buleprint_a", {"dep_name_1":"entity_a"}) # Depends on entity_a - - try: - async with event_context(EventType.RESYNC, "manual"): - # Register fails with unsorted order - event.register_failed_upsert_call_arguments(entity_a, lambda: mock_activate(processed_order,entity_a)) - event.register_failed_upsert_call_arguments(entity_b, lambda: mock_activate(processed_order,entity_b)) - await event.handle_failed() - except OceanAbortException as e: - assert isinstance(e,OceanAbortException) - assert e.args[0] == "Cannot order entities due to cyclic dependencies. \nIf you do want to have cyclic dependencies, please make sure to set the keys 'createMissingRelatedEntities' and 'deleteDependentEntities' in the integration config in Port." diff --git a/port_ocean/utils/failed_entity_handler.py b/port_ocean/utils/failed_entity_handler.py new file mode 100644 index 0000000000..3f6c24a6b9 --- /dev/null +++ b/port_ocean/utils/failed_entity_handler.py @@ -0,0 +1,67 @@ +from port_ocean.clients.port.types import UserAgentType +from port_ocean.core.models import Entity +from port_ocean.clients.port.types import RequestOptions +from port_ocean.core.handlers.entities_state_applier.port.order_by_entities_dependencies import ( + order_by_entities_dependencies, +) +from typing import ( + Any, + Callable, + Tuple, +) +from dataclasses import dataclass, field +from loguru import logger + + +@dataclass +class FailedEntityHandler: + _failed_entity_callback_list: list[ + Tuple[ + Entity, + Tuple[ + Tuple[Entity, RequestOptions, UserAgentType], + Callable[[Entity, RequestOptions, UserAgentType], Any], + ], + ] + ] = field(default_factory=list) + + def register_failed_upsert_call_arguments( + self, + entity: Entity, + get_port_request_options: RequestOptions, + user_agent_type: UserAgentType, + func: Callable[[Entity, RequestOptions, UserAgentType], Any], + ) -> None: + logger.debug( + f"Will retry 
upserting entity - {entity.identifier} at the end of resync" + ) + self._failed_entity_callback_list.append( + (entity, ((entity, get_port_request_options, user_agent_type), func)) + ) + + async def handle_failed(self) -> None: + entity_map: dict[ + str, + Tuple[ + Tuple[Entity, RequestOptions, UserAgentType], + Callable[[Entity, RequestOptions, UserAgentType], Any], + ], + ] = { + f"{obj.identifier}-{obj.blueprint}": call + for obj, call in self._failed_entity_callback_list + } + entity_list: list[Entity] = [ + obj for obj, call in self._failed_entity_callback_list + ] + + sorted_and_mapped = order_by_entities_dependencies(entity_list) + for obj in sorted_and_mapped: + call = entity_map.get(f"{obj.identifier}-{obj.blueprint}") + if call is not None: + args, func = call + await func(*args, **{"should_raise": False}) + + async def handle_failed_no_sort(self) -> None: + for obj, call in self._failed_entity_callback_list: + args, func = call + await func(*args, **{"should_raise": False}) From 209e64bc33175b5555d02ef20990a77317ac190b Mon Sep 17 00:00:00 2001 From: Ivan Kalinovski Date: Sun, 22 Dec 2024 10:42:55 +0200 Subject: [PATCH 03/11] [Core] Add retry for failed upsertes and handle circular dependencies 1. Extrac logic of topological sort into a class. 2. Modify tests. 
--- port_ocean/clients/port/mixins/entities.py | 9 +- port_ocean/context/event.py | 4 +- .../entities_state_applier/port/applier.py | 7 +- .../core/integrations/mixins/sync_raw.py | 12 +- port_ocean/core/models.py | 4 + .../{test_sync_raw_v2.py => test_sync_raw.py} | 103 +++++++----------- ...r.py => test_entity_topological_sorter.py} | 76 +++++-------- port_ocean/utils/entity_topological_sorter.py | 37 +++++++ port_ocean/utils/failed_entity_handler.py | 67 ------------ 9 files changed, 123 insertions(+), 196 deletions(-) rename port_ocean/tests/core/handlers/mixins/{test_sync_raw_v2.py => test_sync_raw.py} (76%) rename port_ocean/tests/utils/{failed_entity_handler.py => test_entity_topological_sorter.py} (53%) create mode 100644 port_ocean/utils/entity_topological_sorter.py delete mode 100644 port_ocean/utils/failed_entity_handler.py diff --git a/port_ocean/clients/port/mixins/entities.py b/port_ocean/clients/port/mixins/entities.py index 60cc2e26ca..c22601a95e 100644 --- a/port_ocean/clients/port/mixins/entities.py +++ b/port_ocean/clients/port/mixins/entities.py @@ -11,7 +11,8 @@ handle_status_code, PORT_HTTP_MAX_CONNECTIONS_LIMIT, ) -from port_ocean.core.models import Entity +from port_ocean.core.models import Entity, PortApiStatus +from starlette import status class EntityClientMixin: @@ -50,7 +51,6 @@ async def upsert_entity( }, extensions={"retryable": True}, ) - if response.is_error: logger.error( f"Error {'Validating' if validation_only else 'Upserting'} " @@ -58,10 +58,11 @@ async def upsert_entity( f"blueprint: {entity.blueprint}" ) result = response.json() + if ( - response.status_code == 404 + response.status_code == status.HTTP_404_NOT_FOUND and not result.get("ok") - and result.get("error") == "not_found" + and result.get("error") == PortApiStatus.NOT_FOUND.value ): return False handle_status_code(response, should_raise) diff --git a/port_ocean/context/event.py b/port_ocean/context/event.py index 94cc39e088..ecbdd8b6c3 100644 --- 
a/port_ocean/context/event.py +++ b/port_ocean/context/event.py @@ -14,7 +14,7 @@ from uuid import uuid4 from loguru import logger -from port_ocean.utils.failed_entity_handler import FailedEntityHandler +from port_ocean.utils.entity_topological_sorter import EntityTopologicalSorter from pydispatch import dispatcher # type: ignore from werkzeug.local import LocalStack, LocalProxy @@ -53,7 +53,7 @@ class EventContext: _event_id: str = field(default_factory=lambda: str(uuid4())) _on_abort_callbacks: list[AbortCallbackFunction] = field(default_factory=list) - failed_entity_handler = FailedEntityHandler() + entity_topological_sorter = EntityTopologicalSorter() def on_abort(self, func: AbortCallbackFunction) -> None: self._on_abort_callbacks.append(func) diff --git a/port_ocean/core/handlers/entities_state_applier/port/applier.py b/port_ocean/core/handlers/entities_state_applier/port/applier.py index d7dd022e5b..aa0c4324c2 100644 --- a/port_ocean/core/handlers/entities_state_applier/port/applier.py +++ b/port_ocean/core/handlers/entities_state_applier/port/applier.py @@ -127,12 +127,7 @@ async def upsert( if upsertedEntity: modified_entities.append(upsertedEntity) if upsertedEntity is False: - event.failed_entity_handler.register_failed_upsert_call_arguments( - entity, - event.port_app_config.get_port_request_options(), - user_agent_type, - self.context.port_client.upsert_entity, - ) + event.entity_topological_sorter.register_entity(entity) return modified_entities async def delete( diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index f9819f6cf9..8d8158a4d9 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -1,4 +1,5 @@ import asyncio +from graphlib import CycleError import inspect import typing from typing import Callable, Awaitable, Any @@ -457,9 +458,14 @@ async def sync_raw_all( creation_results.append(await task) try: - await 
event.failed_entity_handler.handle_failed() - except: - await event.failed_entity_handler.handle_failed_no_sort() + for entity in event.entity_topological_sorter.get_entities(): + await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False) + + except OceanAbortException as ocean_abort: + if isinstance(ocean_abort.__cause__,CycleError): + for entity in event.entity_topological_sorter.entities: + await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False) + except asyncio.CancelledError as e: logger.warning("Resync aborted successfully, skipping delete phase. This leads to an incomplete state") raise diff --git a/port_ocean/core/models.py b/port_ocean/core/models.py index fb588becda..a22cc208c0 100644 --- a/port_ocean/core/models.py +++ b/port_ocean/core/models.py @@ -11,6 +11,10 @@ class Runtime(Enum): OnPrem = "OnPrem" +class PortApiStatus(Enum): + NOT_FOUND = "not_found" + + class Entity(BaseModel): identifier: Any blueprint: Any diff --git a/port_ocean/tests/core/handlers/mixins/test_sync_raw_v2.py b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py similarity index 76% rename from port_ocean/tests/core/handlers/mixins/test_sync_raw_v2.py rename to port_ocean/tests/core/handlers/mixins/test_sync_raw.py index 3b15308fb8..353aaa4de9 100644 --- a/port_ocean/tests/core/handlers/mixins/test_sync_raw_v2.py +++ b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py @@ -173,6 +173,15 @@ async def no_op_event_context( yield existing_event +def create_entity( + id: str, blueprint: str, relation: dict[str, str], is_to_fail: bool +) -> Entity: + entity = Entity(identifier=id, blueprint=blueprint) + entity.relations = relation + entity.properties = {"mock_is_to_fail": is_to_fail} + return entity + + @pytest.mark.asyncio async def test_sync_raw_mixin_self_dependency( 
mock_sync_raw_mixin: SyncRawMixin, @@ -199,16 +208,15 @@ async def test_sync_raw_mixin_self_dependency( ) ) event.port_app_config = app_config - event.failed_entity_handler.register_failed_upsert_call_arguments = MagicMock(side_effect=event.failed_entity_handler.register_failed_upsert_call_arguments) # type: ignore - event.failed_entity_handler.handle_failed = MagicMock(side_effect=event.failed_entity_handler.handle_failed) # type: ignore - event.failed_entity_handler.handle_failed_no_sort = MagicMock(side_effect=event.failed_entity_handler.handle_failed_no_sort) # type: ignore + event.entity_topological_sorter.register_entity = MagicMock(side_effect=event.entity_topological_sorter.register_entity) # type: ignore + event.entity_topological_sorter.get_entities = MagicMock(side_effect=event.entity_topological_sorter.get_entities) # type: ignore with patch( "port_ocean.core.integrations.mixins.sync_raw.event_context", lambda *args, **kwargs: no_op_event_context(event), ): with patch( - "port_ocean.utils.failed_entity_handler.order_by_entities_dependencies", + "port_ocean.utils.entity_topological_sorter.order_by_entities_dependencies", mock_order_by_entities_dependencies, ): @@ -217,13 +225,10 @@ async def test_sync_raw_mixin_self_dependency( ) assert ( - len(event.failed_entity_handler._failed_entity_callback_list) == 1 + len(event.entity_topological_sorter.entities) == 1 ), "Expected one failed entity callback due to retry logic" - assert ( - event.failed_entity_handler.register_failed_upsert_call_arguments.call_count - == 1 - ) - assert event.failed_entity_handler.handle_failed.call_count == 1 + assert event.entity_topological_sorter.register_entity.call_count == 1 + assert event.entity_topological_sorter.get_entities.call_count == 1 assert mock_order_by_entities_dependencies.call_count == 1 assert [ @@ -231,8 +236,6 @@ async def test_sync_raw_mixin_self_dependency( for call in mock_order_by_entities_dependencies.call_args_list ] == [entity for entity in entities if 
entity.identifier == "entity_1"] - assert event.failed_entity_handler.handle_failed_no_sort.call_count == 0 - @pytest.mark.asyncio async def test_sync_raw_mixin_circular_dependency( @@ -260,35 +263,32 @@ async def test_sync_raw_mixin_circular_dependency( ) ) event.port_app_config = app_config - org = event.failed_entity_handler.register_failed_upsert_call_arguments + org = event.entity_topological_sorter.register_entity - def mock_register_failed_upsert_call_arguments( - *args: Any, **kwargs: Any - ) -> Any: + def mock_register_entity(*args: Any, **kwargs: Any) -> Any: entity = args[0] entity.properties["mock_is_to_fail"] = False return org(*args, **kwargs) - event.failed_entity_handler.register_failed_upsert_call_arguments = MagicMock(side_effect=mock_register_failed_upsert_call_arguments) # type: ignore + event.entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore raiesed_error_handle_failed = [] - org_event_handle_failed = event.failed_entity_handler.handle_failed + org_get_entities = event.entity_topological_sorter.get_entities - async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: + def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: try: - return await org_event_handle_failed(*args, **kwargs) + return list(org_get_entities(*args, **kwargs)) except Exception as e: raiesed_error_handle_failed.append(e) raise e - event.failed_entity_handler.handle_failed = MagicMock(side_effect=lambda *args, **kwargs: handle_failed_wrapper(*args, **kwargs)) # type: ignore - event.failed_entity_handler.handle_failed_no_sort = MagicMock(side_effect=event.failed_entity_handler.handle_failed_no_sort) # type: ignore + event.entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: handle_failed_wrapper(*args, **kwargs)) # type: ignore with patch( "port_ocean.core.integrations.mixins.sync_raw.event_context", lambda *args, **kwargs: no_op_event_context(event), ): with patch( - 
"port_ocean.utils.failed_entity_handler.order_by_entities_dependencies", + "port_ocean.utils.entity_topological_sorter.order_by_entities_dependencies", mock_order_by_entities_dependencies, ): @@ -297,17 +297,13 @@ async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: ) assert ( - len(event.failed_entity_handler._failed_entity_callback_list) == 2 + len(event.entity_topological_sorter.entities) == 2 ), "Expected one failed entity callback due to retry logic" - assert ( - event.failed_entity_handler.register_failed_upsert_call_arguments.call_count - == 2 - ) - assert event.failed_entity_handler.handle_failed.call_count == 1 + assert event.entity_topological_sorter.register_entity.call_count == 2 + assert event.entity_topological_sorter.get_entities.call_count == 1 assert len(raiesed_error_handle_failed) == 1 assert isinstance(raiesed_error_handle_failed[0], OceanAbortException) assert isinstance(raiesed_error_handle_failed[0].__cause__, CycleError) - assert event.failed_entity_handler.handle_failed_no_sort.call_count == 1 assert ( len(mock_ocean.port_client.client.post.call_args_list) # type: ignore / len(entities) @@ -315,15 +311,6 @@ async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: ) -def create_entity( - id: str, blueprint: str, relation: dict[str, str], is_to_fail: bool -) -> Entity: - entity = Entity(identifier=id, blueprint=blueprint) - entity.relations = relation - entity.properties = {"mock_is_to_fail": is_to_fail} - return entity - - @pytest.mark.asyncio async def test_sync_raw_mixin_dependency( mock_sync_raw_mixin: SyncRawMixin, mock_ocean: Ocean @@ -353,35 +340,32 @@ async def test_sync_raw_mixin_dependency( ) ) event.port_app_config = app_config - org = event.failed_entity_handler.register_failed_upsert_call_arguments + org = event.entity_topological_sorter.register_entity - def mock_register_failed_upsert_call_arguments( - *args: Any, **kwargs: Any - ) -> None: + def mock_register_entity(*args: Any, **kwargs: Any) -> None: 
entity = args[0] entity.properties["mock_is_to_fail"] = False return org(*args, **kwargs) - event.failed_entity_handler.register_failed_upsert_call_arguments = MagicMock(side_effect=mock_register_failed_upsert_call_arguments) # type: ignore + event.entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore raiesed_error_handle_failed = [] - org_event_handle_failed = event.failed_entity_handler.handle_failed + org_event_get_entities = event.entity_topological_sorter.get_entities - async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: + def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any: try: - return await org_event_handle_failed(*args, **kwargs) + return org_event_get_entities(*args, **kwargs) except Exception as e: raiesed_error_handle_failed.append(e) raise e - event.failed_entity_handler.handle_failed = MagicMock(side_effect=lambda *args, **kwargs: handle_failed_wrapper(*args, **kwargs)) # type: ignore - event.failed_entity_handler.handle_failed_no_sort = MagicMock(side_effect=event.failed_entity_handler.handle_failed_no_sort) # type: ignore + event.entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: get_entities_wrapper(*args, **kwargs)) # type: ignore with patch( "port_ocean.core.integrations.mixins.sync_raw.event_context", lambda *args, **kwargs: no_op_event_context(event), ): with patch( - "port_ocean.utils.failed_entity_handler.order_by_entities_dependencies", + "port_ocean.utils.entity_topological_sorter.order_by_entities_dependencies", mock_order_by_entities_dependencies, ): @@ -390,13 +374,10 @@ async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: ) assert ( - len(event.failed_entity_handler._failed_entity_callback_list) == 5 + len(event.entity_topological_sorter.entities) == 5 ), "Expected one failed entity callback due to retry logic" - assert ( - event.failed_entity_handler.register_failed_upsert_call_arguments.call_count - == 5 - ) - 
assert event.failed_entity_handler.handle_failed.call_count == 1 + assert event.entity_topological_sorter.register_entity.call_count == 5 + assert event.entity_topological_sorter.get_entities.call_count == 1 assert len(raiesed_error_handle_failed) == 0 assert mock_ocean.port_client.client.post.call_count == 10 # type: ignore assert mock_order_by_entities_dependencies.call_count == 1 @@ -406,14 +387,7 @@ async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: assert "-".join( [call[1].get("json").get("identifier") for call in first] - ) == "-".join( - reversed( - [ - entity.identifier - for entity in calc_result_mock.entity_selector_diff.passed - ] - ) - ) + ) == "-".join(reversed([entity.identifier for entity in entities])) assert "-".join( [call[1].get("json").get("identifier") for call in second] ) in ( @@ -422,4 +396,3 @@ async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: "entity_3-entity_1-entity_4-entity_2-entity_5", "entity_3-entity_1-entity_4-entity_5-entity_2", ) - assert event.failed_entity_handler.handle_failed_no_sort.call_count == 0 diff --git a/port_ocean/tests/utils/failed_entity_handler.py b/port_ocean/tests/utils/test_entity_topological_sorter.py similarity index 53% rename from port_ocean/tests/utils/failed_entity_handler.py rename to port_ocean/tests/utils/test_entity_topological_sorter.py index dbd88b4d29..69be7a048c 100644 --- a/port_ocean/tests/utils/failed_entity_handler.py +++ b/port_ocean/tests/utils/test_entity_topological_sorter.py @@ -1,7 +1,5 @@ -from typing import Any from port_ocean.core.models import Entity -from port_ocean.utils.failed_entity_handler import FailedEntityHandler -import pytest +from port_ocean.utils.entity_topological_sorter import EntityTopologicalSorter from unittest.mock import MagicMock from port_ocean.exceptions.core import ( OceanAbortException, @@ -18,16 +16,7 @@ def create_entity( return entity -processed_order: list[str] = [] - - -async def mock_activate(entity: Any, _: Any, __: Any, 
**kwargs: Any) -> bool: - processed_order.append(f"{entity.identifier}-{entity.blueprint}") - return True - - -@pytest.mark.asyncio -async def test_handle_failed_with_dependencies() -> None: +def test_handle_failed_with_dependencies() -> None: # processed_order:list[str] = [] entity_a = create_entity( "entity_a", @@ -40,19 +29,16 @@ async def test_handle_failed_with_dependencies() -> None: "entity_c", "buleprint_b", {"dep_name_2": "entity_b"} ) # Depends on entity_b - failed_entities_handler = FailedEntityHandler() + entity_topological_sort = EntityTopologicalSorter() # Register fails with unsorted order - failed_entities_handler.register_failed_upsert_call_arguments( - entity_c, MagicMock(), MagicMock(), mock_activate - ) - failed_entities_handler.register_failed_upsert_call_arguments( - entity_a, MagicMock(), MagicMock(), mock_activate - ) - failed_entities_handler.register_failed_upsert_call_arguments( - entity_b, MagicMock(), MagicMock(), mock_activate - ) - - await failed_entities_handler.handle_failed() + entity_topological_sort.register_entity(entity_c) + entity_topological_sort.register_entity(entity_a) + entity_topological_sort.register_entity(entity_b) + + processed_order = [ + f"{entity.identifier}-{entity.blueprint}" + for entity in list(entity_topological_sort.get_entities()) + ] assert processed_order == [ "entity_a-buleprint_a", "entity_b-buleprint_a", @@ -60,8 +46,7 @@ async def test_handle_failed_with_dependencies() -> None: ], f"Processed order: {processed_order}" -@pytest.mark.asyncio -async def test_handle_failed_with_self_dependencies() -> None: +def test_handle_failed_with_self_dependencies() -> None: entity_a = create_entity( "entity_a", "buleprint_a", {"dep_name_1": "entity_a"} ) # Self dependency @@ -72,20 +57,18 @@ async def test_handle_failed_with_self_dependencies() -> None: "entity_c", "buleprint_b", {"dep_name_2": "entity_b"} ) # Depends on entity_b - failed_entities_handler = FailedEntityHandler() + entity_topological_sort = 
EntityTopologicalSorter() # Register fails with unsorted order - failed_entities_handler.register_failed_upsert_call_arguments( - entity_c, MagicMock(), MagicMock(), mock_activate - ) - failed_entities_handler.register_failed_upsert_call_arguments( - entity_a, MagicMock(), MagicMock(), mock_activate - ) - failed_entities_handler.register_failed_upsert_call_arguments( - entity_b, MagicMock(), MagicMock(), mock_activate - ) - - await failed_entities_handler.handle_failed() + entity_topological_sort.register_entity(entity_c) + entity_topological_sort.register_entity(entity_a) + entity_topological_sort.register_entity(entity_b) + + processed_order = [ + f"{entity.identifier}-{entity.blueprint}" + for entity in list(entity_topological_sort.get_entities()) + ] + assert processed_order == [ "entity_a-buleprint_a", "entity_b-buleprint_a", @@ -93,8 +76,7 @@ async def test_handle_failed_with_self_dependencies() -> None: ], f"Processed order: {processed_order}" -@pytest.mark.asyncio -async def test_handle_failed_with_circular_dependencies() -> None: +def test_handle_failed_with_circular_dependencies() -> None: # processed_order:list[str] = [] entity_a = create_entity( "entity_a", "buleprint_a", {"dep_name_1": "entity_b"} @@ -103,15 +85,11 @@ async def test_handle_failed_with_circular_dependencies() -> None: "entity_b", "buleprint_a", {"dep_name_1": "entity_a"} ) # Depends on entity_a - failed_entities_handler = FailedEntityHandler() + entity_topological_sort = EntityTopologicalSorter() try: - failed_entities_handler.register_failed_upsert_call_arguments( - entity_a, MagicMock(), MagicMock(), mock_activate - ) - failed_entities_handler.register_failed_upsert_call_arguments( - entity_b, MagicMock(), MagicMock(), mock_activate - ) - await failed_entities_handler.handle_failed() + entity_topological_sort.register_entity(entity_a) + entity_topological_sort.register_entity(entity_b) + entity_topological_sort.get_entities() except OceanAbortException as e: assert isinstance(e, 
OceanAbortException) diff --git a/port_ocean/utils/entity_topological_sorter.py b/port_ocean/utils/entity_topological_sorter.py new file mode 100644 index 0000000000..b64e0f24d7 --- /dev/null +++ b/port_ocean/utils/entity_topological_sorter.py @@ -0,0 +1,37 @@ +from typing import Any, Generator +from port_ocean.core.models import Entity +from port_ocean.core.handlers.entities_state_applier.port.order_by_entities_dependencies import ( + order_by_entities_dependencies, +) + +from dataclasses import dataclass, field +from loguru import logger + + +@dataclass +class EntityTopologicalSorter: + entities: list[Entity] = field(default_factory=list) + + def register_entity( + self, + entity: Entity, + ) -> None: + logger.debug( + f"Will retry upserting entity - {entity.identifier} at the end of resync" + ) + self.entities.append(entity) + + @staticmethod + def order_by_entities_dependencies(entities: list[Entity]) -> list[Entity]: + return order_by_entities_dependencies(entities) + + def get_entities(self) -> Generator[Entity, Any, None]: + entity_map: dict[str, Entity] = { + f"{entity.identifier}-{entity.blueprint}": entity + for entity in self.entities + } + sorted_and_mapped = order_by_entities_dependencies(self.entities) + for obj in sorted_and_mapped: + entity = entity_map.get(f"{obj.identifier}-{obj.blueprint}") + if entity is not None: + yield entity diff --git a/port_ocean/utils/failed_entity_handler.py b/port_ocean/utils/failed_entity_handler.py deleted file mode 100644 index 3f6c24a6b9..0000000000 --- a/port_ocean/utils/failed_entity_handler.py +++ /dev/null @@ -1,67 +0,0 @@ -from port_ocean.clients.port.types import UserAgentType -from port_ocean.core.models import Entity -from port_ocean.clients.port.types import RequestOptions -from port_ocean.core.handlers.entities_state_applier.port.order_by_entities_dependencies import ( - order_by_entities_dependencies, -) -from typing import ( - Any, - Callable, - Tuple, -) -from dataclasses import dataclass, field -from 
loguru import logger - - -@dataclass -class FailedEntityHandler: - _failed_entity_callback_list: list[ - Tuple[ - Entity, - Tuple[ - Tuple[Entity, RequestOptions, UserAgentType], - Callable[[Entity, RequestOptions, UserAgentType], Any], - ], - ] - ] = field(default_factory=list) - - def register_failed_upsert_call_arguments( - self, - entity: Entity, - get_port_request_options: RequestOptions, - user_agent_type: UserAgentType, - func: Callable[[Entity, RequestOptions, UserAgentType], Any], - ) -> None: - logger.debug( - f"Will retry upserting entity - {entity.identifier} at the end of resync" - ) - self._failed_entity_callback_list.append( - (entity, ((entity, get_port_request_options, user_agent_type), func)) - ) - - async def handle_failed(self) -> None: - entity_map: dict[ - str, - Tuple[ - Tuple[Entity, RequestOptions, UserAgentType], - Callable[[Entity, RequestOptions, UserAgentType], Any], - ], - ] = { - f"{obj.identifier}-{obj.blueprint}": call - for obj, call in self._failed_entity_callback_list - } - entity_list: list[Entity] = [ - obj for obj, call in self._failed_entity_callback_list - ] - - sorted_and_mapped = order_by_entities_dependencies(entity_list) - for obj in sorted_and_mapped: - call = entity_map.get(f"{obj.identifier}-{obj.blueprint}") - if call is not None: - args, func = call - await func(*args, **{"should_raise": False}) - - async def handle_failed_no_sort(self) -> None: - for obj, call in self._failed_entity_callback_list: - args, func = call - await func(*args, **{"should_raise": False}) From 3d49fa738ea0e92b3711bf31f7e41c1ab9278230 Mon Sep 17 00:00:00 2001 From: Ivan Kalinovski Date: Sun, 22 Dec 2024 17:06:03 +0200 Subject: [PATCH 04/11] [Core] Add retry for failed upserts and handle circular dependencies 1. Change location of files. 2. Exctract logic of handle failed into a function. 3. Update get_entities. 
--- CHANGELOG.md | 9 +- port_ocean/clients/port/mixins/entities.py | 10 +- port_ocean/context/event.py | 2 +- port_ocean/core/defaults/initialize.py | 2 +- .../entities_state_applier/port/applier.py | 12 ++- .../entity_processor/jq_entity_processor.py | 5 +- .../core/integrations/mixins/sync_raw.py | 28 ++++-- port_ocean/core/integrations/mixins/utils.py | 2 +- port_ocean/core/models.py | 2 +- .../core/utils/entity_topological_sorter.py | 93 +++++++++++++++++++ port_ocean/core/{ => utils}/utils.py | 0 port_ocean/run.py | 2 +- .../core/handlers/mixins/test_sync_raw.py | 22 +++-- .../utils/test_entity_topological_sorter.py | 2 +- port_ocean/utils/entity_topological_sorter.py | 37 -------- 15 files changed, 154 insertions(+), 74 deletions(-) create mode 100644 port_ocean/core/utils/entity_topological_sorter.py rename port_ocean/core/{ => utils}/utils.py (100%) rename port_ocean/tests/{ => core}/utils/test_entity_topological_sorter.py (97%) delete mode 100644 port_ocean/utils/entity_topological_sorter.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 8b959b6342..4699d523b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,13 +10,16 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm ## 0.15.3 (2024-12-15) +### Improvements + +- When `createMissingRelatedEntities` is set to `false` and upserting entity failed on not existing entity, the entity will be gathered to the end of the resync and will try sorting all + the failed entities through a topological sort and upsert them as well +- Test upsert with dependencies, with self circular dependency and external entity dependency. + ### Bug Fixes - When experiencing cyclic error on topological sort try unsorted upsert of the entities -- Test upsert with dependencies, with self circular dependency and external entity dependency. 
- Fix topologicals sort tree creation so an entity cannot be its own dependency -- When `createMissingRelatedEntities` is set to `false` and upserting entity failed on not existing entity, the entity will be gathered to the end of the resync and will try sorting all - the failed entities through a topological sort and upsert them as well ## 0.14.7 (2024-12-09) diff --git a/port_ocean/clients/port/mixins/entities.py b/port_ocean/clients/port/mixins/entities.py index c22601a95e..99aae40f53 100644 --- a/port_ocean/clients/port/mixins/entities.py +++ b/port_ocean/clients/port/mixins/entities.py @@ -11,7 +11,7 @@ handle_status_code, PORT_HTTP_MAX_CONNECTIONS_LIMIT, ) -from port_ocean.core.models import Entity, PortApiStatus +from port_ocean.core.models import Entity, PortAPIErrorMessage from starlette import status @@ -31,6 +31,11 @@ async def upsert_entity( user_agent_type: UserAgentType | None = None, should_raise: bool = True, ) -> Entity | None | Literal[False]: + """ + [Entity] will be returned on happy flow + [None] will be returned if entity is using search identifier + [False] will be returned if upsert failed because of unmet dependency + """ validation_only = request_options["validation_only"] async with self.semaphore: logger.debug( @@ -62,8 +67,9 @@ async def upsert_entity( if ( response.status_code == status.HTTP_404_NOT_FOUND and not result.get("ok") - and result.get("error") == PortApiStatus.NOT_FOUND.value + and result.get("error") == PortAPIErrorMessage.NOT_FOUND.value ): + # Return false to differentiate from `result_entity.is_using_search_identifier` return False handle_status_code(response, should_raise) result = response.json() diff --git a/port_ocean/context/event.py b/port_ocean/context/event.py index ecbdd8b6c3..fcbe7e6d0f 100644 --- a/port_ocean/context/event.py +++ b/port_ocean/context/event.py @@ -14,7 +14,7 @@ from uuid import uuid4 from loguru import logger -from port_ocean.utils.entity_topological_sorter import EntityTopologicalSorter +from 
port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter from pydispatch import dispatcher # type: ignore from werkzeug.local import LocalStack, LocalProxy diff --git a/port_ocean/core/defaults/initialize.py b/port_ocean/core/defaults/initialize.py index c6d8d88ed1..1ae1a49cee 100644 --- a/port_ocean/core/defaults/initialize.py +++ b/port_ocean/core/defaults/initialize.py @@ -14,7 +14,7 @@ ) from port_ocean.core.handlers.port_app_config.models import PortAppConfig from port_ocean.core.models import Blueprint -from port_ocean.core.utils import gather_and_split_errors_from_results +from port_ocean.core.utils.utils import gather_and_split_errors_from_results from port_ocean.exceptions.port_defaults import ( AbortDefaultCreationError, ) diff --git a/port_ocean/core/handlers/entities_state_applier/port/applier.py b/port_ocean/core/handlers/entities_state_applier/port/applier.py index aa0c4324c2..291c3f1099 100644 --- a/port_ocean/core/handlers/entities_state_applier/port/applier.py +++ b/port_ocean/core/handlers/entities_state_applier/port/applier.py @@ -8,12 +8,11 @@ from port_ocean.core.handlers.entities_state_applier.port.get_related_entities import ( get_related_entities, ) -from port_ocean.core.handlers.entities_state_applier.port.order_by_entities_dependencies import ( - order_by_entities_dependencies, -) + from port_ocean.core.models import Entity from port_ocean.core.ocean_types import EntityDiff -from port_ocean.core.utils import is_same_entity, get_port_diff +from port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter +from port_ocean.core.utils.utils import is_same_entity, get_port_diff class HttpEntitiesStateApplier(BaseEntitiesStateApplier): @@ -126,6 +125,7 @@ async def upsert( ) if upsertedEntity: modified_entities.append(upsertedEntity) + # condition to false to differentiate from `result_entity.is_using_search_identifier` if upsertedEntity is False: event.entity_topological_sorter.register_entity(entity) 
return modified_entities @@ -142,7 +142,9 @@ async def delete( should_raise=False, ) else: - ordered_deleted_entities = order_by_entities_dependencies(entities) + ordered_deleted_entities = ( + EntityTopologicalSorter.order_by_entities_dependencies(entities) + ) for entity in ordered_deleted_entities: await self.context.port_client.delete_entity( diff --git a/port_ocean/core/handlers/entity_processor/jq_entity_processor.py b/port_ocean/core/handlers/entity_processor/jq_entity_processor.py index 17ec9c49b6..3384e32413 100644 --- a/port_ocean/core/handlers/entity_processor/jq_entity_processor.py +++ b/port_ocean/core/handlers/entity_processor/jq_entity_processor.py @@ -16,7 +16,10 @@ EntitySelectorDiff, CalculationResult, ) -from port_ocean.core.utils import gather_and_split_errors_from_results, zip_and_sum +from port_ocean.core.utils.utils import ( + gather_and_split_errors_from_results, + zip_and_sum, +) from port_ocean.exceptions.core import EntityProcessorException from port_ocean.utils.queue_utils import process_in_queue diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index 8d8158a4d9..3ce1bb19d2 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -28,9 +28,9 @@ RAW_ITEM, CalculationResult, ) -from port_ocean.core.utils import zip_and_sum, gather_and_split_errors_from_results +from port_ocean.core.utils.utils import zip_and_sum, gather_and_split_errors_from_results from port_ocean.exceptions.core import OceanAbortException -import json + SEND_RAW_DATA_EXAMPLES_AMOUNT = 5 @@ -397,7 +397,22 @@ async def update_raw_diff( {"before": entities_before_flatten, "after": entities_after_flatten}, user_agent_type, ) + async def sort_and_upsert_failed_entities(self,user_agent_type: UserAgentType)->None: + try: + if event.entity_topological_sorter.is_to_execute(): + logger.info("Executings topological sort of entities failed to upsert") + else: + 
logger.info("No failed entities on upsert") + return + for entity in event.entity_topological_sorter.get_entities(): + await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False) + + except OceanAbortException as ocean_abort: + logger.info("Failed topological sort of failed to upsert entites - trying to upsert unordered") + if isinstance(ocean_abort.__cause__,CycleError): + for entity in event.entity_topological_sorter.get_entities(False): + await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False) async def sync_raw_all( self, _: dict[Any, Any] | None = None, @@ -457,15 +472,8 @@ async def sync_raw_all( event.on_abort(lambda: task.cancel()) creation_results.append(await task) - try: - for entity in event.entity_topological_sorter.get_entities(): - await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False) - - except OceanAbortException as ocean_abort: - if isinstance(ocean_abort.__cause__,CycleError): - for entity in event.entity_topological_sorter.entities: - await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False) + await self.sort_and_upsert_failed_entities(user_agent_type) except asyncio.CancelledError as e: logger.warning("Resync aborted successfully, skipping delete phase. 
This leads to an incomplete state") raise diff --git a/port_ocean/core/integrations/mixins/utils.py b/port_ocean/core/integrations/mixins/utils.py index 91a0623956..8819481cae 100644 --- a/port_ocean/core/integrations/mixins/utils.py +++ b/port_ocean/core/integrations/mixins/utils.py @@ -9,7 +9,7 @@ RESYNC_EVENT_LISTENER, RESYNC_RESULT, ) -from port_ocean.core.utils import validate_result +from port_ocean.core.utils.utils import validate_result from port_ocean.exceptions.core import ( RawObjectValidationException, OceanAbortException, diff --git a/port_ocean/core/models.py b/port_ocean/core/models.py index a22cc208c0..3ac9df6946 100644 --- a/port_ocean/core/models.py +++ b/port_ocean/core/models.py @@ -11,7 +11,7 @@ class Runtime(Enum): OnPrem = "OnPrem" -class PortApiStatus(Enum): +class PortAPIErrorMessage(Enum): NOT_FOUND = "not_found" diff --git a/port_ocean/core/utils/entity_topological_sorter.py b/port_ocean/core/utils/entity_topological_sorter.py new file mode 100644 index 0000000000..d8a65e0354 --- /dev/null +++ b/port_ocean/core/utils/entity_topological_sorter.py @@ -0,0 +1,93 @@ +from typing import Any, Generator +from port_ocean.core.models import Entity + +from dataclasses import dataclass, field +from loguru import logger + +from graphlib import TopologicalSorter, CycleError +from typing import Set + +from port_ocean.exceptions.core import OceanAbortException + +Node = tuple[str, str] + + +@dataclass +class EntityTopologicalSorter: + entities: list[Entity] = field(default_factory=list) + + def register_entity( + self, + entity: Entity, + ) -> None: + logger.debug( + f"Will retry upserting entity - {entity.identifier} at the end of resync" + ) + self.entities.append(entity) + + def is_to_execute(self): + return len(self.entities) + + def get_entities(self, sorted: bool = True) -> Generator[Entity, Any, None]: + if not sorted: + for entity in self.entities: + yield entity + return + + entity_map: dict[str, Entity] = { + 
f"{entity.identifier}-{entity.blueprint}": entity + for entity in self.entities + } + sorted_and_mapped = EntityTopologicalSorter.order_by_entities_dependencies( + self.entities + ) + for obj in sorted_and_mapped: + entity = entity_map.get(f"{obj.identifier}-{obj.blueprint}") + if entity is not None: + yield entity + + @staticmethod + def node(entity: Entity) -> Node: + return entity.identifier, entity.blueprint + + @staticmethod + def order_by_entities_dependencies(entities: list[Entity]) -> list[Entity]: + nodes: dict[Node, Set[Node]] = {} + entities_map = {} + for entity in entities: + nodes[EntityTopologicalSorter.node(entity)] = set() + entities_map[EntityTopologicalSorter.node(entity)] = entity + + for entity in entities: + relation_target_ids: list[str] = sum( + [ + identifiers if isinstance(identifiers, list) else [identifiers] + for identifiers in entity.relations.values() + if identifiers is not None + ], + [], + ) + related_entities = [ + related + for related in entities + if related.identifier in relation_target_ids + ] + + for related_entity in related_entities: + if ( + entity.blueprint is not related_entity.blueprint + or entity.identifier is not related_entity.identifier + ): + nodes[EntityTopologicalSorter.node(entity)].add( + EntityTopologicalSorter.node(related_entity) + ) + + sort_op = TopologicalSorter(nodes) + try: + return [entities_map[item] for item in sort_op.static_order()] + except CycleError as ex: + raise OceanAbortException( + "Cannot order entities due to cyclic dependencies. \n" + "If you do want to have cyclic dependencies, please make sure to set the keys" + " 'createMissingRelatedEntities' and 'deleteDependentEntities' in the integration config in Port." 
+ ) from ex diff --git a/port_ocean/core/utils.py b/port_ocean/core/utils/utils.py similarity index 100% rename from port_ocean/core/utils.py rename to port_ocean/core/utils/utils.py diff --git a/port_ocean/run.py b/port_ocean/run.py index 3ec4262f3f..b90f27f816 100644 --- a/port_ocean/run.py +++ b/port_ocean/run.py @@ -9,7 +9,7 @@ from port_ocean.config.dynamic import default_config_factory from port_ocean.config.settings import ApplicationSettings, LogLevelType from port_ocean.core.defaults.initialize import initialize_defaults -from port_ocean.core.utils import validate_integration_runtime +from port_ocean.core.utils.utils import validate_integration_runtime from port_ocean.log.logger_setup import setup_logger from port_ocean.ocean import Ocean from port_ocean.utils.misc import get_spec_file, load_module diff --git a/port_ocean/tests/core/handlers/mixins/test_sync_raw.py b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py index 353aaa4de9..f4dfbe2449 100644 --- a/port_ocean/tests/core/handlers/mixins/test_sync_raw.py +++ b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py @@ -4,6 +4,7 @@ from httpx import Response from port_ocean.clients.port.client import PortClient +from port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter from port_ocean.exceptions.core import OceanAbortException import pytest from unittest.mock import MagicMock, AsyncMock, patch @@ -28,9 +29,6 @@ from port_ocean.context.event import EventContext, event_context, EventType from port_ocean.clients.port.types import UserAgentType from port_ocean.context.ocean import ocean -from port_ocean.core.handlers.entities_state_applier.port.order_by_entities_dependencies import ( - order_by_entities_dependencies, -) @pytest.fixture @@ -199,7 +197,7 @@ async def test_sync_raw_mixin_self_dependency( mock_sync_raw_mixin.entity_processor.parse_items = AsyncMock(return_value=calc_result_mock) # type: ignore mock_order_by_entities_dependencies = MagicMock( - 
side_effect=order_by_entities_dependencies + side_effect=EntityTopologicalSorter.order_by_entities_dependencies ) async with event_context(EventType.RESYNC, trigger_type="machine") as event: app_config = ( @@ -216,7 +214,7 @@ async def test_sync_raw_mixin_self_dependency( lambda *args, **kwargs: no_op_event_context(event), ): with patch( - "port_ocean.utils.entity_topological_sorter.order_by_entities_dependencies", + "port_ocean.core.utils.entity_topological_sorter.EntityTopologicalSorter.order_by_entities_dependencies", mock_order_by_entities_dependencies, ): @@ -254,7 +252,7 @@ async def test_sync_raw_mixin_circular_dependency( mock_sync_raw_mixin.entity_processor.parse_items = AsyncMock(return_value=calc_result_mock) # type: ignore mock_order_by_entities_dependencies = MagicMock( - side_effect=order_by_entities_dependencies + side_effect=EntityTopologicalSorter.order_by_entities_dependencies ) async with event_context(EventType.RESYNC, trigger_type="machine") as event: app_config = ( @@ -288,7 +286,7 @@ def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: lambda *args, **kwargs: no_op_event_context(event), ): with patch( - "port_ocean.utils.entity_topological_sorter.order_by_entities_dependencies", + "port_ocean.core.utils.entity_topological_sorter.EntityTopologicalSorter.order_by_entities_dependencies", mock_order_by_entities_dependencies, ): @@ -300,7 +298,11 @@ def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: len(event.entity_topological_sorter.entities) == 2 ), "Expected one failed entity callback due to retry logic" assert event.entity_topological_sorter.register_entity.call_count == 2 - assert event.entity_topological_sorter.get_entities.call_count == 1 + assert event.entity_topological_sorter.get_entities.call_count == 2 + assert [ + call[0] + for call in event.entity_topological_sorter.get_entities.call_args_list + ] == [(), (False,)] assert len(raiesed_error_handle_failed) == 1 assert isinstance(raiesed_error_handle_failed[0], 
OceanAbortException) assert isinstance(raiesed_error_handle_failed[0].__cause__, CycleError) @@ -331,7 +333,7 @@ async def test_sync_raw_mixin_dependency( mock_sync_raw_mixin.entity_processor.parse_items = AsyncMock(return_value=calc_result_mock) # type: ignore mock_order_by_entities_dependencies = MagicMock( - side_effect=order_by_entities_dependencies + side_effect=EntityTopologicalSorter.order_by_entities_dependencies ) async with event_context(EventType.RESYNC, trigger_type="machine") as event: app_config = ( @@ -365,7 +367,7 @@ def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any: lambda *args, **kwargs: no_op_event_context(event), ): with patch( - "port_ocean.utils.entity_topological_sorter.order_by_entities_dependencies", + "port_ocean.core.utils.entity_topological_sorter.EntityTopologicalSorter.order_by_entities_dependencies", mock_order_by_entities_dependencies, ): diff --git a/port_ocean/tests/utils/test_entity_topological_sorter.py b/port_ocean/tests/core/utils/test_entity_topological_sorter.py similarity index 97% rename from port_ocean/tests/utils/test_entity_topological_sorter.py rename to port_ocean/tests/core/utils/test_entity_topological_sorter.py index 69be7a048c..4fa4f5c2f7 100644 --- a/port_ocean/tests/utils/test_entity_topological_sorter.py +++ b/port_ocean/tests/core/utils/test_entity_topological_sorter.py @@ -1,5 +1,5 @@ from port_ocean.core.models import Entity -from port_ocean.utils.entity_topological_sorter import EntityTopologicalSorter +from port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter from unittest.mock import MagicMock from port_ocean.exceptions.core import ( OceanAbortException, diff --git a/port_ocean/utils/entity_topological_sorter.py b/port_ocean/utils/entity_topological_sorter.py deleted file mode 100644 index b64e0f24d7..0000000000 --- a/port_ocean/utils/entity_topological_sorter.py +++ /dev/null @@ -1,37 +0,0 @@ -from typing import Any, Generator -from port_ocean.core.models import Entity 
-from port_ocean.core.handlers.entities_state_applier.port.order_by_entities_dependencies import ( - order_by_entities_dependencies, -) - -from dataclasses import dataclass, field -from loguru import logger - - -@dataclass -class EntityTopologicalSorter: - entities: list[Entity] = field(default_factory=list) - - def register_entity( - self, - entity: Entity, - ) -> None: - logger.debug( - f"Will retry upserting entity - {entity.identifier} at the end of resync" - ) - self.entities.append(entity) - - @staticmethod - def order_by_entities_dependencies(entities: list[Entity]) -> list[Entity]: - return order_by_entities_dependencies(entities) - - def get_entities(self) -> Generator[Entity, Any, None]: - entity_map: dict[str, Entity] = { - f"{entity.identifier}-{entity.blueprint}": entity - for entity in self.entities - } - sorted_and_mapped = order_by_entities_dependencies(self.entities) - for obj in sorted_and_mapped: - entity = entity_map.get(f"{obj.identifier}-{obj.blueprint}") - if entity is not None: - yield entity From ab817e5c8c92c2309a8473e8ed73e270838e7e39 Mon Sep 17 00:00:00 2001 From: Ivan Kalinovski Date: Mon, 23 Dec 2024 16:36:23 +0200 Subject: [PATCH 05/11] [Core] Add retry for failed upserts and handle circular dependencies 1. Update `upsert_entity` function description. 2. Remove unused logic from `upsert`. 
--- port_ocean/clients/port/mixins/entities.py | 21 ++++++++++++++++--- .../entities_state_applier/port/applier.py | 11 ---------- .../core/integrations/mixins/sync_raw.py | 2 +- .../core/utils/entity_topological_sorter.py | 15 ++++++------- .../core/handlers/mixins/test_sync_raw.py | 2 +- port_ocean/tests/core/test_utils.py | 2 +- 6 files changed, 27 insertions(+), 26 deletions(-) diff --git a/port_ocean/clients/port/mixins/entities.py b/port_ocean/clients/port/mixins/entities.py index 99aae40f53..f33ab737da 100644 --- a/port_ocean/clients/port/mixins/entities.py +++ b/port_ocean/clients/port/mixins/entities.py @@ -32,9 +32,24 @@ async def upsert_entity( should_raise: bool = True, ) -> Entity | None | Literal[False]: """ - [Entity] will be returned on happy flow - [None] will be returned if entity is using search identifier - [False] will be returned if upsert failed because of unmet dependency + This function upserts an entity into Port. + + Usage: + ```python + upsertedEntity = await self.context.port_client.upsert_entity( + entity, + event.port_app_config.get_port_request_options(), + user_agent_type, + should_raise=False, + ) + ``` + :param entity: An Entity to be upserted + :param request_options: A dictionary specifying how to upsert the entity + :param user_agent_type: a UserAgentType specifying who is preforming the action + :param should_raise: A boolean specifying whether the error should be raised or handled silently + :return: [Entity] if the upsert occured successfully + :return: [None] will be returned if entity is using search identifier + :return: [False] will be returned if upsert failed because of unmet dependency """ validation_only = request_options["validation_only"] async with self.semaphore: diff --git a/port_ocean/core/handlers/entities_state_applier/port/applier.py b/port_ocean/core/handlers/entities_state_applier/port/applier.py index 291c3f1099..7d24f17597 100644 --- a/port_ocean/core/handlers/entities_state_applier/port/applier.py +++ 
b/port_ocean/core/handlers/entities_state_applier/port/applier.py @@ -105,18 +105,7 @@ async def upsert( should_raise=False, ) else: - entities_with_search_identifier: list[Entity] = [] - entities_without_search_identifier: list[Entity] = [] for entity in entities: - if entity.is_using_search_identifier: - entities_with_search_identifier.append(entity) - else: - entities_without_search_identifier.append(entity) - - ordered_created_entities = reversed( - entities_with_search_identifier + entities_without_search_identifier - ) - for entity in ordered_created_entities: upsertedEntity = await self.context.port_client.upsert_entity( entity, event.port_app_config.get_port_request_options(), diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index 3ce1bb19d2..da9a3ec1d5 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -400,7 +400,7 @@ async def update_raw_diff( async def sort_and_upsert_failed_entities(self,user_agent_type: UserAgentType)->None: try: if event.entity_topological_sorter.is_to_execute(): - logger.info("Executings topological sort of entities failed to upsert") + logger.info(f"Executings topological sort of {event.entity_topological_sorter.get_entities_count()} entities failed to upsert") else: logger.info("No failed entities on upsert") return diff --git a/port_ocean/core/utils/entity_topological_sorter.py b/port_ocean/core/utils/entity_topological_sorter.py index d8a65e0354..3320650584 100644 --- a/port_ocean/core/utils/entity_topological_sorter.py +++ b/port_ocean/core/utils/entity_topological_sorter.py @@ -25,7 +25,10 @@ def register_entity( ) self.entities.append(entity) - def is_to_execute(self): + def is_to_execute(self) -> int: + return bool(self.get_entities_count()) + + def get_entities_count(self) -> int: return len(self.entities) def get_entities(self, sorted: bool = True) -> Generator[Entity, Any, None]: @@ -34,17 
+37,11 @@ def get_entities(self, sorted: bool = True) -> Generator[Entity, Any, None]: yield entity return - entity_map: dict[str, Entity] = { - f"{entity.identifier}-{entity.blueprint}": entity - for entity in self.entities - } sorted_and_mapped = EntityTopologicalSorter.order_by_entities_dependencies( self.entities ) - for obj in sorted_and_mapped: - entity = entity_map.get(f"{obj.identifier}-{obj.blueprint}") - if entity is not None: - yield entity + for entity in sorted_and_mapped: + yield entity @staticmethod def node(entity: Entity) -> Node: diff --git a/port_ocean/tests/core/handlers/mixins/test_sync_raw.py b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py index f4dfbe2449..32ef6b2ede 100644 --- a/port_ocean/tests/core/handlers/mixins/test_sync_raw.py +++ b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py @@ -389,7 +389,7 @@ def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any: assert "-".join( [call[1].get("json").get("identifier") for call in first] - ) == "-".join(reversed([entity.identifier for entity in entities])) + ) == "-".join([entity.identifier for entity in entities]) assert "-".join( [call[1].get("json").get("identifier") for call in second] ) in ( diff --git a/port_ocean/tests/core/test_utils.py b/port_ocean/tests/core/test_utils.py index c291778c0c..021a820671 100644 --- a/port_ocean/tests/core/test_utils.py +++ b/port_ocean/tests/core/test_utils.py @@ -2,7 +2,7 @@ import pytest -from port_ocean.core.utils import validate_integration_runtime +from port_ocean.core.utils.utils import validate_integration_runtime from port_ocean.clients.port.client import PortClient from port_ocean.core.models import Runtime from port_ocean.tests.helpers.port_client import get_port_client_for_integration From 79054b28cb450bb82225daae8dc9cb5263156108 Mon Sep 17 00:00:00 2001 From: Ivan Kalinovski Date: Mon, 23 Dec 2024 16:52:57 +0200 Subject: [PATCH 06/11] [Core] Add retry for failed upserts and handle circular dependencies 1. Fix lint 2. 
change order of assert in test --- .../sonarqube/examples/project.entity.json | 22 +++++++++---------- .../core/handlers/mixins/test_sync_raw.py | 2 +- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/integrations/sonarqube/examples/project.entity.json b/integrations/sonarqube/examples/project.entity.json index 52ae086867..cc24ab6056 100644 --- a/integrations/sonarqube/examples/project.entity.json +++ b/integrations/sonarqube/examples/project.entity.json @@ -3,17 +3,17 @@ "title": "Port API", "team": [], "properties": { - "organization": "port-labs", - "link": "https://sonarcloud.io/project/overview?id=port-labs_Port_port-api", - "qualityGateStatus": "ERROR", - "numberOfBugs": 20, - "numberOfCodeSmells": 262, - "numberOfVulnerabilities": 0, - "numberOfHotSpots": 3, - "numberOfDuplications": 18, - "coverage": 0, - "mainBranch": "main" + "organization": "port-labs", + "link": "https://sonarcloud.io/project/overview?id=port-labs_Port_port-api", + "qualityGateStatus": "ERROR", + "numberOfBugs": 20, + "numberOfCodeSmells": 262, + "numberOfVulnerabilities": 0, + "numberOfHotSpots": 3, + "numberOfDuplications": 18, + "coverage": 0, + "mainBranch": "main" }, "relations": {}, "icon": "sonarqube" - } +} diff --git a/port_ocean/tests/core/handlers/mixins/test_sync_raw.py b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py index 32ef6b2ede..7101e18a2d 100644 --- a/port_ocean/tests/core/handlers/mixins/test_sync_raw.py +++ b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py @@ -375,10 +375,10 @@ def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any: trigger_type="machine", user_agent_type=UserAgentType.exporter ) + assert event.entity_topological_sorter.register_entity.call_count == 5 assert ( len(event.entity_topological_sorter.entities) == 5 ), "Expected one failed entity callback due to retry logic" - assert event.entity_topological_sorter.register_entity.call_count == 5 assert event.entity_topological_sorter.get_entities.call_count == 1 assert 
len(raiesed_error_handle_failed) == 0 assert mock_ocean.port_client.client.post.call_count == 10 # type: ignore From 5099a80897c8cb2171c519fea903c36442ee76e9 Mon Sep 17 00:00:00 2001 From: Ivan Kalinovski Date: Mon, 23 Dec 2024 21:14:50 +0200 Subject: [PATCH 07/11] [Core] Add retry for failed upserts and handle circular dependencies 1. Update `entity_topological_sorter` to be created seperatly for each class instance. 2. Remove data class from EntityTopologicalSorter. 3. Update `entity_topological_sorter` to `_entity_topological_sorter`. --- port_ocean/context/event.py | 12 +++++- .../entities_state_applier/port/applier.py | 2 +- .../core/integrations/mixins/sync_raw.py | 8 ++-- .../core/utils/entity_topological_sorter.py | 5 +-- .../core/handlers/mixins/test_sync_raw.py | 40 +++++++++---------- 5 files changed, 37 insertions(+), 30 deletions(-) diff --git a/port_ocean/context/event.py b/port_ocean/context/event.py index fcbe7e6d0f..f20b22a216 100644 --- a/port_ocean/context/event.py +++ b/port_ocean/context/event.py @@ -52,8 +52,9 @@ class EventContext: _parent_event: Optional["EventContext"] = None _event_id: str = field(default_factory=lambda: str(uuid4())) _on_abort_callbacks: list[AbortCallbackFunction] = field(default_factory=list) - - entity_topological_sorter = EntityTopologicalSorter() + _entity_topological_sorter: EntityTopologicalSorter = field( + default_factory=EntityTopologicalSorter + ) def on_abort(self, func: AbortCallbackFunction) -> None: self._on_abort_callbacks.append(func) @@ -96,6 +97,7 @@ def parent_id(self) -> Optional[str]: @property def port_app_config(self) -> "PortAppConfig": + print("self._port_app_config", self._port_app_config) if self._port_app_config is None: raise ValueError("Port app config is not set") return self._port_app_config @@ -133,6 +135,11 @@ async def event_context( ) -> AsyncIterator[EventContext]: parent = parent_override or _event_context_stack.top parent_attributes = parent.attributes if parent else {} + 
entity_topological_sorter = ( + parent._entity_topological_sorter + if parent and parent._entity_topological_sorter + else EntityTopologicalSorter() + ) attributes = {**parent_attributes, **(attributes or {})} new_event = EventContext( @@ -142,6 +149,7 @@ async def event_context( _parent_event=parent, # inherit port app config from parent event, so it can be used in nested events _port_app_config=parent.port_app_config if parent else None, + _entity_topological_sorter=entity_topological_sorter, ) _event_context_stack.push(new_event) diff --git a/port_ocean/core/handlers/entities_state_applier/port/applier.py b/port_ocean/core/handlers/entities_state_applier/port/applier.py index 7d24f17597..82722ff86c 100644 --- a/port_ocean/core/handlers/entities_state_applier/port/applier.py +++ b/port_ocean/core/handlers/entities_state_applier/port/applier.py @@ -116,7 +116,7 @@ async def upsert( modified_entities.append(upsertedEntity) # condition to false to differentiate from `result_entity.is_using_search_identifier` if upsertedEntity is False: - event.entity_topological_sorter.register_entity(entity) + event._entity_topological_sorter.register_entity(entity) return modified_entities async def delete( diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index da9a3ec1d5..f22f3bd23a 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -399,19 +399,19 @@ async def update_raw_diff( ) async def sort_and_upsert_failed_entities(self,user_agent_type: UserAgentType)->None: try: - if event.entity_topological_sorter.is_to_execute(): - logger.info(f"Executings topological sort of {event.entity_topological_sorter.get_entities_count()} entities failed to upsert") + if event._entity_topological_sorter.is_to_execute(): + logger.info(f"Executings topological sort of {event._entity_topological_sorter.get_entities_count()} entities failed to upsert") else: logger.info("No 
failed entities on upsert") return - for entity in event.entity_topological_sorter.get_entities(): + for entity in event._entity_topological_sorter.get_entities(): await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False) except OceanAbortException as ocean_abort: logger.info("Failed topological sort of failed to upsert entites - trying to upsert unordered") if isinstance(ocean_abort.__cause__,CycleError): - for entity in event.entity_topological_sorter.get_entities(False): + for entity in event._entity_topological_sorter.get_entities(False): await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False) async def sync_raw_all( self, diff --git a/port_ocean/core/utils/entity_topological_sorter.py b/port_ocean/core/utils/entity_topological_sorter.py index 3320650584..2aa744ea98 100644 --- a/port_ocean/core/utils/entity_topological_sorter.py +++ b/port_ocean/core/utils/entity_topological_sorter.py @@ -1,7 +1,6 @@ from typing import Any, Generator from port_ocean.core.models import Entity -from dataclasses import dataclass, field from loguru import logger from graphlib import TopologicalSorter, CycleError @@ -12,9 +11,9 @@ Node = tuple[str, str] -@dataclass class EntityTopologicalSorter: - entities: list[Entity] = field(default_factory=list) + def __init__(self) -> None: + self.entities: list[Entity] = [] def register_entity( self, diff --git a/port_ocean/tests/core/handlers/mixins/test_sync_raw.py b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py index 7101e18a2d..2094cdc307 100644 --- a/port_ocean/tests/core/handlers/mixins/test_sync_raw.py +++ b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py @@ -206,8 +206,8 @@ async def test_sync_raw_mixin_self_dependency( ) ) event.port_app_config = app_config - event.entity_topological_sorter.register_entity = 
MagicMock(side_effect=event.entity_topological_sorter.register_entity) # type: ignore - event.entity_topological_sorter.get_entities = MagicMock(side_effect=event.entity_topological_sorter.get_entities) # type: ignore + event._entity_topological_sorter.register_entity = MagicMock(side_effect=event._entity_topological_sorter.register_entity) # type: ignore + event._entity_topological_sorter.get_entities = MagicMock(side_effect=event._entity_topological_sorter.get_entities) # type: ignore with patch( "port_ocean.core.integrations.mixins.sync_raw.event_context", @@ -223,10 +223,10 @@ async def test_sync_raw_mixin_self_dependency( ) assert ( - len(event.entity_topological_sorter.entities) == 1 + len(event._entity_topological_sorter.entities) == 1 ), "Expected one failed entity callback due to retry logic" - assert event.entity_topological_sorter.register_entity.call_count == 1 - assert event.entity_topological_sorter.get_entities.call_count == 1 + assert event._entity_topological_sorter.register_entity.call_count == 1 + assert event._entity_topological_sorter.get_entities.call_count == 1 assert mock_order_by_entities_dependencies.call_count == 1 assert [ @@ -261,16 +261,16 @@ async def test_sync_raw_mixin_circular_dependency( ) ) event.port_app_config = app_config - org = event.entity_topological_sorter.register_entity + org = event._entity_topological_sorter.register_entity def mock_register_entity(*args: Any, **kwargs: Any) -> Any: entity = args[0] entity.properties["mock_is_to_fail"] = False return org(*args, **kwargs) - event.entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore + event._entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore raiesed_error_handle_failed = [] - org_get_entities = event.entity_topological_sorter.get_entities + org_get_entities = event._entity_topological_sorter.get_entities def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: try: 
@@ -279,7 +279,7 @@ def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: raiesed_error_handle_failed.append(e) raise e - event.entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: handle_failed_wrapper(*args, **kwargs)) # type: ignore + event._entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: handle_failed_wrapper(*args, **kwargs)) # type: ignore with patch( "port_ocean.core.integrations.mixins.sync_raw.event_context", @@ -295,13 +295,13 @@ def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: ) assert ( - len(event.entity_topological_sorter.entities) == 2 + len(event._entity_topological_sorter.entities) == 2 ), "Expected one failed entity callback due to retry logic" - assert event.entity_topological_sorter.register_entity.call_count == 2 - assert event.entity_topological_sorter.get_entities.call_count == 2 + assert event._entity_topological_sorter.register_entity.call_count == 2 + assert event._entity_topological_sorter.get_entities.call_count == 2 assert [ call[0] - for call in event.entity_topological_sorter.get_entities.call_args_list + for call in event._entity_topological_sorter.get_entities.call_args_list ] == [(), (False,)] assert len(raiesed_error_handle_failed) == 1 assert isinstance(raiesed_error_handle_failed[0], OceanAbortException) @@ -342,16 +342,16 @@ async def test_sync_raw_mixin_dependency( ) ) event.port_app_config = app_config - org = event.entity_topological_sorter.register_entity + org = event._entity_topological_sorter.register_entity def mock_register_entity(*args: Any, **kwargs: Any) -> None: entity = args[0] entity.properties["mock_is_to_fail"] = False return org(*args, **kwargs) - event.entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore + event._entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore raiesed_error_handle_failed = [] - 
org_event_get_entities = event.entity_topological_sorter.get_entities + org_event_get_entities = event._entity_topological_sorter.get_entities def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any: try: @@ -360,7 +360,7 @@ def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any: raiesed_error_handle_failed.append(e) raise e - event.entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: get_entities_wrapper(*args, **kwargs)) # type: ignore + event._entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: get_entities_wrapper(*args, **kwargs)) # type: ignore with patch( "port_ocean.core.integrations.mixins.sync_raw.event_context", @@ -375,11 +375,11 @@ def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any: trigger_type="machine", user_agent_type=UserAgentType.exporter ) - assert event.entity_topological_sorter.register_entity.call_count == 5 + assert event._entity_topological_sorter.register_entity.call_count == 5 assert ( - len(event.entity_topological_sorter.entities) == 5 + len(event._entity_topological_sorter.entities) == 5 ), "Expected one failed entity callback due to retry logic" - assert event.entity_topological_sorter.get_entities.call_count == 1 + assert event._entity_topological_sorter.get_entities.call_count == 1 assert len(raiesed_error_handle_failed) == 0 assert mock_ocean.port_client.client.post.call_count == 10 # type: ignore assert mock_order_by_entities_dependencies.call_count == 1 From 7b9d2fd7da8493fbc0fdec12e961739a4c92f58f Mon Sep 17 00:00:00 2001 From: Ivan Kalinovski Date: Tue, 24 Dec 2024 10:25:07 +0200 Subject: [PATCH 08/11] [Core] Add retry for failed upserts and handle circular dependencies Rename _entity_topological_sorter to entity_topological_sorter --- port_ocean/context/event.py | 9 ++--- .../entities_state_applier/port/applier.py | 2 +- .../core/integrations/mixins/sync_raw.py | 8 ++-- .../core/handlers/mixins/test_sync_raw.py | 40 +++++++++---------- 4 files 
changed, 29 insertions(+), 30 deletions(-) diff --git a/port_ocean/context/event.py b/port_ocean/context/event.py index f20b22a216..5f8b2b37d5 100644 --- a/port_ocean/context/event.py +++ b/port_ocean/context/event.py @@ -52,7 +52,7 @@ class EventContext: _parent_event: Optional["EventContext"] = None _event_id: str = field(default_factory=lambda: str(uuid4())) _on_abort_callbacks: list[AbortCallbackFunction] = field(default_factory=list) - _entity_topological_sorter: EntityTopologicalSorter = field( + entity_topological_sorter: EntityTopologicalSorter = field( default_factory=EntityTopologicalSorter ) @@ -97,7 +97,6 @@ def parent_id(self) -> Optional[str]: @property def port_app_config(self) -> "PortAppConfig": - print("self._port_app_config", self._port_app_config) if self._port_app_config is None: raise ValueError("Port app config is not set") return self._port_app_config @@ -136,8 +135,8 @@ async def event_context( parent = parent_override or _event_context_stack.top parent_attributes = parent.attributes if parent else {} entity_topological_sorter = ( - parent._entity_topological_sorter - if parent and parent._entity_topological_sorter + parent.entity_topological_sorter + if parent and parent.entity_topological_sorter else EntityTopologicalSorter() ) @@ -149,7 +148,7 @@ async def event_context( _parent_event=parent, # inherit port app config from parent event, so it can be used in nested events _port_app_config=parent.port_app_config if parent else None, - _entity_topological_sorter=entity_topological_sorter, + entity_topological_sorter=entity_topological_sorter, ) _event_context_stack.push(new_event) diff --git a/port_ocean/core/handlers/entities_state_applier/port/applier.py b/port_ocean/core/handlers/entities_state_applier/port/applier.py index 82722ff86c..7d24f17597 100644 --- a/port_ocean/core/handlers/entities_state_applier/port/applier.py +++ b/port_ocean/core/handlers/entities_state_applier/port/applier.py @@ -116,7 +116,7 @@ async def upsert( 
modified_entities.append(upsertedEntity) # condition to false to differentiate from `result_entity.is_using_search_identifier` if upsertedEntity is False: - event._entity_topological_sorter.register_entity(entity) + event.entity_topological_sorter.register_entity(entity) return modified_entities async def delete( diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index f22f3bd23a..da9a3ec1d5 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -399,19 +399,19 @@ async def update_raw_diff( ) async def sort_and_upsert_failed_entities(self,user_agent_type: UserAgentType)->None: try: - if event._entity_topological_sorter.is_to_execute(): - logger.info(f"Executings topological sort of {event._entity_topological_sorter.get_entities_count()} entities failed to upsert") + if event.entity_topological_sorter.is_to_execute(): + logger.info(f"Executings topological sort of {event.entity_topological_sorter.get_entities_count()} entities failed to upsert") else: logger.info("No failed entities on upsert") return - for entity in event._entity_topological_sorter.get_entities(): + for entity in event.entity_topological_sorter.get_entities(): await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False) except OceanAbortException as ocean_abort: logger.info("Failed topological sort of failed to upsert entites - trying to upsert unordered") if isinstance(ocean_abort.__cause__,CycleError): - for entity in event._entity_topological_sorter.get_entities(False): + for entity in event.entity_topological_sorter.get_entities(False): await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False) async def sync_raw_all( self, diff --git 
a/port_ocean/tests/core/handlers/mixins/test_sync_raw.py b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py index 2094cdc307..7101e18a2d 100644 --- a/port_ocean/tests/core/handlers/mixins/test_sync_raw.py +++ b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py @@ -206,8 +206,8 @@ async def test_sync_raw_mixin_self_dependency( ) ) event.port_app_config = app_config - event._entity_topological_sorter.register_entity = MagicMock(side_effect=event._entity_topological_sorter.register_entity) # type: ignore - event._entity_topological_sorter.get_entities = MagicMock(side_effect=event._entity_topological_sorter.get_entities) # type: ignore + event.entity_topological_sorter.register_entity = MagicMock(side_effect=event.entity_topological_sorter.register_entity) # type: ignore + event.entity_topological_sorter.get_entities = MagicMock(side_effect=event.entity_topological_sorter.get_entities) # type: ignore with patch( "port_ocean.core.integrations.mixins.sync_raw.event_context", @@ -223,10 +223,10 @@ async def test_sync_raw_mixin_self_dependency( ) assert ( - len(event._entity_topological_sorter.entities) == 1 + len(event.entity_topological_sorter.entities) == 1 ), "Expected one failed entity callback due to retry logic" - assert event._entity_topological_sorter.register_entity.call_count == 1 - assert event._entity_topological_sorter.get_entities.call_count == 1 + assert event.entity_topological_sorter.register_entity.call_count == 1 + assert event.entity_topological_sorter.get_entities.call_count == 1 assert mock_order_by_entities_dependencies.call_count == 1 assert [ @@ -261,16 +261,16 @@ async def test_sync_raw_mixin_circular_dependency( ) ) event.port_app_config = app_config - org = event._entity_topological_sorter.register_entity + org = event.entity_topological_sorter.register_entity def mock_register_entity(*args: Any, **kwargs: Any) -> Any: entity = args[0] entity.properties["mock_is_to_fail"] = False return org(*args, **kwargs) - 
event._entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore + event.entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore raiesed_error_handle_failed = [] - org_get_entities = event._entity_topological_sorter.get_entities + org_get_entities = event.entity_topological_sorter.get_entities def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: try: @@ -279,7 +279,7 @@ def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: raiesed_error_handle_failed.append(e) raise e - event._entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: handle_failed_wrapper(*args, **kwargs)) # type: ignore + event.entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: handle_failed_wrapper(*args, **kwargs)) # type: ignore with patch( "port_ocean.core.integrations.mixins.sync_raw.event_context", @@ -295,13 +295,13 @@ def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: ) assert ( - len(event._entity_topological_sorter.entities) == 2 + len(event.entity_topological_sorter.entities) == 2 ), "Expected one failed entity callback due to retry logic" - assert event._entity_topological_sorter.register_entity.call_count == 2 - assert event._entity_topological_sorter.get_entities.call_count == 2 + assert event.entity_topological_sorter.register_entity.call_count == 2 + assert event.entity_topological_sorter.get_entities.call_count == 2 assert [ call[0] - for call in event._entity_topological_sorter.get_entities.call_args_list + for call in event.entity_topological_sorter.get_entities.call_args_list ] == [(), (False,)] assert len(raiesed_error_handle_failed) == 1 assert isinstance(raiesed_error_handle_failed[0], OceanAbortException) @@ -342,16 +342,16 @@ async def test_sync_raw_mixin_dependency( ) ) event.port_app_config = app_config - org = event._entity_topological_sorter.register_entity + org = 
event.entity_topological_sorter.register_entity def mock_register_entity(*args: Any, **kwargs: Any) -> None: entity = args[0] entity.properties["mock_is_to_fail"] = False return org(*args, **kwargs) - event._entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore + event.entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore raiesed_error_handle_failed = [] - org_event_get_entities = event._entity_topological_sorter.get_entities + org_event_get_entities = event.entity_topological_sorter.get_entities def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any: try: @@ -360,7 +360,7 @@ def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any: raiesed_error_handle_failed.append(e) raise e - event._entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: get_entities_wrapper(*args, **kwargs)) # type: ignore + event.entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: get_entities_wrapper(*args, **kwargs)) # type: ignore with patch( "port_ocean.core.integrations.mixins.sync_raw.event_context", @@ -375,11 +375,11 @@ def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any: trigger_type="machine", user_agent_type=UserAgentType.exporter ) - assert event._entity_topological_sorter.register_entity.call_count == 5 + assert event.entity_topological_sorter.register_entity.call_count == 5 assert ( - len(event._entity_topological_sorter.entities) == 5 + len(event.entity_topological_sorter.entities) == 5 ), "Expected one failed entity callback due to retry logic" - assert event._entity_topological_sorter.get_entities.call_count == 1 + assert event.entity_topological_sorter.get_entities.call_count == 1 assert len(raiesed_error_handle_failed) == 0 assert mock_ocean.port_client.client.post.call_count == 10 # type: ignore assert mock_order_by_entities_dependencies.call_count == 1 From ac90c248f2eb75c5cff3b8cfa67928fdef2d96bd 
Mon Sep 17 00:00:00 2001 From: Ivan Kalinovski Date: Tue, 24 Dec 2024 13:38:28 +0200 Subject: [PATCH 09/11] [Core] Add retry for failed upserts and handle circular dependencies 1. Modify logs and pass count as param. 2. Verify `create_missing_related_entities` is false before `sort_and_upsert_failed_entities` execution. --- port_ocean/core/integrations/mixins/sync_raw.py | 10 ++++------ port_ocean/core/utils/entity_topological_sorter.py | 3 ++- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index da9a3ec1d5..0186a40645 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -399,17 +399,15 @@ async def update_raw_diff( ) async def sort_and_upsert_failed_entities(self,user_agent_type: UserAgentType)->None: try: - if event.entity_topological_sorter.is_to_execute(): - logger.info(f"Executings topological sort of {event.entity_topological_sorter.get_entities_count()} entities failed to upsert") - else: - logger.info("No failed entities on upsert") - return + if not event.entity_topological_sorter.is_to_execute(): + return None + logger.info(f"Executings topological sort of {event.entity_topological_sorter.get_entities_count()} entities failed to upsert.",failed_toupsert_entities_count=event.entity_topological_sorter.get_entities_count()) for entity in event.entity_topological_sorter.get_entities(): await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False) except OceanAbortException as ocean_abort: - logger.info("Failed topological sort of failed to upsert entites - trying to upsert unordered") + logger.info(f"Failed topological sort of failed to upsert entites - trying to upsert unordered {event.entity_topological_sorter.get_entities_count()} 
entities.",failed_topological_sort_entities_count=event.entity_topological_sorter.get_entities_count() ) if isinstance(ocean_abort.__cause__,CycleError): for entity in event.entity_topological_sorter.get_entities(False): await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False) diff --git a/port_ocean/core/utils/entity_topological_sorter.py b/port_ocean/core/utils/entity_topological_sorter.py index 2aa744ea98..4856032991 100644 --- a/port_ocean/core/utils/entity_topological_sorter.py +++ b/port_ocean/core/utils/entity_topological_sorter.py @@ -1,4 +1,5 @@ from typing import Any, Generator +from port_ocean.context import event from port_ocean.core.models import Entity from loguru import logger @@ -25,7 +26,7 @@ def register_entity( self.entities.append(entity) def is_to_execute(self) -> int: - return bool(self.get_entities_count()) + return not event.event.port_app_config.create_missing_related_entities def get_entities_count(self) -> int: return len(self.entities) From 5c639ebdea2725bbe32f5a7dedf9860f8e130272 Mon Sep 17 00:00:00 2001 From: Ivan Kalinovski Date: Tue, 24 Dec 2024 15:42:48 +0200 Subject: [PATCH 10/11] [Core] Add retry for failed upserts and handle circular dependencies Update version --- CHANGELOG.md | 30 +++++++++++++++--------------- pyproject.toml | 2 +- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 7440939588..0bd446d4f8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,21 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+## 0.16.0 (2024-12-24) + + +### Improvements + +- When `createMissingRelatedEntities` is set to `false` and upserting entity failed on not existing entity, the entity will be gathered to the end of the resync and will try sorting all + the failed entities through a topological sort and upsert them as well +- Test upsert with dependencies, with self circular dependency and external entity dependency. + +### Bug Fixes + +- When experiencing cyclic error on topological sort try unsorted upsert of the entities +- Fix topologicals sort tree creation so an entity cannot be its own dependency + + ## 0.15.3 (2024-12-22) ### Bug Fixes @@ -34,21 +49,6 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - Added `SaasOauth` runtime support -## 0.15.3 (2024-12-15) - - -### Improvements - -- When `createMissingRelatedEntities` is set to `false` and upserting entity failed on not existing entity, the entity will be gathered to the end of the resync and will try sorting all - the failed entities through a topological sort and upsert them as well -- Test upsert with dependencies, with self circular dependency and external entity dependency. - -### Bug Fixes - -- When experiencing cyclic error on topological sort try unsorted upsert of the entities -- Fix topologicals sort tree creation so an entity cannot be its own dependency - - ## 0.14.7 (2024-12-09) diff --git a/pyproject.toml b/pyproject.toml index 2a57a18ccc..26005a99c5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "port-ocean" -version = "0.15.3" +version = "0.16.0" description = "Port Ocean is a CLI tool for managing your Port projects." 
readme = "README.md" homepage = "https://app.getport.io" From 538bd4e13c561dce285a61b1329e0dab3c7b82cd Mon Sep 17 00:00:00 2001 From: Ivan Kalinovski Date: Tue, 24 Dec 2024 16:40:04 +0200 Subject: [PATCH 11/11] [Core] Add retry for failed upserts and handle circular dependencies update is_to_execute to should_execute --- port_ocean/core/integrations/mixins/sync_raw.py | 2 +- port_ocean/core/utils/entity_topological_sorter.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index 0186a40645..ecd94794f2 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -399,7 +399,7 @@ async def update_raw_diff( ) async def sort_and_upsert_failed_entities(self,user_agent_type: UserAgentType)->None: try: - if not event.entity_topological_sorter.is_to_execute(): + if not event.entity_topological_sorter.should_execute(): return None logger.info(f"Executings topological sort of {event.entity_topological_sorter.get_entities_count()} entities failed to upsert.",failed_toupsert_entities_count=event.entity_topological_sorter.get_entities_count()) diff --git a/port_ocean/core/utils/entity_topological_sorter.py b/port_ocean/core/utils/entity_topological_sorter.py index 4856032991..7c8d828ea0 100644 --- a/port_ocean/core/utils/entity_topological_sorter.py +++ b/port_ocean/core/utils/entity_topological_sorter.py @@ -25,7 +25,7 @@ def register_entity( ) self.entities.append(entity) - def is_to_execute(self) -> int: + def should_execute(self) -> int: return not event.event.port_app_config.create_missing_related_entities def get_entities_count(self) -> int: