Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Core] Add retry for failed upserts and handle circular dependencies #1241

Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,21 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Added `SaasOauth` runtime support


## 0.15.3 (2024-12-15)


### Improvements

- When `createMissingRelatedEntities` is set to `false` and upserting entity failed on not existing entity, the entity will be gathered to the end of the resync and will try sorting all
the failed entities through a topological sort and upsert them as well
- Test upsert with dependencies, with self circular dependency and external entity dependency.

### Bug Fixes
Tankilevitch marked this conversation as resolved.
Show resolved Hide resolved

- When experiencing cyclic error on topological sort try unsorted upsert of the entities
- Fix topologicals sort tree creation so an entity cannot be its own dependency


## 0.14.7 (2024-12-09)


Expand Down
22 changes: 18 additions & 4 deletions port_ocean/clients/port/mixins/entities.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import asyncio
from typing import Any
from typing import Any, Literal
from urllib.parse import quote_plus

import httpx
Expand All @@ -11,7 +11,8 @@
handle_status_code,
PORT_HTTP_MAX_CONNECTIONS_LIMIT,
)
from port_ocean.core.models import Entity
from port_ocean.core.models import Entity, PortAPIErrorMessage
from starlette import status


class EntityClientMixin:
Expand All @@ -29,7 +30,12 @@ async def upsert_entity(
request_options: RequestOptions,
user_agent_type: UserAgentType | None = None,
should_raise: bool = True,
) -> Entity | None:
) -> Entity | None | Literal[False]:
"""
[Entity] will be returned on happy flow
[None] will be returned if entity is using search identifier
[False] will be returned if upsert failed because of unmet dependency
"""
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets follow the function description format like in process_in_queue

validation_only = request_options["validation_only"]
async with self.semaphore:
logger.debug(
Expand All @@ -50,13 +56,21 @@ async def upsert_entity(
},
extensions={"retryable": True},
)

if response.is_error:
logger.error(
f"Error {'Validating' if validation_only else 'Upserting'} "
f"entity: {entity.identifier} of "
f"blueprint: {entity.blueprint}"
)
result = response.json()

if (
response.status_code == status.HTTP_404_NOT_FOUND
and not result.get("ok")
and result.get("error") == PortAPIErrorMessage.NOT_FOUND.value
):
# Return false to differentiate from `result_entity.is_using_search_identifier`
return False
handle_status_code(response, should_raise)
result = response.json()

Expand Down
4 changes: 4 additions & 0 deletions port_ocean/context/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from uuid import uuid4

from loguru import logger
from port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter
from pydispatch import dispatcher # type: ignore
from werkzeug.local import LocalStack, LocalProxy

Expand All @@ -24,6 +25,7 @@
)
from port_ocean.utils.misc import get_time


if TYPE_CHECKING:
from port_ocean.core.handlers.port_app_config.models import (
ResourceConfig,
Expand Down Expand Up @@ -51,6 +53,8 @@ class EventContext:
_event_id: str = field(default_factory=lambda: str(uuid4()))
_on_abort_callbacks: list[AbortCallbackFunction] = field(default_factory=list)

entity_topological_sorter = EntityTopologicalSorter()

def on_abort(self, func: AbortCallbackFunction) -> None:
self._on_abort_callbacks.append(func)

Expand Down
2 changes: 1 addition & 1 deletion port_ocean/core/defaults/initialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
)
from port_ocean.core.handlers.port_app_config.models import PortAppConfig
from port_ocean.core.models import Blueprint
from port_ocean.core.utils import gather_and_split_errors_from_results
from port_ocean.core.utils.utils import gather_and_split_errors_from_results
from port_ocean.exceptions.port_defaults import (
AbortDefaultCreationError,
)
Expand Down
17 changes: 10 additions & 7 deletions port_ocean/core/handlers/entities_state_applier/port/applier.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@
from port_ocean.core.handlers.entities_state_applier.port.get_related_entities import (
get_related_entities,
)
from port_ocean.core.handlers.entities_state_applier.port.order_by_entities_dependencies import (
order_by_entities_dependencies,
)

from port_ocean.core.models import Entity
from port_ocean.core.ocean_types import EntityDiff
from port_ocean.core.utils import is_same_entity, get_port_diff
from port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter
from port_ocean.core.utils.utils import is_same_entity, get_port_diff


