Skip to content

Commit

Permalink
[Core] Add retry for failed upserts and handle circular dependencies
Browse files Browse the repository at this point in the history
1. Change location of files.
2. Extract the failed-upsert handling logic into a function.
3. Update get_entities.
  • Loading branch information
Ivan Kalinovski authored and Ivan Kalinovski committed Dec 22, 2024
1 parent 209e64b commit 3d49fa7
Show file tree
Hide file tree
Showing 15 changed files with 154 additions and 74 deletions.
9 changes: 6 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,16 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
## 0.15.3 (2024-12-15)


### Improvements

- When `createMissingRelatedEntities` is set to `false` and upserting an entity fails because a related entity does not exist, the entity is collected and retried at the end of the resync:
all the failed entities are ordered with a topological sort and upserted again
- Test upsert with dependencies, with self circular dependency and external entity dependency.

### Bug Fixes

- When the topological sort fails with a cyclic dependency error, fall back to upserting the entities unsorted
- Test upsert with dependencies, with self circular dependency and external entity dependency.
- Fix topological sort tree creation so an entity cannot be its own dependency
- When `createMissingRelatedEntities` is set to `false` and upserting entity failed on not existing entity, the entity will be gathered to the end of the resync and will try sorting all
the failed entities through a topological sort and upsert them as well


## 0.14.7 (2024-12-09)
Expand Down
10 changes: 8 additions & 2 deletions port_ocean/clients/port/mixins/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
handle_status_code,
PORT_HTTP_MAX_CONNECTIONS_LIMIT,
)
from port_ocean.core.models import Entity, PortApiStatus
from port_ocean.core.models import Entity, PortAPIErrorMessage
from starlette import status


Expand All @@ -31,6 +31,11 @@ async def upsert_entity(
user_agent_type: UserAgentType | None = None,
should_raise: bool = True,
) -> Entity | None | Literal[False]:
"""
[Entity] will be returned on happy flow
[None] will be returned if entity is using search identifier
[False] will be returned if upsert failed because of unmet dependency
"""
validation_only = request_options["validation_only"]
async with self.semaphore:
logger.debug(
Expand Down Expand Up @@ -62,8 +67,9 @@ async def upsert_entity(
if (
response.status_code == status.HTTP_404_NOT_FOUND
and not result.get("ok")
and result.get("error") == PortApiStatus.NOT_FOUND.value
and result.get("error") == PortAPIErrorMessage.NOT_FOUND.value
):
# Return false to differentiate from `result_entity.is_using_search_identifier`
return False
handle_status_code(response, should_raise)
result = response.json()
Expand Down
2 changes: 1 addition & 1 deletion port_ocean/context/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from uuid import uuid4

from loguru import logger
from port_ocean.utils.entity_topological_sorter import EntityTopologicalSorter
from port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter
from pydispatch import dispatcher # type: ignore
from werkzeug.local import LocalStack, LocalProxy

Expand Down
2 changes: 1 addition & 1 deletion port_ocean/core/defaults/initialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
)
from port_ocean.core.handlers.port_app_config.models import PortAppConfig
from port_ocean.core.models import Blueprint
from port_ocean.core.utils import gather_and_split_errors_from_results
from port_ocean.core.utils.utils import gather_and_split_errors_from_results
from port_ocean.exceptions.port_defaults import (
AbortDefaultCreationError,
)
Expand Down
12 changes: 7 additions & 5 deletions port_ocean/core/handlers/entities_state_applier/port/applier.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@
from port_ocean.core.handlers.entities_state_applier.port.get_related_entities import (
get_related_entities,
)
from port_ocean.core.handlers.entities_state_applier.port.order_by_entities_dependencies import (
order_by_entities_dependencies,
)

from port_ocean.core.models import Entity
from port_ocean.core.ocean_types import EntityDiff
from port_ocean.core.utils import is_same_entity, get_port_diff
from port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter
from port_ocean.core.utils.utils import is_same_entity, get_port_diff


class HttpEntitiesStateApplier(BaseEntitiesStateApplier):
Expand Down Expand Up @@ -126,6 +125,7 @@ async def upsert(
)
if upsertedEntity:
modified_entities.append(upsertedEntity)
# Compare against False explicitly to differentiate from `result_entity.is_using_search_identifier`, where None is returned
if upsertedEntity is False:
event.entity_topological_sorter.register_entity(entity)
return modified_entities
Expand All @@ -142,7 +142,9 @@ async def delete(
should_raise=False,
)
else:
ordered_deleted_entities = order_by_entities_dependencies(entities)
ordered_deleted_entities = (
EntityTopologicalSorter.order_by_entities_dependencies(entities)
)

for entity in ordered_deleted_entities:
await self.context.port_client.delete_entity(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
EntitySelectorDiff,
CalculationResult,
)
from port_ocean.core.utils import gather_and_split_errors_from_results, zip_and_sum
from port_ocean.core.utils.utils import (
gather_and_split_errors_from_results,
zip_and_sum,
)
from port_ocean.exceptions.core import EntityProcessorException
from port_ocean.utils.queue_utils import process_in_queue

Expand Down
28 changes: 18 additions & 10 deletions port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,9 @@
RAW_ITEM,
CalculationResult,
)
from port_ocean.core.utils import zip_and_sum, gather_and_split_errors_from_results
from port_ocean.core.utils.utils import zip_and_sum, gather_and_split_errors_from_results
from port_ocean.exceptions.core import OceanAbortException
import json

