diff --git a/port_ocean/clients/port/mixins/entities.py b/port_ocean/clients/port/mixins/entities.py index 60cc2e26ca..c22601a95e 100644 --- a/port_ocean/clients/port/mixins/entities.py +++ b/port_ocean/clients/port/mixins/entities.py @@ -11,7 +11,8 @@ handle_status_code, PORT_HTTP_MAX_CONNECTIONS_LIMIT, ) -from port_ocean.core.models import Entity +from port_ocean.core.models import Entity, PortApiStatus +from starlette import status class EntityClientMixin: @@ -50,7 +51,6 @@ async def upsert_entity( }, extensions={"retryable": True}, ) - if response.is_error: logger.error( f"Error {'Validating' if validation_only else 'Upserting'} " @@ -58,10 +58,11 @@ async def upsert_entity( f"blueprint: {entity.blueprint}" ) result = response.json() + if ( - response.status_code == 404 + response.status_code == status.HTTP_404_NOT_FOUND and not result.get("ok") - and result.get("error") == "not_found" + and result.get("error") == PortApiStatus.NOT_FOUND.value ): return False handle_status_code(response, should_raise) diff --git a/port_ocean/context/event.py b/port_ocean/context/event.py index 94cc39e088..ecbdd8b6c3 100644 --- a/port_ocean/context/event.py +++ b/port_ocean/context/event.py @@ -14,7 +14,7 @@ from uuid import uuid4 from loguru import logger -from port_ocean.utils.failed_entity_handler import FailedEntityHandler +from port_ocean.utils.entity_topological_sorter import EntityTopologicalSorter from pydispatch import dispatcher # type: ignore from werkzeug.local import LocalStack, LocalProxy @@ -53,7 +53,7 @@ class EventContext: _event_id: str = field(default_factory=lambda: str(uuid4())) _on_abort_callbacks: list[AbortCallbackFunction] = field(default_factory=list) - failed_entity_handler = FailedEntityHandler() + entity_topological_sorter = EntityTopologicalSorter() def on_abort(self, func: AbortCallbackFunction) -> None: self._on_abort_callbacks.append(func) diff --git a/port_ocean/core/handlers/entities_state_applier/port/applier.py b/port_ocean/core/handlers/entities_state_applier/port/applier.py index d7dd022e5b..aa0c4324c2 100644 --- a/port_ocean/core/handlers/entities_state_applier/port/applier.py +++ b/port_ocean/core/handlers/entities_state_applier/port/applier.py @@ -127,12 +127,7 @@ async def upsert( if upsertedEntity: modified_entities.append(upsertedEntity) if upsertedEntity is False: - event.failed_entity_handler.register_failed_upsert_call_arguments( - entity, - event.port_app_config.get_port_request_options(), - user_agent_type, - self.context.port_client.upsert_entity, - ) + event.entity_topological_sorter.register_entity(entity) return modified_entities async def delete( diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index f9819f6cf9..8d8158a4d9 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -1,4 +1,5 @@ import asyncio +from graphlib import CycleError import inspect import typing from typing import Callable, Awaitable, Any @@ -457,9 +458,14 @@ async def sync_raw_all( creation_results.append(await task) try: - await event.failed_entity_handler.handle_failed() - except: - await event.failed_entity_handler.handle_failed_no_sort() + for entity in event.entity_topological_sorter.get_entities(): + await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False) + + except OceanAbortException as ocean_abort: + if isinstance(ocean_abort.__cause__,CycleError): + for entity in event.entity_topological_sorter.entities: + await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False) + except asyncio.CancelledError as e: logger.warning("Resync aborted successfully, skipping delete phase. This leads to an incomplete state") raise diff --git a/port_ocean/core/models.py b/port_ocean/core/models.py index fb588becda..a22cc208c0 100644 --- a/port_ocean/core/models.py +++ b/port_ocean/core/models.py @@ -11,6 +11,10 @@ class Runtime(Enum): OnPrem = "OnPrem" +class PortApiStatus(Enum): + NOT_FOUND = "not_found" + + class Entity(BaseModel): identifier: Any blueprint: Any diff --git a/port_ocean/tests/core/handlers/mixins/test_sync_raw_v2.py b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py similarity index 76% rename from port_ocean/tests/core/handlers/mixins/test_sync_raw_v2.py rename to port_ocean/tests/core/handlers/mixins/test_sync_raw.py index 3b15308fb8..353aaa4de9 100644 --- a/port_ocean/tests/core/handlers/mixins/test_sync_raw_v2.py +++ b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py @@ -173,6 +173,15 @@ async def no_op_event_context( yield existing_event +def create_entity( + id: str, blueprint: str, relation: dict[str, str], is_to_fail: bool +) -> Entity: + entity = Entity(identifier=id, blueprint=blueprint) + entity.relations = relation + entity.properties = {"mock_is_to_fail": is_to_fail} + return entity + + @pytest.mark.asyncio async def test_sync_raw_mixin_self_dependency( mock_sync_raw_mixin: SyncRawMixin, @@ -199,16 +208,15 @@ async def test_sync_raw_mixin_self_dependency( ) ) event.port_app_config = app_config - event.failed_entity_handler.register_failed_upsert_call_arguments = MagicMock(side_effect=event.failed_entity_handler.register_failed_upsert_call_arguments) # type: ignore - event.failed_entity_handler.handle_failed = MagicMock(side_effect=event.failed_entity_handler.handle_failed) # type: ignore - event.failed_entity_handler.handle_failed_no_sort = MagicMock(side_effect=event.failed_entity_handler.handle_failed_no_sort) # type: ignore + event.entity_topological_sorter.register_entity = MagicMock(side_effect=event.entity_topological_sorter.register_entity) # type: ignore + event.entity_topological_sorter.get_entities = MagicMock(side_effect=event.entity_topological_sorter.get_entities) # type: ignore with patch( "port_ocean.core.integrations.mixins.sync_raw.event_context", lambda *args, **kwargs: no_op_event_context(event), ): with patch( - "port_ocean.utils.failed_entity_handler.order_by_entities_dependencies", + "port_ocean.utils.entity_topological_sorter.order_by_entities_dependencies", mock_order_by_entities_dependencies, ): @@ -217,13 +225,10 @@ async def test_sync_raw_mixin_self_dependency( ) assert ( - len(event.failed_entity_handler._failed_entity_callback_list) == 1 + len(event.entity_topological_sorter.entities) == 1 ), "Expected one failed entity callback due to retry logic" - assert ( - event.failed_entity_handler.register_failed_upsert_call_arguments.call_count - == 1 - ) - assert event.failed_entity_handler.handle_failed.call_count == 1 + assert event.entity_topological_sorter.register_entity.call_count == 1 + assert event.entity_topological_sorter.get_entities.call_count == 1 assert mock_order_by_entities_dependencies.call_count == 1 assert [ @@ -231,8 +236,6 @@ async def test_sync_raw_mixin_self_dependency( for call in mock_order_by_entities_dependencies.call_args_list ] == [entity for entity in entities if entity.identifier == "entity_1"] - assert event.failed_entity_handler.handle_failed_no_sort.call_count == 0 - @pytest.mark.asyncio async def test_sync_raw_mixin_circular_dependency( @@ -260,35 +263,32 @@ async def test_sync_raw_mixin_circular_dependency( ) ) event.port_app_config = app_config - org = event.failed_entity_handler.register_failed_upsert_call_arguments + org = event.entity_topological_sorter.register_entity - def mock_register_failed_upsert_call_arguments( - *args: Any, **kwargs: Any - ) -> Any: + def mock_register_entity(*args: Any, **kwargs: Any) -> Any: entity = args[0] entity.properties["mock_is_to_fail"] = False return org(*args, **kwargs) - event.failed_entity_handler.register_failed_upsert_call_arguments = MagicMock(side_effect=mock_register_failed_upsert_call_arguments) # type: ignore + event.entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore raiesed_error_handle_failed = [] - org_event_handle_failed = event.failed_entity_handler.handle_failed + org_get_entities = event.entity_topological_sorter.get_entities - async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: + def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: try: - return await org_event_handle_failed(*args, **kwargs) + return list(org_get_entities(*args, **kwargs)) except Exception as e: raiesed_error_handle_failed.append(e) raise e - event.failed_entity_handler.handle_failed = MagicMock(side_effect=lambda *args, **kwargs: handle_failed_wrapper(*args, **kwargs)) # type: ignore - event.failed_entity_handler.handle_failed_no_sort = MagicMock(side_effect=event.failed_entity_handler.handle_failed_no_sort) # type: ignore + event.entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: handle_failed_wrapper(*args, **kwargs)) # type: ignore with patch( "port_ocean.core.integrations.mixins.sync_raw.event_context", lambda *args, **kwargs: no_op_event_context(event), ): with patch( - "port_ocean.utils.failed_entity_handler.order_by_entities_dependencies", + "port_ocean.utils.entity_topological_sorter.order_by_entities_dependencies", mock_order_by_entities_dependencies, ): @@ -297,17 +297,13 @@ async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: ) assert ( - len(event.failed_entity_handler._failed_entity_callback_list) == 2 + len(event.entity_topological_sorter.entities) == 2 ), "Expected one failed entity callback due to retry logic" - assert ( - event.failed_entity_handler.register_failed_upsert_call_arguments.call_count - == 2 - ) - assert event.failed_entity_handler.handle_failed.call_count == 1 + assert event.entity_topological_sorter.register_entity.call_count == 2 + assert event.entity_topological_sorter.get_entities.call_count == 1 assert len(raiesed_error_handle_failed) == 1 assert isinstance(raiesed_error_handle_failed[0], OceanAbortException) assert isinstance(raiesed_error_handle_failed[0].__cause__, CycleError) - assert event.failed_entity_handler.handle_failed_no_sort.call_count == 1 assert ( len(mock_ocean.port_client.client.post.call_args_list) # type: ignore / len(entities) @@ -315,15 +311,6 @@ async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: ) -def create_entity( - id: str, blueprint: str, relation: dict[str, str], is_to_fail: bool -) -> Entity: - entity = Entity(identifier=id, blueprint=blueprint) - entity.relations = relation - entity.properties = {"mock_is_to_fail": is_to_fail} - return entity - - @pytest.mark.asyncio async def test_sync_raw_mixin_dependency( mock_sync_raw_mixin: SyncRawMixin, mock_ocean: Ocean @@ -353,35 +340,32 @@ async def test_sync_raw_mixin_dependency( ) ) event.port_app_config = app_config - org = event.failed_entity_handler.register_failed_upsert_call_arguments + org = event.entity_topological_sorter.register_entity - def mock_register_failed_upsert_call_arguments( - *args: Any, **kwargs: Any - ) -> None: + def mock_register_entity(*args: Any, **kwargs: Any) -> None: entity = args[0] entity.properties["mock_is_to_fail"] = False return org(*args, **kwargs) - event.failed_entity_handler.register_failed_upsert_call_arguments = MagicMock(side_effect=mock_register_failed_upsert_call_arguments) # type: ignore + event.entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore raiesed_error_handle_failed = [] - org_event_handle_failed = event.failed_entity_handler.handle_failed + org_event_get_entities = event.entity_topological_sorter.get_entities - async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: + def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any: try: - return await org_event_handle_failed(*args, **kwargs) + return org_event_get_entities(*args, **kwargs) except Exception as e: raiesed_error_handle_failed.append(e) raise e - event.failed_entity_handler.handle_failed = MagicMock(side_effect=lambda *args, **kwargs: handle_failed_wrapper(*args, **kwargs)) # type: ignore - event.failed_entity_handler.handle_failed_no_sort = MagicMock(side_effect=event.failed_entity_handler.handle_failed_no_sort) # type: ignore + event.entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: get_entities_wrapper(*args, **kwargs)) # type: ignore with patch( "port_ocean.core.integrations.mixins.sync_raw.event_context", lambda *args, **kwargs: no_op_event_context(event), ): with patch( - "port_ocean.utils.failed_entity_handler.order_by_entities_dependencies", + "port_ocean.utils.entity_topological_sorter.order_by_entities_dependencies", mock_order_by_entities_dependencies, ): @@ -390,13 +374,10 @@ async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: ) assert ( - len(event.failed_entity_handler._failed_entity_callback_list) == 5 + len(event.entity_topological_sorter.entities) == 5 ), "Expected one failed entity callback due to retry logic" - assert ( - event.failed_entity_handler.register_failed_upsert_call_arguments.call_count - == 5 - ) - assert event.failed_entity_handler.handle_failed.call_count == 1 + assert event.entity_topological_sorter.register_entity.call_count == 5 + assert event.entity_topological_sorter.get_entities.call_count == 1 assert len(raiesed_error_handle_failed) == 0 assert mock_ocean.port_client.client.post.call_count == 10 # type: ignore assert mock_order_by_entities_dependencies.call_count == 1 @@ -406,14 +387,7 @@ async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: assert "-".join( [call[1].get("json").get("identifier") for call in first] - ) == "-".join( - reversed( - [ - entity.identifier - for entity in calc_result_mock.entity_selector_diff.passed - ] - ) - ) + ) == "-".join(reversed([entity.identifier for entity in entities])) assert "-".join( [call[1].get("json").get("identifier") for call in second] ) in ( @@ -422,4 +396,3 @@ async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: "entity_3-entity_1-entity_4-entity_2-entity_5", "entity_3-entity_1-entity_4-entity_5-entity_2", ) - assert event.failed_entity_handler.handle_failed_no_sort.call_count == 0 diff --git a/port_ocean/tests/utils/failed_entity_handler.py b/port_ocean/tests/utils/test_entity_topological_sorter.py similarity index 53% rename from port_ocean/tests/utils/failed_entity_handler.py rename to port_ocean/tests/utils/test_entity_topological_sorter.py index dbd88b4d29..69be7a048c 100644 --- a/port_ocean/tests/utils/failed_entity_handler.py +++ b/port_ocean/tests/utils/test_entity_topological_sorter.py @@ -1,7 +1,5 @@ -from typing import Any from port_ocean.core.models import Entity -from port_ocean.utils.failed_entity_handler import FailedEntityHandler -import pytest +from port_ocean.utils.entity_topological_sorter import EntityTopologicalSorter from unittest.mock import MagicMock from port_ocean.exceptions.core import ( OceanAbortException, @@ -18,16 +16,7 @@ def create_entity( return entity -processed_order: list[str] = [] - - -async def mock_activate(entity: Any, _: Any, __: Any, **kwargs: Any) -> bool: - processed_order.append(f"{entity.identifier}-{entity.blueprint}") - return True - - -@pytest.mark.asyncio -async def test_handle_failed_with_dependencies() -> None: +def test_handle_failed_with_dependencies() -> None: # processed_order:list[str] = [] entity_a = create_entity( "entity_a", @@ -40,19 +29,16 @@ async def test_handle_failed_with_dependencies() -> None: "entity_c", "buleprint_b", {"dep_name_2": "entity_b"} ) # Depends on entity_b - failed_entities_handler = FailedEntityHandler() + entity_topological_sort = EntityTopologicalSorter() # Register fails with unsorted order - failed_entities_handler.register_failed_upsert_call_arguments( - entity_c, MagicMock(), MagicMock(), mock_activate - ) - failed_entities_handler.register_failed_upsert_call_arguments( - entity_a, MagicMock(), MagicMock(), mock_activate - ) - failed_entities_handler.register_failed_upsert_call_arguments( - entity_b, MagicMock(), MagicMock(), mock_activate - ) - - await failed_entities_handler.handle_failed() + entity_topological_sort.register_entity(entity_c) + entity_topological_sort.register_entity(entity_a) + entity_topological_sort.register_entity(entity_b) + + processed_order = [ + f"{entity.identifier}-{entity.blueprint}" + for entity in list(entity_topological_sort.get_entities()) + ] assert processed_order == [ "entity_a-buleprint_a", "entity_b-buleprint_a", @@ -60,8 +46,7 @@ async def test_handle_failed_with_dependencies() -> None: ], f"Processed order: {processed_order}" -@pytest.mark.asyncio -async def test_handle_failed_with_self_dependencies() -> None: +def test_handle_failed_with_self_dependencies() -> None: entity_a = create_entity( "entity_a", "buleprint_a", {"dep_name_1": "entity_a"} ) # Self dependency @@ -72,20 +57,18 @@ async def test_handle_failed_with_self_dependencies() -> None: "entity_c", "buleprint_b", {"dep_name_2": "entity_b"} ) # Depends on entity_b - failed_entities_handler = FailedEntityHandler() + entity_topological_sort = EntityTopologicalSorter() # Register fails with unsorted order - failed_entities_handler.register_failed_upsert_call_arguments( - entity_c, MagicMock(), MagicMock(), mock_activate - ) - failed_entities_handler.register_failed_upsert_call_arguments( - entity_a, MagicMock(), MagicMock(), mock_activate - ) - failed_entities_handler.register_failed_upsert_call_arguments( - entity_b, MagicMock(), MagicMock(), mock_activate - ) - - await failed_entities_handler.handle_failed() + entity_topological_sort.register_entity(entity_c) + entity_topological_sort.register_entity(entity_a) + entity_topological_sort.register_entity(entity_b) + + processed_order = [ + f"{entity.identifier}-{entity.blueprint}" + for entity in list(entity_topological_sort.get_entities()) + ] + assert processed_order == [ "entity_a-buleprint_a", "entity_b-buleprint_a", @@ -93,8 +76,7 @@ async def test_handle_failed_with_self_dependencies() -> None: ], f"Processed order: {processed_order}" -@pytest.mark.asyncio -async def test_handle_failed_with_circular_dependencies() -> None: +def test_handle_failed_with_circular_dependencies() -> None: # processed_order:list[str] = [] entity_a = create_entity( "entity_a", "buleprint_a", {"dep_name_1": "entity_b"} @@ -103,15 +85,11 @@ async def test_handle_failed_with_circular_dependencies() -> None: "entity_b", "buleprint_a", {"dep_name_1": "entity_a"} ) # Depends on entity_a - failed_entities_handler = FailedEntityHandler() + entity_topological_sort = EntityTopologicalSorter() try: - failed_entities_handler.register_failed_upsert_call_arguments( - entity_a, MagicMock(), MagicMock(), mock_activate - ) - failed_entities_handler.register_failed_upsert_call_arguments( - entity_b, MagicMock(), MagicMock(), mock_activate - ) - await failed_entities_handler.handle_failed() + entity_topological_sort.register_entity(entity_a) + entity_topological_sort.register_entity(entity_b) + entity_topological_sort.get_entities() except OceanAbortException as e: assert isinstance(e, OceanAbortException) diff --git a/port_ocean/utils/entity_topological_sorter.py b/port_ocean/utils/entity_topological_sorter.py new file mode 100644 index 0000000000..b64e0f24d7 --- /dev/null +++ b/port_ocean/utils/entity_topological_sorter.py @@ -0,0 +1,37 @@ +from typing import Any, Generator +from port_ocean.core.models import Entity +from port_ocean.core.handlers.entities_state_applier.port.order_by_entities_dependencies import ( + order_by_entities_dependencies, +) + +from dataclasses import dataclass, field +from loguru import logger + + +@dataclass +class EntityTopologicalSorter: + entities: list[Entity] = field(default_factory=list) + + def register_entity( + self, + entity: Entity, + ) -> None: + logger.debug( + f"Will retry upserting entity - {entity.identifier} at the end of resync" + ) + self.entities.append(entity) + + @staticmethod + def order_by_entities_dependencies(entities: list[Entity]) -> list[Entity]: + return order_by_entities_dependencies(entities) + + def get_entities(self) -> Generator[Entity, Any, None]: + entity_map: dict[str, Entity] = { + f"{entity.identifier}-{entity.blueprint}": entity + for entity in self.entities + } + sorted_and_mapped = order_by_entities_dependencies(self.entities) + for obj in sorted_and_mapped: + entity = entity_map.get(f"{obj.identifier}-{obj.blueprint}") + if entity is not None: + yield entity diff --git a/port_ocean/utils/failed_entity_handler.py b/port_ocean/utils/failed_entity_handler.py deleted file mode 100644 index 3f6c24a6b9..0000000000 --- a/port_ocean/utils/failed_entity_handler.py +++ /dev/null @@ -1,67 +0,0 @@ -from port_ocean.clients.port.types import UserAgentType -from port_ocean.core.models import Entity -from port_ocean.clients.port.types import RequestOptions -from port_ocean.core.handlers.entities_state_applier.port.order_by_entities_dependencies import ( - order_by_entities_dependencies, -) -from typing import ( - Any, - Callable, - Tuple, -) -from dataclasses import dataclass, field -from loguru import logger - - -@dataclass -class FailedEntityHandler: - _failed_entity_callback_list: list[ - Tuple[ - Entity, - Tuple[ - Tuple[Entity, RequestOptions, UserAgentType], - Callable[[Entity, RequestOptions, UserAgentType], Any], - ], - ] - ] = field(default_factory=list) - - def register_failed_upsert_call_arguments( - self, - entity: Entity, - get_port_request_options: RequestOptions, - user_agent_type: UserAgentType, - func: Callable[[Entity, RequestOptions, UserAgentType], Any], - ) -> None: - logger.debug( - f"Will retry upserting entity - {entity.identifier} at the end of resync" - ) - self._failed_entity_callback_list.append( - (entity, ((entity, get_port_request_options, user_agent_type), func)) - ) - - async def handle_failed(self) -> None: - entity_map: dict[ - str, - Tuple[ - Tuple[Entity, RequestOptions, UserAgentType], - Callable[[Entity, RequestOptions, UserAgentType], Any], - ], - ] = { - f"{obj.identifier}-{obj.blueprint}": call - for obj, call in self._failed_entity_callback_list - } - entity_list: list[Entity] = [ - obj for obj, call in self._failed_entity_callback_list - ] - - sorted_and_mapped = order_by_entities_dependencies(entity_list) - for obj in sorted_and_mapped: - call = entity_map.get(f"{obj.identifier}-{obj.blueprint}") - if call is not None: - args, func = call - await func(*args, **{"should_raise": False}) - - async def handle_failed_no_sort(self) -> None: - for obj, call in self._failed_entity_callback_list: - args, func = call - await func(*args, **{"should_raise": False})