Skip to content

Commit

Permalink
[Core] Add retry for failed upsertes and handle circular dependencies
Browse files Browse the repository at this point in the history
1. Register callbacks of failed entities.
2. When done with upserts, try topological sort on failed entities.
3. On fail of retry because of topological sort - try unsorted upsert.
4. Update topological's sort tree creation so an entity cannot be it's own dependency.
5. Test upsert with dependencies, with self circular dependency and external entity dependency.
  • Loading branch information
Ivan Kalinovski authored and Ivan Kalinovski committed Dec 15, 2024
1 parent 6502e75 commit 6ef411a
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 5 deletions.
11 changes: 9 additions & 2 deletions port_ocean/clients/port/mixins/entities.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
from typing import Any
from typing import Any, Literal
from urllib.parse import quote_plus

import httpx
Expand Down Expand Up @@ -29,7 +29,7 @@ async def upsert_entity(
request_options: RequestOptions,
user_agent_type: UserAgentType | None = None,
should_raise: bool = True,
) -> Entity | None:
) -> Entity | None | Literal[False]:
validation_only = request_options["validation_only"]
async with self.semaphore:
logger.debug(
Expand Down Expand Up @@ -57,6 +57,13 @@ async def upsert_entity(
f"entity: {entity.identifier} of "
f"blueprint: {entity.blueprint}"
)
result = response.json()
if (
response.status_code == 404
and result.get("ok") is False
and result.get("error") == "not_found"
):
return False
handle_status_code(response, should_raise)
result = response.json()

Expand Down
37 changes: 37 additions & 0 deletions port_ocean/context/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
Callable,
Awaitable,
Union,
Tuple,
Coroutine,
)
from uuid import uuid4

Expand All @@ -23,6 +25,10 @@
ResourceContextNotFoundError,
)
from port_ocean.utils.misc import get_time
from port_ocean.core.handlers.entities_state_applier.port.order_by_entities_dependencies import (
order_by_entities_dependencies,
)
from port_ocean.core.models import Entity