class HttpEntitiesStateApplier(BaseEntitiesStateApplier):
Expand Down Expand Up @@ -115,8 +114,7 @@ async def upsert(
entities_without_search_identifier.append(entity)

ordered_created_entities = reversed(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is no ordering needed any more, lets just use entities

entities_with_search_identifier
+ order_by_entities_dependencies(entities_without_search_identifier)
entities_with_search_identifier + entities_without_search_identifier
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens with search relation and identifier? do we want to register them as well?

)
for entity in ordered_created_entities:
upsertedEntity = await self.context.port_client.upsert_entity(
Expand All @@ -127,6 +125,9 @@ async def upsert(
)
if upsertedEntity:
modified_entities.append(upsertedEntity)
# condition to false to differentiate from `result_entity.is_using_search_identifier`
if upsertedEntity is False:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lets add a comment why is False and not if not upsertedEntity

event.entity_topological_sorter.register_entity(entity)
return modified_entities

async def delete(
Expand All @@ -141,7 +142,9 @@ async def delete(
should_raise=False,
)
else:
ordered_deleted_entities = order_by_entities_dependencies(entities)
ordered_deleted_entities = (
EntityTopologicalSorter.order_by_entities_dependencies(entities)
)

for entity in ordered_deleted_entities:
await self.context.port_client.delete_entity(
Expand Down
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we can squeeze the entity_topological_sorter together with this?

Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ def node(entity: Entity) -> Node:
def order_by_entities_dependencies(entities: list[Entity]) -> list[Entity]:
nodes: dict[Node, Set[Node]] = {}
entities_map = {}

for entity in entities:
nodes[node(entity)] = set()
entities_map[node(entity)] = entity
Expand All @@ -33,7 +32,11 @@ def order_by_entities_dependencies(entities: list[Entity]) -> list[Entity]:
]

for related_entity in related_entities:
nodes[node(entity)].add(node(related_entity))
if (
entity.blueprint is not related_entity.blueprint
or entity.identifier is not related_entity.identifier
):
nodes[node(entity)].add(node(related_entity))

sort_op = TopologicalSorter(nodes)
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
EntitySelectorDiff,
CalculationResult,
)
from port_ocean.core.utils import gather_and_split_errors_from_results, zip_and_sum
from port_ocean.core.utils.utils import (
gather_and_split_errors_from_results,
zip_and_sum,
)
from port_ocean.exceptions.core import EntityProcessorException
from port_ocean.utils.queue_utils import process_in_queue

Expand Down
21 changes: 20 additions & 1 deletion port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
from graphlib import CycleError
import inspect
import typing
from typing import Callable, Awaitable, Any
Expand Down Expand Up @@ -27,7 +28,7 @@
RAW_ITEM,
CalculationResult,
)
from port_ocean.core.utils import zip_and_sum, gather_and_split_errors_from_results
from port_ocean.core.utils.utils import zip_and_sum, gather_and_split_errors_from_results
from port_ocean.exceptions.core import OceanAbortException

SEND_RAW_DATA_EXAMPLES_AMOUNT = 5
Expand Down Expand Up @@ -396,7 +397,22 @@ async def update_raw_diff(
{"before": entities_before_flatten, "after": entities_after_flatten},
user_agent_type,
)
async def sort_and_upsert_failed_entities(self,user_agent_type: UserAgentType)->None:
try:
if event.entity_topological_sorter.is_to_execute():
logger.info("Executings topological sort of entities failed to upsert")
else:
logger.info("No failed entities on upsert")
return

for entity in event.entity_topological_sorter.get_entities():
await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False)

