[Core] Add retry for failed upserts and handle circular dependencies (#1241)

# Description

What -
1. Add an `EntityTopologicalSorter` class that registers entities and performs a topological sort.
2. Register entities whose upsert failed because of an unmet dependency.
3. Sort the registered entities topologically and retry upserting them.
4. If the topological sort fails because of a cyclic dependency, retry the upsert unsorted.
5. Update the topological sort's tree creation so an entity cannot be its
own dependency.
6. Add tests for upsert with dependencies, with a self-circular dependency, and with a
circular dependency.

Why -
Users were hitting circular dependency errors, and the affected entities were not upserted.
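
The retry flow listed under "What" can be sketched roughly as follows. This is a minimal illustration built on the standard library's `graphlib`, not the `EntityTopologicalSorter` implementation from this commit; `FailedEntitySorter`, its methods, and the dict-based entity shape are hypothetical.

```python
from graphlib import CycleError, TopologicalSorter


class FailedEntitySorter:
    """Hypothetical stand-in: collects entities whose upsert failed on an unmet dependency."""

    def __init__(self) -> None:
        self.entities: dict[str, dict] = {}

    def register(self, entity: dict) -> None:
        self.entities[entity["identifier"]] = entity

    def ordered(self) -> list[dict]:
        # Map each entity to the registered entities it depends on,
        # skipping self-references so an entity is never its own dependency.
        graph = {
            ident: {
                dep
                for dep in entity["relations"]
                if dep in self.entities and dep != ident
            }
            for ident, entity in self.entities.items()
        }
        try:
            # static_order() yields dependencies before their dependents.
            order = list(TopologicalSorter(graph).static_order())
        except CycleError:
            # Cyclic dependency: fall back to the unsorted registration order.
            order = list(self.entities)
        return [self.entities[ident] for ident in order]


sorter = FailedEntitySorter()
sorter.register({"identifier": "service", "relations": ["team"]})
sorter.register({"identifier": "team", "relations": []})
retry_order = [e["identifier"] for e in sorter.ordered()]
```

Here `team` is retried before `service`, since `service` depends on it. When two entities depend on each other, the sketch simply returns them in registration order.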

## Type of change

Please leave one option from the following and delete the rest:

- [x] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- [ ] Non-breaking change (fix of existing functionality that will not
change current behavior)
- [ ] Documentation (added/updated documentation)

<h4> All tests should be run against the Port production
environment (using a testing org). </h4>

### Core testing checklist

- [x] Integration able to create all default resources from scratch
- [x] Resync finishes successfully
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Scheduled resync able to abort existing resync and start a new one
- [ ] Tested with at least 2 integrations from scratch
- [ ] Tested with Kafka and Polling event listeners
- [ ] Tested deletion of entities that don't pass the selector


### Integration testing checklist

- [ ] Integration able to create all default resources from scratch
- [ ] Resync able to create entities
- [ ] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [ ] Resync finishes successfully
- [ ] If new resource kind is added or updated in the integration, add
example raw data, mapping and expected result to the `examples` folder
in the integration directory.
- [ ] If resource kind is updated, run the integration with the example
data and check if the expected result is achieved
- [ ] If new resource kind is added or updated, validate that
live-events for that resource are working as expected
- [ ] Docs PR link [here](#)

### Preflight checklist

- [ ] Handled rate limiting
- [ ] Handled pagination
- [ ] Implemented the code in async
- [ ] Support Multi account

## Screenshots

Include screenshots from your environment showing how the resources of
the integration will look.

## API Documentation

Provide links to the API documentation used for this integration.

---------

Co-authored-by: Ivan Kalinovski <[email protected]>
3 people authored Dec 24, 2024
1 parent 1312162 commit b5690b8
Showing 18 changed files with 705 additions and 42 deletions.
15 changes: 15 additions & 0 deletions CHANGELOG.md
Expand Up @@ -6,6 +6,21 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

<!-- towncrier release notes start -->
## 0.16.0 (2024-12-24)


### Improvements

- When `createMissingRelatedEntities` is set to `false` and an entity upsert fails because a related entity does not exist, the entity is collected until the end of the resync;
all failed entities are then sorted topologically and upserted again
- Added tests for upsert with dependencies, with a self-circular dependency, and with an external entity dependency.

### Bug Fixes

- When the topological sort fails with a cyclic dependency error, fall back to an unsorted upsert of the entities
- Fix the topological sort's tree creation so an entity cannot be its own dependency


## 0.15.3 (2024-12-22)

### Bug Fixes
22 changes: 11 additions & 11 deletions integrations/sonarqube/examples/project.entity.json
Expand Up @@ -3,17 +3,17 @@
"title": "Port API",
"team": [],
"properties": {
"organization": "port-labs",
"link": "https://sonarcloud.io/project/overview?id=port-labs_Port_port-api",
"qualityGateStatus": "ERROR",
"numberOfBugs": 20,
"numberOfCodeSmells": 262,
"numberOfVulnerabilities": 0,
"numberOfHotSpots": 3,
"numberOfDuplications": 18,
"coverage": 0,
"mainBranch": "main"
"organization": "port-labs",
"link": "https://sonarcloud.io/project/overview?id=port-labs_Port_port-api",
"qualityGateStatus": "ERROR",
"numberOfBugs": 20,
"numberOfCodeSmells": 262,
"numberOfVulnerabilities": 0,
"numberOfHotSpots": 3,
"numberOfDuplications": 18,
"coverage": 0,
"mainBranch": "main"
},
"relations": {},
"icon": "sonarqube"
}
}
37 changes: 33 additions & 4 deletions port_ocean/clients/port/mixins/entities.py
@@ -1,5 +1,5 @@
import asyncio
from typing import Any
from typing import Any, Literal
from urllib.parse import quote_plus

import httpx
Expand All @@ -11,7 +11,8 @@
handle_status_code,
PORT_HTTP_MAX_CONNECTIONS_LIMIT,
)
from port_ocean.core.models import Entity
from port_ocean.core.models import Entity, PortAPIErrorMessage
from starlette import status


class EntityClientMixin:
Expand All @@ -29,7 +30,27 @@ async def upsert_entity(
request_options: RequestOptions,
user_agent_type: UserAgentType | None = None,
should_raise: bool = True,
) -> Entity | None:
) -> Entity | None | Literal[False]:
"""
This function upserts an entity into Port.
Usage:
```python
upsertedEntity = await self.context.port_client.upsert_entity(
entity,
event.port_app_config.get_port_request_options(),
user_agent_type,
should_raise=False,
)
```
:param entity: An Entity to be upserted
:param request_options: A dictionary specifying how to upsert the entity
:param user_agent_type: A UserAgentType specifying who is performing the action
:param should_raise: A boolean specifying whether the error should be raised or handled silently
:return: [Entity] if the upsert occurred successfully
:return: [None] if the entity is using a search identifier
:return: [False] if the upsert failed because of an unmet dependency
"""
validation_only = request_options["validation_only"]
async with self.semaphore:
logger.debug(
Expand All @@ -50,13 +71,21 @@ async def upsert_entity(
},
extensions={"retryable": True},
)

if response.is_error:
logger.error(
f"Error {'Validating' if validation_only else 'Upserting'} "
f"entity: {entity.identifier} of "
f"blueprint: {entity.blueprint}"
)
result = response.json()

if (
response.status_code == status.HTTP_404_NOT_FOUND
and not result.get("ok")
and result.get("error") == PortAPIErrorMessage.NOT_FOUND.value
):
# Return False (not None) to differentiate an unmet dependency from `result_entity.is_using_search_identifier`
return False
handle_status_code(response, should_raise)
result = response.json()
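
The branch above returns `False` only for one specific failure shape. A minimal sketch of that check as a pure function, assuming the response body carries `ok` and `error` keys as the diff suggests; `is_unmet_dependency` is a hypothetical helper, and the enum is re-declared locally so the block is self-contained.

```python
from enum import Enum
from typing import Any


class PortAPIErrorMessage(Enum):
    NOT_FOUND = "not_found"


def is_unmet_dependency(status_code: int, body: dict[str, Any]) -> bool:
    """Hypothetical helper: True only for a 404 whose body reports `not_found`."""
    return (
        status_code == 404
        and not body.get("ok")
        and body.get("error") == PortAPIErrorMessage.NOT_FOUND.value
    )
```

Any other error, including a 404 with a different `error` value, still goes through the regular status handling.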

11 changes: 11 additions & 0 deletions port_ocean/context/event.py
Expand Up @@ -14,6 +14,7 @@
from uuid import uuid4

from loguru import logger
from port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter
from pydispatch import dispatcher # type: ignore
from werkzeug.local import LocalStack, LocalProxy

Expand All @@ -24,6 +25,7 @@
)
from port_ocean.utils.misc import get_time


if TYPE_CHECKING:
from port_ocean.core.handlers.port_app_config.models import (
ResourceConfig,
Expand All @@ -50,6 +52,9 @@ class EventContext:
_parent_event: Optional["EventContext"] = None
_event_id: str = field(default_factory=lambda: str(uuid4()))
_on_abort_callbacks: list[AbortCallbackFunction] = field(default_factory=list)
entity_topological_sorter: EntityTopologicalSorter = field(
default_factory=EntityTopologicalSorter
)

def on_abort(self, func: AbortCallbackFunction) -> None:
self._on_abort_callbacks.append(func)
Expand Down Expand Up @@ -129,6 +134,11 @@ async def event_context(
) -> AsyncIterator[EventContext]:
parent = parent_override or _event_context_stack.top
parent_attributes = parent.attributes if parent else {}
entity_topological_sorter = (
parent.entity_topological_sorter
if parent and parent.entity_topological_sorter
else EntityTopologicalSorter()
)

attributes = {**parent_attributes, **(attributes or {})}
new_event = EventContext(
Expand All @@ -138,6 +148,7 @@ async def event_context(
_parent_event=parent,
# inherit port app config from parent event, so it can be used in nested events
_port_app_config=parent.port_app_config if parent else None,
entity_topological_sorter=entity_topological_sorter,
)
_event_context_stack.push(new_event)
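
The effect of inheriting the sorter from the parent is that every nested event registers into one shared collection, which the outermost resync can then process at the end. A toy sketch with hypothetical `Sorter` and `Context` classes (not Ocean's `EventContext`):

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Sorter:
    entities: list[str] = field(default_factory=list)


@dataclass
class Context:
    parent: Optional["Context"] = None
    sorter: Sorter = field(default_factory=Sorter)


def child_of(parent: Optional[Context]) -> Context:
    # Reuse the parent's sorter when there is one; otherwise start a fresh one.
    sorter = parent.sorter if parent else Sorter()
    return Context(parent=parent, sorter=sorter)


root = child_of(None)
nested = child_of(root)
nested.sorter.entities.append("service")  # registered inside the nested event
```

After this runs, `root.sorter` and `nested.sorter` are the same object, so the entity registered in the nested context is visible from the root.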

2 changes: 1 addition & 1 deletion port_ocean/core/defaults/initialize.py
Expand Up @@ -14,7 +14,7 @@
)
from port_ocean.core.handlers.port_app_config.models import PortAppConfig
from port_ocean.core.models import Blueprint
from port_ocean.core.utils import gather_and_split_errors_from_results
from port_ocean.core.utils.utils import gather_and_split_errors_from_results
from port_ocean.exceptions.port_defaults import (
AbortDefaultCreationError,
)
26 changes: 9 additions & 17 deletions port_ocean/core/handlers/entities_state_applier/port/applier.py
Expand Up @@ -8,12 +8,11 @@
from port_ocean.core.handlers.entities_state_applier.port.get_related_entities import (
get_related_entities,
)
from port_ocean.core.handlers.entities_state_applier.port.order_by_entities_dependencies import (
order_by_entities_dependencies,
)

from port_ocean.core.models import Entity
from port_ocean.core.ocean_types import EntityDiff
from port_ocean.core.utils import is_same_entity, get_port_diff
from port_ocean.core.utils.entity_topological_sorter import EntityTopologicalSorter
from port_ocean.core.utils.utils import is_same_entity, get_port_diff


class HttpEntitiesStateApplier(BaseEntitiesStateApplier):
Expand Down Expand Up @@ -106,19 +105,7 @@ async def upsert(
should_raise=False,
)
else:
entities_with_search_identifier: list[Entity] = []
entities_without_search_identifier: list[Entity] = []
for entity in entities:
if entity.is_using_search_identifier:
entities_with_search_identifier.append(entity)
else:
entities_without_search_identifier.append(entity)

ordered_created_entities = reversed(
entities_with_search_identifier
+ order_by_entities_dependencies(entities_without_search_identifier)
)
for entity in ordered_created_entities:
upsertedEntity = await self.context.port_client.upsert_entity(
entity,
event.port_app_config.get_port_request_options(),
Expand All @@ -127,6 +114,9 @@ async def upsert(
)
if upsertedEntity:
modified_entities.append(upsertedEntity)
# Compare against False explicitly to differentiate from `result_entity.is_using_search_identifier`
if upsertedEntity is False:
event.entity_topological_sorter.register_entity(entity)
return modified_entities
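
Because `upsert_entity` can now return an entity, `None`, or `False`, a plain truthiness check cannot tell the last two apart, which is why the code above tests `is False` explicitly. A small sketch, where `classify` and its labels are hypothetical:

```python
from typing import Literal, Union


def classify(result: Union[dict, None, Literal[False]]) -> str:
    """Hypothetical helper mapping the three return shapes to an action."""
    if result is False:
        return "retry-later"  # failed on an unmet dependency
    if result is None:
        return "skip"  # e.g. the entity uses a search identifier
    return "modified"
```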

async def delete(
Expand All @@ -141,7 +131,9 @@ async def delete(
should_raise=False,
)
else:
ordered_deleted_entities = order_by_entities_dependencies(entities)
ordered_deleted_entities = (
EntityTopologicalSorter.order_by_entities_dependencies(entities)
)

for entity in ordered_deleted_entities:
await self.context.port_client.delete_entity(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ def node(entity: Entity) -> Node:
def order_by_entities_dependencies(entities: list[Entity]) -> list[Entity]:
nodes: dict[Node, Set[Node]] = {}
entities_map = {}

for entity in entities:
nodes[node(entity)] = set()
entities_map[node(entity)] = entity
Expand All @@ -33,7 +32,11 @@ def order_by_entities_dependencies(entities: list[Entity]) -> list[Entity]:
]

for related_entity in related_entities:
nodes[node(entity)].add(node(related_entity))
# Skip self-references; compare by value, since `is not` only tests object identity
if (
entity.blueprint != related_entity.blueprint
or entity.identifier != related_entity.identifier
):
nodes[node(entity)].add(node(related_entity))
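
The guard matters because `graphlib.TopologicalSorter` reports a node that lists itself as a predecessor as a cycle. A short sketch with made-up node names; it compares by value with `!=`, since identity checks on strings depend on interning and are not a reliable equality test:

```python
from graphlib import CycleError, TopologicalSorter


def sorts_cleanly(graph: dict[str, set[str]]) -> bool:
    try:
        list(TopologicalSorter(graph).static_order())
        return True
    except CycleError:
        return False


with_self_edge = {"service": {"service", "team"}, "team": set()}
# Drop self-references before sorting.
without_self_edge = {
    node: {dep for dep in deps if dep != node}
    for node, deps in with_self_edge.items()
}
```

With the self-edge in place the sort raises `CycleError`; once it is filtered out, the same graph sorts normally.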

sort_op = TopologicalSorter(nodes)
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
EntitySelectorDiff,
CalculationResult,
)
from port_ocean.core.utils import gather_and_split_errors_from_results, zip_and_sum
from port_ocean.core.utils.utils import (
gather_and_split_errors_from_results,
zip_and_sum,
)
from port_ocean.exceptions.core import EntityProcessorException
from port_ocean.utils.queue_utils import process_in_queue

21 changes: 19 additions & 2 deletions port_ocean/core/integrations/mixins/sync_raw.py
@@ -1,4 +1,5 @@
import asyncio
from graphlib import CycleError
import inspect
import typing
from typing import Callable, Awaitable, Any
Expand Down Expand Up @@ -27,7 +28,7 @@
RAW_ITEM,
CalculationResult,
)
from port_ocean.core.utils import zip_and_sum, gather_and_split_errors_from_results
from port_ocean.core.utils.utils import zip_and_sum, gather_and_split_errors_from_results
from port_ocean.exceptions.core import OceanAbortException

SEND_RAW_DATA_EXAMPLES_AMOUNT = 5
Expand Down Expand Up @@ -396,7 +397,20 @@ async def update_raw_diff(
{"before": entities_before_flatten, "after": entities_after_flatten},
user_agent_type,
)

async def sort_and_upsert_failed_entities(self, user_agent_type: UserAgentType) -> None:
    sorter = event.entity_topological_sorter
    try:
        if not sorter.should_execute():
            return None
        logger.info(
            f"Executing topological sort of {sorter.get_entities_count()} entities that failed to upsert.",
            failed_toupsert_entities_count=sorter.get_entities_count(),
        )
        # Retry the failed entities in dependency order.
        for entity in sorter.get_entities():
            await self.entities_state_applier.context.port_client.upsert_entity(
                entity, event.port_app_config.get_port_request_options(), user_agent_type, should_raise=False
            )
    except OceanAbortException as ocean_abort:
        logger.info(
            f"Topological sort of the entities that failed to upsert did not succeed - trying to upsert {sorter.get_entities_count()} entities unordered.",
            failed_topological_sort_entities_count=sorter.get_entities_count(),
        )
        # Only a cyclic dependency triggers the unsorted fallback.
        if isinstance(ocean_abort.__cause__, CycleError):
            for entity in sorter.get_entities(False):
                await self.entities_state_applier.context.port_client.upsert_entity(
                    entity, event.port_app_config.get_port_request_options(), user_agent_type, should_raise=False
                )

async def sync_raw_all(
self,
_: dict[Any, Any] | None = None,
Expand Down Expand Up @@ -426,6 +440,7 @@ async def sync_raw_all(
use_cache=False
)
logger.info(f"Resync will use the following mappings: {app_config.dict()}")

try:
did_fetched_current_state = True
entities_at_port = await ocean.port_client.search_entities(
Expand Down Expand Up @@ -455,6 +470,8 @@ async def sync_raw_all(
event.on_abort(lambda: task.cancel())

creation_results.append(await task)

await self.sort_and_upsert_failed_entities(user_agent_type)
except asyncio.CancelledError as e:
logger.warning("Resync aborted successfully, skipping delete phase. This leads to an incomplete state")
raise
2 changes: 1 addition & 1 deletion port_ocean/core/integrations/mixins/utils.py
Expand Up @@ -9,7 +9,7 @@
RESYNC_EVENT_LISTENER,
RESYNC_RESULT,
)
from port_ocean.core.utils import validate_result
from port_ocean.core.utils.utils import validate_result
from port_ocean.exceptions.core import (
RawObjectValidationException,
OceanAbortException,
4 changes: 4 additions & 0 deletions port_ocean/core/models.py
Expand Up @@ -27,6 +27,10 @@ def is_installation_type_compatible(self, installation_type: str) -> bool:
) or installation_type == self.value


class PortAPIErrorMessage(Enum):
NOT_FOUND = "not_found"


class Entity(BaseModel):
identifier: Any
blueprint: Any