Skip to content

Commit

Permalink
[Core] Add retry for failed upsertes and handle circular dependencies
Browse files Browse the repository at this point in the history
1. Extrac logic of topological sort into a class.
2. Modify tests.
  • Loading branch information
Ivan Kalinovski authored and Ivan Kalinovski committed Dec 22, 2024
1 parent b7b88ff commit 209e64b
Show file tree
Hide file tree
Showing 9 changed files with 123 additions and 196 deletions.
9 changes: 5 additions & 4 deletions port_ocean/clients/port/mixins/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@
handle_status_code,
PORT_HTTP_MAX_CONNECTIONS_LIMIT,
)
from port_ocean.core.models import Entity
from port_ocean.core.models import Entity, PortApiStatus
from starlette import status


class EntityClientMixin:
Expand Down Expand Up @@ -50,18 +51,18 @@ async def upsert_entity(
},
extensions={"retryable": True},
)

if response.is_error:
logger.error(
f"Error {'Validating' if validation_only else 'Upserting'} "
f"entity: {entity.identifier} of "
f"blueprint: {entity.blueprint}"
)
result = response.json()

if (
response.status_code == 404
response.status_code == status.HTTP_404_NOT_FOUND
and not result.get("ok")
and result.get("error") == "not_found"
and result.get("error") == PortApiStatus.NOT_FOUND.value
):
return False
handle_status_code(response, should_raise)
Expand Down
4 changes: 2 additions & 2 deletions port_ocean/context/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from uuid import uuid4

from loguru import logger
from port_ocean.utils.failed_entity_handler import FailedEntityHandler
from port_ocean.utils.entity_topological_sorter import EntityTopologicalSorter
from pydispatch import dispatcher # type: ignore
from werkzeug.local import LocalStack, LocalProxy

Expand Down Expand Up @@ -53,7 +53,7 @@ class EventContext:
_event_id: str = field(default_factory=lambda: str(uuid4()))
_on_abort_callbacks: list[AbortCallbackFunction] = field(default_factory=list)

failed_entity_handler = FailedEntityHandler()
entity_topological_sorter = EntityTopologicalSorter()