SEND_RAW_DATA_EXAMPLES_AMOUNT = 5


Expand Down Expand Up @@ -397,7 +397,22 @@ async def update_raw_diff(
{"before": entities_before_flatten, "after": entities_after_flatten},
user_agent_type,
)
async def sort_and_upsert_failed_entities(self, user_agent_type: UserAgentType) -> None:
    """Retry the upsert of every entity that failed during the resync.

    The entities registered on ``event.entity_topological_sorter`` are
    upserted in dependency order. When ordering them is impossible because
    of a cyclic dependency (an ``OceanAbortException`` caused by a
    ``CycleError``), the same entities are upserted in registration order
    instead.

    Args:
        user_agent_type: Forwarded unchanged to ``upsert_entity``.
    """
    sorter = event.entity_topological_sorter
    if not sorter.is_to_execute():
        logger.info("No failed entities on upsert")
        return
    logger.info("Executings topological sort of entities failed to upsert")

    async def upsert_all(entities: Any) -> None:
        # Single place for the retry call so both orderings behave the same.
        for entity in entities:
            await self.entities_state_applier.context.port_client.upsert_entity(
                entity,
                event.port_app_config.get_port_request_options(),
                user_agent_type,
                should_raise=False,
            )

    try:
        await upsert_all(sorter.get_entities())
    except OceanAbortException as ocean_abort:
        if not isinstance(ocean_abort.__cause__, CycleError):
            # Still best-effort (the abort is not propagated), but no longer
            # silent, and the fallback message below is not logged for a
            # fallback that never runs.
            # NOTE(review): confirm whether this case should re-raise instead.
            logger.warning(
                f"Retrying the failed entity upserts was aborted: {ocean_abort}"
            )
            return
        logger.info("Failed topological sort of failed to upsert entites - trying to upsert unordered")
        await upsert_all(sorter.get_entities(False))
async def sync_raw_all(
self,
_: dict[Any, Any] | None = None,
Expand Down Expand Up @@ -457,15 +472,8 @@ async def sync_raw_all(
event.on_abort(lambda: task.cancel())

creation_results.append(await task)
try:
for entity in event.entity_topological_sorter.get_entities():
await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False)

except OceanAbortException as ocean_abort:
if isinstance(ocean_abort.__cause__,CycleError):
for entity in event.entity_topological_sorter.entities:
await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False)

