Skip to content

Commit

Permalink
[Core] Add retry for failed upserts and handle circular dependencies
Browse files Browse the repository at this point in the history
1. Update `entity_topological_sorter` to be created seperatly for each class instance.
2. Remove data class from EntityTopologicalSorter.
3. Update `entity_topological_sorter` to `_entity_topological_sorter`.
  • Loading branch information
Ivan Kalinovski authored and Ivan Kalinovski committed Dec 23, 2024
1 parent 79054b2 commit 5099a80
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 30 deletions.
12 changes: 10 additions & 2 deletions port_ocean/context/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,9 @@ class EventContext:
_parent_event: Optional["EventContext"] = None
_event_id: str = field(default_factory=lambda: str(uuid4()))
_on_abort_callbacks: list[AbortCallbackFunction] = field(default_factory=list)

entity_topological_sorter = EntityTopologicalSorter()
_entity_topological_sorter: EntityTopologicalSorter = field(
default_factory=EntityTopologicalSorter
)

def on_abort(self, func: AbortCallbackFunction) -> None:
self._on_abort_callbacks.append(func)
Expand Down Expand Up @@ -96,6 +97,7 @@ def parent_id(self) -> Optional[str]:

@property
def port_app_config(self) -> "PortAppConfig":
print("self._port_app_config", self._port_app_config)
if self._port_app_config is None:
raise ValueError("Port app config is not set")
return self._port_app_config
Expand Down Expand Up @@ -133,6 +135,11 @@ async def event_context(
) -> AsyncIterator[EventContext]:
parent = parent_override or _event_context_stack.top
parent_attributes = parent.attributes if parent else {}
entity_topological_sorter = (
parent._entity_topological_sorter
if parent and parent._entity_topological_sorter
else EntityTopologicalSorter()
)