def on_abort(self, func: AbortCallbackFunction) -> None:
self._on_abort_callbacks.append(func)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,7 @@ async def upsert(
if upsertedEntity:
modified_entities.append(upsertedEntity)
if upsertedEntity is False:
event.failed_entity_handler.register_failed_upsert_call_arguments(
entity,
event.port_app_config.get_port_request_options(),
user_agent_type,
self.context.port_client.upsert_entity,
)
event.entity_topological_sorter.register_entity(entity)
return modified_entities

async def delete(
Expand Down
12 changes: 9 additions & 3 deletions port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
from graphlib import CycleError
import inspect
import typing
from typing import Callable, Awaitable, Any
Expand Down Expand Up @@ -457,9 +458,14 @@ async def sync_raw_all(

creation_results.append(await task)
try:
await event.failed_entity_handler.handle_failed()
except:
await event.failed_entity_handler.handle_failed_no_sort()
for entity in event.entity_topological_sorter.get_entities():
await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False)

except OceanAbortException as ocean_abort:
if isinstance(ocean_abort.__cause__,CycleError):
for entity in event.entity_topological_sorter.entities:
await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False)

except asyncio.CancelledError as e:
logger.warning("Resync aborted successfully, skipping delete phase. This leads to an incomplete state")
raise
Expand Down
4 changes: 4 additions & 0 deletions port_ocean/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ class Runtime(Enum):
OnPrem = "OnPrem"


class PortApiStatus(Enum):
NOT_FOUND = "not_found"


class Entity(BaseModel):
identifier: Any
blueprint: Any
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,15 @@ async def no_op_event_context(
yield existing_event


def create_entity(
id: str, blueprint: str, relation: dict[str, str], is_to_fail: bool
) -> Entity:
entity = Entity(identifier=id, blueprint=blueprint)
entity.relations = relation
entity.properties = {"mock_is_to_fail": is_to_fail}
return entity


@pytest.mark.asyncio
async def test_sync_raw_mixin_self_dependency(
mock_sync_raw_mixin: SyncRawMixin,
Expand All @@ -199,16 +208,15 @@ async def test_sync_raw_mixin_self_dependency(
)
)
event.port_app_config = app_config
event.failed_entity_handler.register_failed_upsert_call_arguments = MagicMock(side_effect=event.failed_entity_handler.register_failed_upsert_call_arguments) # type: ignore
event.failed_entity_handler.handle_failed = MagicMock(side_effect=event.failed_entity_handler.handle_failed) # type: ignore
event.failed_entity_handler.handle_failed_no_sort = MagicMock(side_effect=event.failed_entity_handler.handle_failed_no_sort) # type: ignore
event.entity_topological_sorter.register_entity = MagicMock(side_effect=event.entity_topological_sorter.register_entity) # type: ignore
event.entity_topological_sorter.get_entities = MagicMock(side_effect=event.entity_topological_sorter.get_entities) # type: ignore

with patch(
"port_ocean.core.integrations.mixins.sync_raw.event_context",
lambda *args, **kwargs: no_op_event_context(event),
):
with patch(
"port_ocean.utils.failed_entity_handler.order_by_entities_dependencies",
"port_ocean.utils.entity_topological_sorter.order_by_entities_dependencies",
mock_order_by_entities_dependencies,
):

Expand All @@ -217,22 +225,17 @@ async def test_sync_raw_mixin_self_dependency(
)

assert (
len(event.failed_entity_handler._failed_entity_callback_list) == 1
len(event.entity_topological_sorter.entities) == 1
), "Expected one failed entity callback due to retry logic"
assert (
event.failed_entity_handler.register_failed_upsert_call_arguments.call_count
== 1
)
assert event.failed_entity_handler.handle_failed.call_count == 1
assert event.entity_topological_sorter.register_entity.call_count == 1
assert event.entity_topological_sorter.get_entities.call_count == 1

assert mock_order_by_entities_dependencies.call_count == 1
assert [
call[0][0][0]
for call in mock_order_by_entities_dependencies.call_args_list
] == [entity for entity in entities if entity.identifier == "entity_1"]

assert event.failed_entity_handler.handle_failed_no_sort.call_count == 0


@pytest.mark.asyncio
async def test_sync_raw_mixin_circular_dependency(
Expand Down Expand Up @@ -260,35 +263,32 @@ async def test_sync_raw_mixin_circular_dependency(
)
)
event.port_app_config = app_config
org = event.failed_entity_handler.register_failed_upsert_call_arguments
org = event.entity_topological_sorter.register_entity

def mock_register_failed_upsert_call_arguments(
*args: Any, **kwargs: Any
) -> Any:
def mock_register_entity(*args: Any, **kwargs: Any) -> Any:
entity = args[0]
entity.properties["mock_is_to_fail"] = False
return org(*args, **kwargs)

event.failed_entity_handler.register_failed_upsert_call_arguments = MagicMock(side_effect=mock_register_failed_upsert_call_arguments) # type: ignore
event.entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore
raiesed_error_handle_failed = []
org_event_handle_failed = event.failed_entity_handler.handle_failed
org_get_entities = event.entity_topological_sorter.get_entities

async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any:
def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any:
try:
return await org_event_handle_failed(*args, **kwargs)
return list(org_get_entities(*args, **kwargs))
except Exception as e:
raiesed_error_handle_failed.append(e)
raise e

event.failed_entity_handler.handle_failed = MagicMock(side_effect=lambda *args, **kwargs: handle_failed_wrapper(*args, **kwargs)) # type: ignore
event.failed_entity_handler.handle_failed_no_sort = MagicMock(side_effect=event.failed_entity_handler.handle_failed_no_sort) # type: ignore
event.entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: handle_failed_wrapper(*args, **kwargs)) # type: ignore

with patch(
"port_ocean.core.integrations.mixins.sync_raw.event_context",
lambda *args, **kwargs: no_op_event_context(event),
):
with patch(
"port_ocean.utils.failed_entity_handler.order_by_entities_dependencies",
"port_ocean.utils.entity_topological_sorter.order_by_entities_dependencies",
mock_order_by_entities_dependencies,
):

Expand All @@ -297,33 +297,20 @@ async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any:
)

assert (
len(event.failed_entity_handler._failed_entity_callback_list) == 2
len(event.entity_topological_sorter.entities) == 2
), "Expected one failed entity callback due to retry logic"
assert (
event.failed_entity_handler.register_failed_upsert_call_arguments.call_count
== 2
)
assert event.failed_entity_handler.handle_failed.call_count == 1
assert event.entity_topological_sorter.register_entity.call_count == 2
assert event.entity_topological_sorter.get_entities.call_count == 1
assert len(raiesed_error_handle_failed) == 1
assert isinstance(raiesed_error_handle_failed[0], OceanAbortException)
assert isinstance(raiesed_error_handle_failed[0].__cause__, CycleError)
assert event.failed_entity_handler.handle_failed_no_sort.call_count == 1
assert (
len(mock_ocean.port_client.client.post.call_args_list) # type: ignore
/ len(entities)
== 2
)


