From 5099a80897c8cb2171c519fea903c36442ee76e9 Mon Sep 17 00:00:00 2001 From: Ivan Kalinovski Date: Mon, 23 Dec 2024 21:14:50 +0200 Subject: [PATCH] [Core] Add retry for failed upserts and handle circular dependencies 1. Update `entity_topological_sorter` to be created seperatly for each class instance. 2. Remove data class from EntityTopologicalSorter. 3. Update `entity_topological_sorter` to `_entity_topological_sorter`. --- port_ocean/context/event.py | 12 +++++- .../entities_state_applier/port/applier.py | 2 +- .../core/integrations/mixins/sync_raw.py | 8 ++-- .../core/utils/entity_topological_sorter.py | 5 +-- .../core/handlers/mixins/test_sync_raw.py | 40 +++++++++---------- 5 files changed, 37 insertions(+), 30 deletions(-) diff --git a/port_ocean/context/event.py b/port_ocean/context/event.py index fcbe7e6d0f..f20b22a216 100644 --- a/port_ocean/context/event.py +++ b/port_ocean/context/event.py @@ -52,8 +52,9 @@ class EventContext: _parent_event: Optional["EventContext"] = None _event_id: str = field(default_factory=lambda: str(uuid4())) _on_abort_callbacks: list[AbortCallbackFunction] = field(default_factory=list) - - entity_topological_sorter = EntityTopologicalSorter() + _entity_topological_sorter: EntityTopologicalSorter = field( + default_factory=EntityTopologicalSorter + ) def on_abort(self, func: AbortCallbackFunction) -> None: self._on_abort_callbacks.append(func) @@ -96,6 +97,7 @@ def parent_id(self) -> Optional[str]: @property def port_app_config(self) -> "PortAppConfig": + print("self._port_app_config", self._port_app_config) if self._port_app_config is None: raise ValueError("Port app config is not set") return self._port_app_config @@ -133,6 +135,11 @@ async def event_context( ) -> AsyncIterator[EventContext]: parent = parent_override or _event_context_stack.top parent_attributes = parent.attributes if parent else {} + entity_topological_sorter = ( + parent._entity_topological_sorter + if parent and parent._entity_topological_sorter + else EntityTopologicalSorter() + ) attributes = {**parent_attributes, **(attributes or {})} new_event = EventContext( @@ -142,6 +149,7 @@ async def event_context( _parent_event=parent, # inherit port app config from parent event, so it can be used in nested events _port_app_config=parent.port_app_config if parent else None, + _entity_topological_sorter=entity_topological_sorter, ) _event_context_stack.push(new_event) diff --git a/port_ocean/core/handlers/entities_state_applier/port/applier.py b/port_ocean/core/handlers/entities_state_applier/port/applier.py index 7d24f17597..82722ff86c 100644 --- a/port_ocean/core/handlers/entities_state_applier/port/applier.py +++ b/port_ocean/core/handlers/entities_state_applier/port/applier.py @@ -116,7 +116,7 @@ async def upsert( modified_entities.append(upsertedEntity) # condition to false to differentiate from `result_entity.is_using_search_identifier` if upsertedEntity is False: - event.entity_topological_sorter.register_entity(entity) + event._entity_topological_sorter.register_entity(entity) return modified_entities async def delete( diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index da9a3ec1d5..f22f3bd23a 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -399,19 +399,19 @@ async def update_raw_diff( ) async def sort_and_upsert_failed_entities(self,user_agent_type: UserAgentType)->None: try: - if event.entity_topological_sorter.is_to_execute(): - logger.info(f"Executings topological sort of {event.entity_topological_sorter.get_entities_count()} entities failed to upsert") + if event._entity_topological_sorter.is_to_execute(): + logger.info(f"Executings topological sort of {event._entity_topological_sorter.get_entities_count()} entities failed to upsert") else: logger.info("No failed entities on upsert") return - for entity in event.entity_topological_sorter.get_entities(): + for entity in event._entity_topological_sorter.get_entities(): await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False) except OceanAbortException as ocean_abort: logger.info("Failed topological sort of failed to upsert entites - trying to upsert unordered") if isinstance(ocean_abort.__cause__,CycleError): - for entity in event.entity_topological_sorter.get_entities(False): + for entity in event._entity_topological_sorter.get_entities(False): await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False) async def sync_raw_all( self, diff --git a/port_ocean/core/utils/entity_topological_sorter.py b/port_ocean/core/utils/entity_topological_sorter.py index 3320650584..2aa744ea98 100644 --- a/port_ocean/core/utils/entity_topological_sorter.py +++ b/port_ocean/core/utils/entity_topological_sorter.py @@ -1,7 +1,6 @@ from typing import Any, Generator from port_ocean.core.models import Entity -from dataclasses import dataclass, field from loguru import logger from graphlib import TopologicalSorter, CycleError @@ -12,9 +11,9 @@ Node = tuple[str, str] -@dataclass class EntityTopologicalSorter: - entities: list[Entity] = field(default_factory=list) + def __init__(self) -> None: + self.entities: list[Entity] = [] def register_entity( self, diff --git a/port_ocean/tests/core/handlers/mixins/test_sync_raw.py b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py index 7101e18a2d..2094cdc307 100644 --- a/port_ocean/tests/core/handlers/mixins/test_sync_raw.py +++ b/port_ocean/tests/core/handlers/mixins/test_sync_raw.py @@ -206,8 +206,8 @@ async def test_sync_raw_mixin_self_dependency( ) ) event.port_app_config = app_config - event.entity_topological_sorter.register_entity = MagicMock(side_effect=event.entity_topological_sorter.register_entity) # type: ignore - event.entity_topological_sorter.get_entities = MagicMock(side_effect=event.entity_topological_sorter.get_entities) # type: ignore + event._entity_topological_sorter.register_entity = MagicMock(side_effect=event._entity_topological_sorter.register_entity) # type: ignore + event._entity_topological_sorter.get_entities = MagicMock(side_effect=event._entity_topological_sorter.get_entities) # type: ignore with patch( "port_ocean.core.integrations.mixins.sync_raw.event_context", @@ -223,10 +223,10 @@ async def test_sync_raw_mixin_self_dependency( ) assert ( - len(event.entity_topological_sorter.entities) == 1 + len(event._entity_topological_sorter.entities) == 1 ), "Expected one failed entity callback due to retry logic" - assert event.entity_topological_sorter.register_entity.call_count == 1 - assert event.entity_topological_sorter.get_entities.call_count == 1 + assert event._entity_topological_sorter.register_entity.call_count == 1 + assert event._entity_topological_sorter.get_entities.call_count == 1 assert mock_order_by_entities_dependencies.call_count == 1 assert [ @@ -261,16 +261,16 @@ async def test_sync_raw_mixin_circular_dependency( ) ) event.port_app_config = app_config - org = event.entity_topological_sorter.register_entity + org = event._entity_topological_sorter.register_entity def mock_register_entity(*args: Any, **kwargs: Any) -> Any: entity = args[0] entity.properties["mock_is_to_fail"] = False return org(*args, **kwargs) - event.entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore + event._entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore raiesed_error_handle_failed = [] - org_get_entities = event.entity_topological_sorter.get_entities + org_get_entities = event._entity_topological_sorter.get_entities def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: try: @@ -279,7 +279,7 @@ def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: raiesed_error_handle_failed.append(e) raise e - event.entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: handle_failed_wrapper(*args, **kwargs)) # type: ignore + event._entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: handle_failed_wrapper(*args, **kwargs)) # type: ignore with patch( "port_ocean.core.integrations.mixins.sync_raw.event_context", @@ -295,13 +295,13 @@ def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any: ) assert ( - len(event.entity_topological_sorter.entities) == 2 + len(event._entity_topological_sorter.entities) == 2 ), "Expected one failed entity callback due to retry logic" - assert event.entity_topological_sorter.register_entity.call_count == 2 - assert event.entity_topological_sorter.get_entities.call_count == 2 + assert event._entity_topological_sorter.register_entity.call_count == 2 + assert event._entity_topological_sorter.get_entities.call_count == 2 assert [ call[0] - for call in event.entity_topological_sorter.get_entities.call_args_list + for call in event._entity_topological_sorter.get_entities.call_args_list ] == [(), (False,)] assert len(raiesed_error_handle_failed) == 1 assert isinstance(raiesed_error_handle_failed[0], OceanAbortException) @@ -342,16 +342,16 @@ async def test_sync_raw_mixin_dependency( ) ) event.port_app_config = app_config - org = event.entity_topological_sorter.register_entity + org = event._entity_topological_sorter.register_entity def mock_register_entity(*args: Any, **kwargs: Any) -> None: entity = args[0] entity.properties["mock_is_to_fail"] = False return org(*args, **kwargs) - event.entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore + event._entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore raiesed_error_handle_failed = [] - org_event_get_entities = event.entity_topological_sorter.get_entities + org_event_get_entities = event._entity_topological_sorter.get_entities def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any: try: @@ -360,7 +360,7 @@ def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any: raiesed_error_handle_failed.append(e) raise e - event.entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: get_entities_wrapper(*args, **kwargs)) # type: ignore + event._entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: get_entities_wrapper(*args, **kwargs)) # type: ignore with patch( "port_ocean.core.integrations.mixins.sync_raw.event_context", @@ -375,11 +375,11 @@ def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any: trigger_type="machine", user_agent_type=UserAgentType.exporter ) - assert event.entity_topological_sorter.register_entity.call_count == 5 + assert event._entity_topological_sorter.register_entity.call_count == 5 assert ( - len(event.entity_topological_sorter.entities) == 5 + len(event._entity_topological_sorter.entities) == 5 ), "Expected one failed entity callback due to retry logic" - assert event.entity_topological_sorter.get_entities.call_count == 1 + assert event._entity_topological_sorter.get_entities.call_count == 1 assert len(raiesed_error_handle_failed) == 0 assert mock_ocean.port_client.client.post.call_count == 10 # type: ignore assert mock_order_by_entities_dependencies.call_count == 1