await self.sort_and_upsert_failed_entities(user_agent_type)
except asyncio.CancelledError as e:
logger.warning("Resync aborted successfully, skipping delete phase. This leads to an incomplete state")
raise
Expand Down
2 changes: 1 addition & 1 deletion port_ocean/core/integrations/mixins/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
RESYNC_EVENT_LISTENER,
RESYNC_RESULT,
)
from port_ocean.core.utils import validate_result
from port_ocean.core.utils.utils import validate_result
from port_ocean.exceptions.core import (
RawObjectValidationException,
OceanAbortException,
Expand Down
2 changes: 1 addition & 1 deletion port_ocean/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class Runtime(Enum):
OnPrem = "OnPrem"


class PortApiStatus(Enum):
class PortAPIErrorMessage(Enum):
NOT_FOUND = "not_found"


Expand Down
93 changes: 93 additions & 0 deletions port_ocean/core/utils/entity_topological_sorter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from typing import Any, Generator
from port_ocean.core.models import Entity

from dataclasses import dataclass, field
from loguru import logger

from graphlib import TopologicalSorter, CycleError
from typing import Set

from port_ocean.exceptions.core import OceanAbortException

Node = tuple[str, str]


@dataclass
class EntityTopologicalSorter:
    """Collect entities whose upsert failed and hand them back in dependency order.

    Entities are registered one by one during a resync. ``get_entities``
    later yields them so that an entity comes after the registered entities
    its relations point to.
    """

    # Entities queued for a retry, in registration order.
    entities: list[Entity] = field(default_factory=list)

    def register_entity(
        self,
        entity: Entity,
    ) -> None:
        """Queue ``entity`` so that its upsert is retried at the end of the resync."""
        logger.debug(
            f"Will retry upserting entity - {entity.identifier} at the end of resync"
        )
        self.entities.append(entity)

    def is_to_execute(self) -> bool:
        """Return True when at least one entity is waiting for a retry."""
        return bool(self.entities)

    def get_entities(self, sorted: bool = True) -> Generator[Entity, Any, None]:
        """Yield the registered entities.

        Args:
            sorted: When True (default) yield in dependency order; when False
                yield in registration order without any sorting.

        Raises:
            OceanAbortException: On first iteration, when ``sorted`` is True
                and the registered entities contain a dependency cycle.
        """
        if not sorted:
            yield from self.entities
            return

        # The ordered list already holds the entity objects themselves, so no
        # re-mapping through an "identifier-blueprint" string key is needed.
        # Such a key is ambiguous when either part contains a dash.
        yield from EntityTopologicalSorter.order_by_entities_dependencies(
            self.entities
        )

    @staticmethod
    def node(entity: Entity) -> Node:
        """Return the graph key of ``entity``: ``(identifier, blueprint)``."""
        return entity.identifier, entity.blueprint

    @staticmethod
    def order_by_entities_dependencies(entities: list[Entity]) -> list[Entity]:
        """Return ``entities`` ordered so that dependencies come first.

        An entity depends on every other entity in ``entities`` whose
        identifier appears among its relation values. An entity is never
        recorded as its own dependency.

        Raises:
            OceanAbortException: Chained from ``CycleError`` when the
                dependencies form a cycle.
        """
        make_node = EntityTopologicalSorter.node

        nodes: dict[Node, Set[Node]] = {}
        entities_map: dict[Node, Entity] = {}
        for entity in entities:
            key = make_node(entity)
            nodes[key] = set()
            entities_map[key] = entity

        for entity in entities:
            entity_node = make_node(entity)

            # Flatten relation values (single identifier or list of them),
            # skipping unset relations.
            relation_target_ids: list[str] = []
            for identifiers in entity.relations.values():
                if identifiers is None:
                    continue
                if isinstance(identifiers, list):
                    relation_target_ids.extend(identifiers)
                else:
                    relation_target_ids.append(identifiers)

            for related_entity in entities:
                if related_entity.identifier not in relation_target_ids:
                    continue
                related_node = make_node(related_entity)
                # Compare by value, not identity: two equal strings are not
                # guaranteed to be the same object, so `is not` could still
                # add a self-edge and cause a spurious cycle.
                if related_node != entity_node:
                    nodes[entity_node].add(related_node)

        sort_op = TopologicalSorter(nodes)
        try:
            return [entities_map[item] for item in sort_op.static_order()]
        except CycleError as ex:
            raise OceanAbortException(
                "Cannot order entities due to cyclic dependencies. \n"
                "If you do want to have cyclic dependencies, please make sure to set the keys"
                " 'createMissingRelatedEntities' and 'deleteDependentEntities' in the integration config in Port."
            ) from ex
File renamed without changes.
2 changes: 1 addition & 1 deletion port_ocean/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from port_ocean.config.dynamic import default_config_factory
from port_ocean.config.settings import ApplicationSettings, LogLevelType
from port_ocean.core.defaults.initialize import initialize_defaults
from port_ocean.core.utils import validate_integration_runtime
from port_ocean.core.utils.utils import validate_integration_runtime
from port_ocean.log.logger_setup import setup_logger
from port_ocean.ocean import Ocean
from port_ocean.utils.misc import get_spec_file, load_module
Expand Down
22 changes: 12 additions & 10 deletions port_ocean/tests/core/handlers/mixins/test_sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from httpx import Response
from port_ocean.clients.port.client import PortClient
from port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter
from port_ocean.exceptions.core import OceanAbortException
import pytest
from unittest.mock import MagicMock, AsyncMock, patch
Expand All @@ -28,9 +29,6 @@
from port_ocean.context.event import EventContext, event_context, EventType
from port_ocean.clients.port.types import UserAgentType
from port_ocean.context.ocean import ocean
from port_ocean.core.handlers.entities_state_applier.port.order_by_entities_dependencies import (
order_by_entities_dependencies,
)


@pytest.fixture
Expand Down Expand Up @@ -199,7 +197,7 @@ async def test_sync_raw_mixin_self_dependency(
mock_sync_raw_mixin.entity_processor.parse_items = AsyncMock(return_value=calc_result_mock) # type: ignore

mock_order_by_entities_dependencies = MagicMock(
side_effect=order_by_entities_dependencies
side_effect=EntityTopologicalSorter.order_by_entities_dependencies
)
async with event_context(EventType.RESYNC, trigger_type="machine") as event:
app_config = (
Expand All @@ -216,7 +214,7 @@ async def test_sync_raw_mixin_self_dependency(
lambda *args, **kwargs: no_op_event_context(event),
):
with patch(
"port_ocean.utils.entity_topological_sorter.order_by_entities_dependencies",
"port_ocean.core.utils.entity_topological_sorter.EntityTopologicalSorter.order_by_entities_dependencies",
mock_order_by_entities_dependencies,
):

Expand Down Expand Up @@ -254,7 +252,7 @@ async def test_sync_raw_mixin_circular_dependency(
mock_sync_raw_mixin.entity_processor.parse_items = AsyncMock(return_value=calc_result_mock) # type: ignore

mock_order_by_entities_dependencies = MagicMock(
side_effect=order_by_entities_dependencies
side_effect=EntityTopologicalSorter.order_by_entities_dependencies
)
async with event_context(EventType.RESYNC, trigger_type="machine") as event:
app_config = (
Expand Down Expand Up @@ -288,7 +286,7 @@ def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any:
lambda *args, **kwargs: no_op_event_context(event),
):
with patch(
"port_ocean.utils.entity_topological_sorter.order_by_entities_dependencies",
"port_ocean.core.utils.entity_topological_sorter.EntityTopologicalSorter.order_by_entities_dependencies",
mock_order_by_entities_dependencies,
):

Expand All @@ -300,7 +298,11 @@ def handle_failed_wrapper(*args: Any, **kwargs: Any) -> Any:
len(event.entity_topological_sorter.entities) == 2
), "Expected one failed entity callback due to retry logic"
assert event.entity_topological_sorter.register_entity.call_count == 2
assert event.entity_topological_sorter.get_entities.call_count == 1
assert event.entity_topological_sorter.get_entities.call_count == 2
assert [
call[0]
for call in event.entity_topological_sorter.get_entities.call_args_list
] == [(), (False,)]
assert len(raiesed_error_handle_failed) == 1
assert isinstance(raiesed_error_handle_failed[0], OceanAbortException)
assert isinstance(raiesed_error_handle_failed[0].__cause__, CycleError)
Expand Down Expand Up @@ -331,7 +333,7 @@ async def test_sync_raw_mixin_dependency(
mock_sync_raw_mixin.entity_processor.parse_items = AsyncMock(return_value=calc_result_mock) # type: ignore

mock_order_by_entities_dependencies = MagicMock(
side_effect=order_by_entities_dependencies
side_effect=EntityTopologicalSorter.order_by_entities_dependencies
)
async with event_context(EventType.RESYNC, trigger_type="machine") as event:
app_config = (
Expand Down Expand Up @@ -365,7 +367,7 @@ def get_entities_wrapper(*args: Any, **kwargs: Any) -> Any:
lambda *args, **kwargs: no_op_event_context(event),
):
with patch(
"port_ocean.utils.entity_topological_sorter.order_by_entities_dependencies",
"port_ocean.core.utils.entity_topological_sorter.EntityTopologicalSorter.order_by_entities_dependencies",
mock_order_by_entities_dependencies,
):

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from port_ocean.core.models import Entity
from port_ocean.utils.entity_topological_sorter import EntityTopologicalSorter
from port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter
from unittest.mock import MagicMock
from port_ocean.exceptions.core import (
OceanAbortException,
Expand Down
Loading

0 comments on commit 3d49fa7

Please sign in to comment.