if TYPE_CHECKING:
from port_ocean.core.handlers.port_app_config.models import (
Expand Down Expand Up @@ -50,6 +56,37 @@ class EventContext:
_parent_event: Optional["EventContext"] = None
_event_id: str = field(default_factory=lambda: str(uuid4()))
_on_abort_callbacks: list[AbortCallbackFunction] = field(default_factory=list)
_failed_entity_callback_list: list[
Tuple[Entity, Callable[[], Coroutine[Any, Any, Entity | Literal[False] | None]]]
] = field(default_factory=list)

def register_failed_upsert_call_arguments(
self,
entity: Entity,
func: Callable[[], Coroutine[Any, Any, Entity | Literal[False] | None]],
) -> None:
self._failed_entity_callback_list.append((entity, func))

async def handle_failed(self) -> None:
entity_map: dict[
str, Callable[[], Coroutine[Any, Any, Entity | Literal[False] | None]]
] = {
f"{obj.identifier}-{obj.blueprint}": func
for obj, func in self._failed_entity_callback_list
}
entity_list: list[Entity] = [
obj for obj, func in self._failed_entity_callback_list
]

sorted_and_mapped = order_by_entities_dependencies(entity_list)
for obj in sorted_and_mapped:
func = entity_map.get(f"{obj.identifier}-{obj.blueprint}")
if func is not None:
await func()

async def handle_failed_no_sort(self) -> None:
for obj, func in self._failed_entity_callback_list:
await func()

def on_abort(self, func: AbortCallbackFunction) -> None:
self._on_abort_callbacks.append(func)
Expand Down
13 changes: 11 additions & 2 deletions port_ocean/core/handlers/entities_state_applier/port/applier.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,7 @@ async def upsert(
entities_without_search_identifier.append(entity)

ordered_created_entities = reversed(
entities_with_search_identifier
+ order_by_entities_dependencies(entities_without_search_identifier)
entities_with_search_identifier + entities_without_search_identifier
)
for entity in ordered_created_entities:
upsertedEntity = await self.context.port_client.upsert_entity(
Expand All @@ -127,6 +126,16 @@ async def upsert(
)
if upsertedEntity:
modified_entities.append(upsertedEntity)
if upsertedEntity is False:
event.register_failed_upsert_call_arguments(
entity,
lambda: self.context.port_client.upsert_entity(
entity,
event.port_app_config.get_port_request_options(),
user_agent_type,
should_raise=False,
),
)
return modified_entities

async def delete(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,11 @@ def order_by_entities_dependencies(entities: list[Entity]) -> list[Entity]:
]

for related_entity in related_entities:
nodes[node(entity)].add(node(related_entity))
if (
entity.blueprint is not related_entity.blueprint
or entity.identifier is not related_entity.identifier
):
nodes[node(entity)].add(node(related_entity))

sort_op = TopologicalSorter(nodes)
try:
Expand Down
4 changes: 4 additions & 0 deletions port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,10 @@ async def sync_raw_all(
event.on_abort(lambda: task.cancel())

creation_results.append(await task)
try:
await event.handle_failed()
except:
await event.handle_failed_no_sort()
except asyncio.CancelledError as e:
logger.warning("Resync aborted successfully, skipping delete phase. This leads to an incomplete state")
raise
Expand Down
70 changes: 70 additions & 0 deletions port_ocean/tests/utils/integrations/mixins/test_sync_raw.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import pytest
from unittest.mock import MagicMock
from port_ocean.context.event import event, event_context, EventType
from port_ocean.exceptions.core import (
OceanAbortException,
)
from typing import Any

def create_entity(identifier:str,buleprint:str, dependencies:dict[str,str]=None):
entity = MagicMock()
entity.identifier = identifier
entity.blueprint = buleprint
entity.relations = dependencies or {}
return entity

async def mock_activate(processed_order:list[str],entity:MagicMock):
processed_order.append(f"{entity.identifier}-{entity.blueprint}")
return True

@pytest.mark.asyncio
async def test_handle_failed_with_dependencies():
processed_order:list[str] = []
entity_a = create_entity("entity_a", "buleprint_a",) # No dependencies
entity_b = create_entity("entity_b", "buleprint_a", {"dep_name_1":"entity_a"}) # Depends on entity_a
entity_c = create_entity("entity_c", "buleprint_b", {"dep_name_2":"entity_b"}) # Depends on entity_b


async with event_context(EventType.RESYNC, "manual"):
# Register fails with unsorted order
event.register_failed_upsert_call_arguments(entity_c, lambda: mock_activate(processed_order,entity_c))
event.register_failed_upsert_call_arguments(entity_a, lambda: mock_activate(processed_order,entity_a))
event.register_failed_upsert_call_arguments(entity_b, lambda: mock_activate(processed_order,entity_b))

await event.handle_failed()

assert processed_order == ["entity_a-buleprint_a", "entity_b-buleprint_a", "entity_c-buleprint_b"], f"Processed order: {processed_order}"

@pytest.mark.asyncio
async def test_handle_failed_with_self_dependencies():
processed_order:list[str] = []
entity_a = create_entity("entity_a", "buleprint_a",{"dep_name_1":"entity_a"}) # Self dependency
entity_b = create_entity("entity_b", "buleprint_a", {"dep_name_1":"entity_a"}) # Depends on entity_a
entity_c = create_entity("entity_c", "buleprint_b", {"dep_name_2":"entity_b"}) # Depends on entity_b


async with event_context(EventType.RESYNC, "manual"):
# Register fails with unsorted order
event.register_failed_upsert_call_arguments(entity_c, lambda: mock_activate(processed_order,entity_c))
event.register_failed_upsert_call_arguments(entity_a, lambda: mock_activate(processed_order,entity_a))
event.register_failed_upsert_call_arguments(entity_b, lambda: mock_activate(processed_order,entity_b))

await event.handle_failed()

assert processed_order == ["entity_a-buleprint_a", "entity_b-buleprint_a", "entity_c-buleprint_b"], f"Processed order: {processed_order}"

@pytest.mark.asyncio
async def test_handle_failed_with_circular_dependencies():
processed_order:list[str] = []
entity_a = create_entity("entity_a", "buleprint_a",{"dep_name_1":"entity_b"}) # Self dependency
entity_b = create_entity("entity_b", "buleprint_a", {"dep_name_1":"entity_a"}) # Depends on entity_a

try:
async with event_context(EventType.RESYNC, "manual"):
# Register fails with unsorted order
event.register_failed_upsert_call_arguments(entity_a, lambda: mock_activate(processed_order,entity_a))
event.register_failed_upsert_call_arguments(entity_b, lambda: mock_activate(processed_order,entity_b))
await event.handle_failed()
except OceanAbortException as e:
assert isinstance(e,OceanAbortException)
assert e.args[0] == "Cannot order entities due to cyclic dependencies. \nIf you do want to have cyclic dependencies, please make sure to set the keys 'createMissingRelatedEntities' and 'deleteDependentEntities' in the integration config in Port."

0 comments on commit 6ef411a

Please sign in to comment.