def create_entity(
id: str, blueprint: str, relation: dict[str, str], is_to_fail: bool
) -> Entity:
entity = Entity(identifier=id, blueprint=blueprint)
entity.relations = relation
entity.properties = {"mock_is_to_fail": is_to_fail}
return entity


@pytest.mark.asyncio
async def test_sync_raw_mixin_dependency(
mock_sync_raw_mixin: SyncRawMixin, mock_ocean: Ocean
Expand Down Expand Up @@ -353,35 +340,32 @@ async def test_sync_raw_mixin_dependency(
)
)
event.port_app_config = app_config
org = event.failed_entity_handler.register_failed_upsert_call_arguments
org = event.entity_topological_sorter.register_entity

def mock_register_failed_upsert_call_arguments(
*args: Any, **kwargs: Any
) -> None:
def mock_register_entity(*args: Any, **kwargs: Any) -> None:
entity = args[0]
entity.properties["mock_is_to_fail"] = False
return org(*args, **kwargs)

event.failed_entity_handler.register_failed_upsert_call_arguments = MagicMock(side_effect=mock_register_failed_upsert_call_arguments) # type: ignore
event.entity_topological_sorter.register_entity = MagicMock(side_effect=mock_register_entity) # type: ignore
raiesed_error_handle_failed = []
org_event_handle_failed = event.failed_entity_handler.handle_failed
org_event_get_entities = event.entity_topological_sorter.get_entities

async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any:
def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any:
try:
return await org_event_handle_failed(*args, **kwargs)
return org_event_get_entities(*args, **kwargs)
except Exception as e:
raiesed_error_handle_failed.append(e)
raise e

event.failed_entity_handler.handle_failed = MagicMock(side_effect=lambda *args, **kwargs: handle_failed_wrapper(*args, **kwargs)) # type: ignore
event.failed_entity_handler.handle_failed_no_sort = MagicMock(side_effect=event.failed_entity_handler.handle_failed_no_sort) # type: ignore
event.entity_topological_sorter.get_entities = MagicMock(side_effect=lambda *args, **kwargs: get_entities_wrapper(*args, **kwargs)) # type: ignore

with patch(
"port_ocean.core.integrations.mixins.sync_raw.event_context",
lambda *args, **kwargs: no_op_event_context(event),
):
with patch(
"port_ocean.utils.failed_entity_handler.order_by_entities_dependencies",
"port_ocean.utils.entity_topological_sorter.order_by_entities_dependencies",
mock_order_by_entities_dependencies,
):

Expand All @@ -390,13 +374,10 @@ async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any:
)

assert (
len(event.failed_entity_handler._failed_entity_callback_list) == 5
len(event.entity_topological_sorter.entities) == 5
), "Expected one failed entity callback due to retry logic"
assert (
event.failed_entity_handler.register_failed_upsert_call_arguments.call_count
== 5
)
assert event.failed_entity_handler.handle_failed.call_count == 1
assert event.entity_topological_sorter.register_entity.call_count == 5
assert event.entity_topological_sorter.get_entities.call_count == 1
assert len(raiesed_error_handle_failed) == 0
assert mock_ocean.port_client.client.post.call_count == 10 # type: ignore
assert mock_order_by_entities_dependencies.call_count == 1
Expand All @@ -406,14 +387,7 @@ async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any:

assert "-".join(
[call[1].get("json").get("identifier") for call in first]
) == "-".join(
reversed(
[
entity.identifier
for entity in calc_result_mock.entity_selector_diff.passed
]
)
)
) == "-".join(reversed([entity.identifier for entity in entities]))
assert "-".join(
[call[1].get("json").get("identifier") for call in second]
) in (
Expand All @@ -422,4 +396,3 @@ async def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any:
"entity_3-entity_1-entity_4-entity_2-entity_5",
"entity_3-entity_1-entity_4-entity_5-entity_2",
)
assert event.failed_entity_handler.handle_failed_no_sort.call_count == 0
Loading

0 comments on commit 209e64b

Please sign in to comment.