attributes = {**parent_attributes, **(attributes or {})}
new_event = EventContext(
Expand All @@ -142,6 +149,7 @@ async def event_context(
_parent_event=parent,
# inherit port app config from parent event, so it can be used in nested events
_port_app_config=parent.port_app_config if parent else None,
_entity_topological_sorter=entity_topological_sorter,
)
_event_context_stack.push(new_event)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ async def upsert(
modified_entities.append(upsertedEntity)
# condition to false to differentiate from `result_entity.is_using_search_identifier`
if upsertedEntity is False:
event.entity_topological_sorter.register_entity(entity)
event._entity_topological_sorter.register_entity(entity)
return modified_entities

async def delete(
Expand Down
8 changes: 4 additions & 4 deletions port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -399,19 +399,19 @@ async def update_raw_diff(
)
async def sort_and_upsert_failed_entities(self,user_agent_type: UserAgentType)->None:
try:
if event.entity_topological_sorter.is_to_execute():
logger.info(f"Executings topological sort of {event.entity_topological_sorter.get_entities_count()} entities failed to upsert")
if event._entity_topological_sorter.is_to_execute():
logger.info(f"Executings topological sort of {event._entity_topological_sorter.get_entities_count()} entities failed to upsert")
else:
logger.info("No failed entities on upsert")
return

for entity in event.entity_topological_sorter.get_entities():
for entity in event._entity_topological_sorter.get_entities():
await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False)

except OceanAbortException as ocean_abort:
logger.info("Failed topological sort of failed to upsert entites - trying to upsert unordered")
if isinstance(ocean_abort.__cause__,CycleError):
for entity in event.entity_topological_sorter.get_entities(False):
for entity in event._entity_topological_sorter.get_entities(False):
await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False)
async def sync_raw_all(
self,
Expand Down
5 changes: 2 additions & 3 deletions port_ocean/core/utils/entity_topological_sorter.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import Any, Generator
from port_ocean.core.models import Entity

from dataclasses import dataclass, field
from loguru import logger

from graphlib import TopologicalSorter, CycleError
Expand All @@ -12,9 +11,9 @@
Node = tuple[str, str]


@dataclass
class EntityTopologicalSorter:
entities: list[Entity] = field(default_factory=list)
def __init__(self) -> None:
self.entities: list[Entity] = []

def register_entity(
self,
Expand Down
40 changes: 20 additions & 20 deletions port_ocean/tests/core/handlers/mixins/test_sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,8 +206,8 @@ async def test_sync_raw_mixin_self_dependency(
)
)
event.port_app_config = app_config
event.entity_topological_sorter.register_entity = MagicMock(side_effect=event.entity_topological_sorter.register_entity) # type: ignore
event.entity_topological_sorter.get_entities = MagicMock(side_effect=event.entity_topological_sorter.get_entities) # type: ignore
event._entity_topological_sorter.register_entity = MagicMock(side_effect=event._entity_topological_sorter.register_entity) # type: ignore
event._entity_topological_sorter.get_entities = MagicMock(side_effect=event._entity_topological_sorter.get_entities) # type: ignore

with patch(
"port_ocean.core.integrations.mixins.sync_raw.event_context",
Expand All @@ -223,10 +223,10 @@ async def test_sync_raw_mixin_self_dependency(
)

assert (
len(event.entity_topological_sorter.entities) == 1
len(event._entity_topological_sorter.entities) == 1
), "Expected one failed entity callback due to retry logic"
assert event.entity_topological_sorter.register_entity.call_count == 1
assert event.entity_topological_sorter.get_entities.call_count == 1
assert event._entity_topological_sorter.register_entity.call_count == 1
assert event._entity_topological_sorter.get_entities.call_count == 1

assert mock_order_by_entities_dependencies.call_count == 1
assert [
Expand Down Expand Up @@ -261,16 +261,16 @@ async def test_sync_raw_mixin_circular_dependency(
)
)
event.port_app_config = app_config
org = event.entity_topological_sorter.register_entity
org = event._entity_topological_sorter.register_entity

def mock_register_entity(*args: Any, **kwargs: Any) -> Any:
entity = args[0]
entity.properties["mock_is_to_fail"] = False
return org(*args, **kwargs)

event.entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore
event._entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore
raiesed_error_handle_failed = []
org_get_entities = event.entity_topological_sorter.get_entities
org_get_entities = event._entity_topological_sorter.get_entities

def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any:
try:
Expand All @@ -279,7 +279,7 @@ def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any:
raiesed_error_handle_failed.append(e)
raise e

event.entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: handle_failed_wrapper(*args, **kwargs)) # type: ignore
event._entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: handle_failed_wrapper(*args, **kwargs)) # type: ignore

with patch(
"port_ocean.core.integrations.mixins.sync_raw.event_context",
Expand All @@ -295,13 +295,13 @@ def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any:
)

assert (
len(event.entity_topological_sorter.entities) == 2
len(event._entity_topological_sorter.entities) == 2
), "Expected one failed entity callback due to retry logic"
assert event.entity_topological_sorter.register_entity.call_count == 2
assert event.entity_topological_sorter.get_entities.call_count == 2
assert event._entity_topological_sorter.register_entity.call_count == 2
assert event._entity_topological_sorter.get_entities.call_count == 2
assert [
call[0]
for call in event.entity_topological_sorter.get_entities.call_args_list
for call in event._entity_topological_sorter.get_entities.call_args_list
] == [(), (False,)]
assert len(raiesed_error_handle_failed) == 1
assert isinstance(raiesed_error_handle_failed[0], OceanAbortException)
Expand Down Expand Up @@ -342,16 +342,16 @@ async def test_sync_raw_mixin_dependency(
)
)
event.port_app_config = app_config
org = event.entity_topological_sorter.register_entity
org = event._entity_topological_sorter.register_entity

def mock_register_entity(*args: Any, **kwargs: Any) -> None:
entity = args[0]
entity.properties["mock_is_to_fail"] = False
return org(*args, **kwargs)

event.entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore
event._entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore
raiesed_error_handle_failed = []
org_event_get_entities = event.entity_topological_sorter.get_entities
org_event_get_entities = event._entity_topological_sorter.get_entities

def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any:
try:
Expand All @@ -360,7 +360,7 @@ def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any:
raiesed_error_handle_failed.append(e)
raise e

event.entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: get_entities_wrapper(*args, **kwargs)) # type: ignore
event._entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: get_entities_wrapper(*args, **kwargs)) # type: ignore

with patch(
"port_ocean.core.integrations.mixins.sync_raw.event_context",
Expand All @@ -375,11 +375,11 @@ def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any:
trigger_type="machine", user_agent_type=UserAgentType.exporter
)

assert event.entity_topological_sorter.register_entity.call_count == 5
assert event._entity_topological_sorter.register_entity.call_count == 5
assert (
len(event.entity_topological_sorter.entities) == 5
len(event._entity_topological_sorter.entities) == 5
), "Expected one failed entity callback due to retry logic"
assert event.entity_topological_sorter.get_entities.call_count == 1
assert event._entity_topological_sorter.get_entities.call_count == 1
assert len(raiesed_error_handle_failed) == 0
assert mock_ocean.port_client.client.post.call_count == 10 # type: ignore
assert mock_order_by_entities_dependencies.call_count == 1
Expand Down

0 comments on commit 5099a80

Please sign in to comment.