except OceanAbortException as ocean_abort:
logger.info("Failed topological sort of failed to upsert entites - trying to upsert unordered")
if isinstance(ocean_abort.__cause__,CycleError):
for entity in event.entity_topological_sorter.get_entities(False):
await self.entities_state_applier.context.port_client.upsert_entity(entity,event.port_app_config.get_port_request_options(),user_agent_type,should_raise=False)
async def sync_raw_all(
self,
_: dict[Any, Any] | None = None,
Expand Down Expand Up @@ -426,6 +442,7 @@ async def sync_raw_all(
use_cache=False
)
logger.info(f"Resync will use the following mappings: {app_config.dict()}")

try:
did_fetched_current_state = True
entities_at_port = await ocean.port_client.search_entities(
Expand Down Expand Up @@ -455,6 +472,8 @@ async def sync_raw_all(
event.on_abort(lambda: task.cancel())

creation_results.append(await task)

await self.sort_and_upsert_failed_entities(user_agent_type)
except asyncio.CancelledError as e:
logger.warning("Resync aborted successfully, skipping delete phase. This leads to an incomplete state")
raise
Expand Down
2 changes: 1 addition & 1 deletion port_ocean/core/integrations/mixins/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
RESYNC_EVENT_LISTENER,
RESYNC_RESULT,
)
from port_ocean.core.utils import validate_result
from port_ocean.core.utils.utils import validate_result
from port_ocean.exceptions.core import (
RawObjectValidationException,
OceanAbortException,
Expand Down
4 changes: 4 additions & 0 deletions port_ocean/core/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ def is_installation_type_compatible(self, installation_type: str) -> bool:
) or installation_type == self.value


class PortAPIErrorMessage(Enum):
NOT_FOUND = "not_found"


class Entity(BaseModel):
identifier: Any
blueprint: Any
Expand Down
93 changes: 93 additions & 0 deletions port_ocean/core/utils/entity_topological_sorter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
from typing import Any, Generator
from port_ocean.core.models import Entity

from dataclasses import dataclass, field
from loguru import logger

from graphlib import TopologicalSorter, CycleError
from typing import Set

from port_ocean.exceptions.core import OceanAbortException

Node = tuple[str, str]


@dataclass
class EntityTopologicalSorter:
entities: list[Entity] = field(default_factory=list)

def register_entity(
self,
entity: Entity,
) -> None:
logger.debug(
f"Will retry upserting entity - {entity.identifier} at the end of resync"
)
self.entities.append(entity)

def is_to_execute(self):
return len(self.entities)

def get_entities(self, sorted: bool = True) -> Generator[Entity, Any, None]:
if not sorted:
for entity in self.entities:
yield entity
return

entity_map: dict[str, Entity] = {
f"{entity.identifier}-{entity.blueprint}": entity
for entity in self.entities
}
sorted_and_mapped = EntityTopologicalSorter.order_by_entities_dependencies(
self.entities
)
for obj in sorted_and_mapped:
entity = entity_map.get(f"{obj.identifier}-{obj.blueprint}")
if entity is not None:
yield entity

@staticmethod
def node(entity: Entity) -> Node:
return entity.identifier, entity.blueprint

@staticmethod
def order_by_entities_dependencies(entities: list[Entity]) -> list[Entity]:
nodes: dict[Node, Set[Node]] = {}
entities_map = {}
for entity in entities:
nodes[EntityTopologicalSorter.node(entity)] = set()
entities_map[EntityTopologicalSorter.node(entity)] = entity

for entity in entities:
relation_target_ids: list[str] = sum(
[
identifiers if isinstance(identifiers, list) else [identifiers]
for identifiers in entity.relations.values()
if identifiers is not None
],
[],
)
related_entities = [
related
for related in entities
if related.identifier in relation_target_ids
]

for related_entity in related_entities:
if (
entity.blueprint is not related_entity.blueprint
or entity.identifier is not related_entity.identifier
):
nodes[EntityTopologicalSorter.node(entity)].add(
EntityTopologicalSorter.node(related_entity)
)

sort_op = TopologicalSorter(nodes)
try:
return [entities_map[item] for item in sort_op.static_order()]
except CycleError as ex:
raise OceanAbortException(
"Cannot order entities due to cyclic dependencies. \n"
"If you do want to have cyclic dependencies, please make sure to set the keys"
" 'createMissingRelatedEntities' and 'deleteDependentEntities' in the integration config in Port."
) from ex
File renamed without changes.
2 changes: 1 addition & 1 deletion port_ocean/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from port_ocean.config.dynamic import default_config_factory
from port_ocean.config.settings import ApplicationSettings, LogLevelType
from port_ocean.core.defaults.initialize import initialize_defaults
from port_ocean.core.utils import validate_integration_runtime
from port_ocean.core.utils.utils import validate_integration_runtime
from port_ocean.log.logger_setup import setup_logger
from port_ocean.ocean import Ocean
from port_ocean.utils.misc import get_spec_file, load_module
Expand Down
Loading
Loading