Skip to content

Commit

Permalink
Port 7627 ocean retry integration entities search (#498)
Browse files Browse the repository at this point in the history
# Description

What - Some times entities search would fail and cause the whole resync
to abort
Why - Request timeout & No retries for post request
How - Moved the entities search to the end of the resync & Added a way
to enable retries for any request.

## Type of change

- [X] New feature (non-breaking change which adds functionality)
  • Loading branch information
yairsimantov20 authored Apr 11, 2024
1 parent f122eb5 commit 26e1439
Show file tree
Hide file tree
Showing 9 changed files with 82 additions and 181 deletions.
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

## 0.5.11 (2024-04-11)


### Improvements

- Improved the handling of integration entities by adding retries and running it after the upsert to prevent blocking the resync

### Features

- Added a way to enable request retries for any request even if its request method is not part of the retryable methods


## 0.5.10 (2024-04-10)

Expand Down
4 changes: 3 additions & 1 deletion port_ocean/clients/port/authentication.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ async def _get_token(self, client_id: str, client_secret: str) -> TokenResponse:

credentials = {"clientId": client_id, "clientSecret": client_secret}
response = await self.client.post(
f"{self.api_url}/auth/access_token", json=credentials
f"{self.api_url}/auth/access_token",
json=credentials,
extensions={"retryable": True},
)
handle_status_code(response)
return TokenResponse(**response.json())
Expand Down
53 changes: 1 addition & 52 deletions port_ocean/clients/port/mixins/entities.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,21 +133,6 @@ async def batch_delete_entities(
return_exceptions=True,
)

async def validate_entity_exist(self, identifier: str, blueprint: str) -> None:
logger.info(f"Validating entity {identifier} of blueprint {blueprint} exists")

response = await self.client.get(
f"{self.auth.api_url}/blueprints/{blueprint}/entities/{identifier}",
headers=await self.auth.headers(),
)
if response.is_error:
logger.error(
f"Error validating "
f"entity: {identifier} of "
f"blueprint: {blueprint}"
)
handle_status_code(response)

async def search_entities(self, user_agent_type: UserAgentType) -> list[Entity]:
query = {
"combinator": "and",
Expand All @@ -174,43 +159,7 @@ async def search_entities(self, user_agent_type: UserAgentType) -> list[Entity]:
"exclude_calculated_properties": "true",
"include": ["blueprint", "identifier"],
},
extensions={"retryable": True},
)
handle_status_code(response)
return [Entity.parse_obj(result) for result in response.json()["entities"]]

async def search_dependent_entities(self, entity: Entity) -> list[Entity]:
body = {
"combinator": "and",
"rules": [
{
"operator": "relatedTo",
"blueprint": entity.blueprint,
"value": entity.identifier,
"direction": "downstream",
}
],
}

logger.info(f"Searching dependent entity with body {body}")
response = await self.client.post(
f"{self.auth.api_url}/entities/search",
headers=await self.auth.headers(),
json=body,
)
handle_status_code(response)

return [Entity.parse_obj(result) for result in response.json()["entities"]]

async def validate_entity_payload(
self, entity: Entity, merge: bool, create_missing_related_entities: bool
) -> None:
logger.info(f"Validating entity {entity.identifier}")
await self.upsert_entity(
entity,
{
"merge": merge,
"create_missing_related_entities": create_missing_related_entities,
"delete_dependent_entities": False,
"validation_only": True,
},
)
5 changes: 0 additions & 5 deletions port_ocean/clients/port/retry_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,6 @@ def __init__(self, port_client: "PortClient", **kwargs: Any) -> None:
super().__init__(**kwargs)
self.port_client = port_client

def _is_retryable_method(self, request: httpx.Request) -> bool:
return super()._is_retryable_method(request) or request.url.path.endswith(
"/auth/access_token"
)

async def _handle_unauthorized(self, response: httpx.Response) -> None:
token = await self.port_client.auth.token
response.headers["Authorization"] = f"Bearer {token}"
Expand Down
88 changes: 13 additions & 75 deletions port_ocean/core/handlers/entities_state_applier/port/applier.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
import asyncio
from itertools import chain

from loguru import logger

from port_ocean.clients.port.types import UserAgentType
Expand All @@ -14,14 +11,9 @@
from port_ocean.core.handlers.entities_state_applier.port.order_by_entities_dependencies import (
order_by_entities_dependencies,
)
from port_ocean.core.handlers.entities_state_applier.port.validate_entity_relations import (
validate_entity_relations,
)
from port_ocean.core.handlers.entity_processor.base import EntityPortDiff
from port_ocean.core.models import Entity
from port_ocean.core.ocean_types import EntityDiff
from port_ocean.core.utils import is_same_entity, get_unique, get_port_diff
from port_ocean.exceptions.core import RelationValidationException
from port_ocean.core.utils import is_same_entity, get_port_diff


class HttpEntitiesStateApplier(BaseEntitiesStateApplier):
Expand All @@ -32,63 +24,17 @@ class HttpEntitiesStateApplier(BaseEntitiesStateApplier):
through HTTP requests.
"""

async def _validate_delete_dependent_entities(self, entities: list[Entity]) -> None:
logger.info("Validated deleted entities")
if not event.port_app_config.delete_dependent_entities:
dependent_entities = await asyncio.gather(
*(
self.context.port_client.search_dependent_entities(entity)
for entity in entities
)
)
new_dependent_entities = get_unique(
[
entity
for entity in chain.from_iterable(dependent_entities)
if not any(is_same_entity(item, entity) for item in entities)
]
)

if new_dependent_entities:
raise RelationValidationException(
f"Must enable delete_dependent_entities flag or delete all dependent entities: "
f" {[(dep.blueprint, dep.identifier) for dep in new_dependent_entities]}"
)

async def _validate_entity_diff(self, diff: EntityPortDiff) -> None:
config = event.port_app_config
await self._validate_delete_dependent_entities(diff.deleted)
modified_or_created_entities = diff.modified + diff.created

if modified_or_created_entities and not config.create_missing_related_entities:
logger.info("Validating modified or created entities")

await asyncio.gather(
*(
self.context.port_client.validate_entity_payload(
entity,
config.enable_merge_entity,
create_missing_related_entities=config.create_missing_related_entities,
)
for entity in modified_or_created_entities
)
)

if not event.port_app_config.delete_dependent_entities:
logger.info("Validating no relation blocks the operation")
await validate_entity_relations(diff, self.context.port_client)

async def _delete_diff(
async def _safe_delete(
self,
entities_to_delete: list[Entity],
created_entities: list[Entity],
entities_to_protect: list[Entity],
user_agent_type: UserAgentType,
) -> None:
if not entities_to_delete:
return

related_entities = await get_related_entities(
created_entities, self.context.port_client
entities_to_protect, self.context.port_client
)

allowed_entities_to_delete = []
Expand All @@ -98,7 +44,8 @@ async def _delete_diff(
is_same_entity(entity, entity_to_delete) for entity in related_entities
)
is_part_of_created = any(
is_same_entity(entity, entity_to_delete) for entity in created_entities
is_same_entity(entity, entity_to_delete)
for entity in entities_to_protect
)
if is_part_of_related:
if event.port_app_config.create_missing_related_entities:
Expand All @@ -119,21 +66,14 @@ async def apply_diff(
user_agent_type: UserAgentType,
) -> None:
diff = get_port_diff(entities["before"], entities["after"])
kept_entities = diff.created + diff.modified

logger.info(
f"Updating entity diff (created: {len(diff.created)}, deleted: {len(diff.deleted)}, modified: {len(diff.modified)})"
)
await self._validate_entity_diff(diff)
await self.upsert(kept_entities, user_agent_type)

logger.info("Upserting new entities")
await self.upsert(diff.created, user_agent_type)
logger.info("Upserting modified entities")
await self.upsert(diff.modified, user_agent_type)

logger.info("Deleting diff entities")
await self._delete_diff(
diff.deleted, diff.created + diff.modified, user_agent_type
)
await self._safe_delete(diff.deleted, kept_entities, user_agent_type)

async def delete_diff(
self,
Expand All @@ -145,15 +85,13 @@ async def delete_diff(
if not diff.deleted:
return

kept_entities = diff.created + diff.modified

logger.info(
f"Updating entity diff (created: {len(diff.created)}, deleted: {len(diff.deleted)}, modified: {len(diff.modified)})"
f"Determining entities to delete ({len(diff.deleted)}/{len(kept_entities)})"
)
await self._validate_entity_diff(diff)

logger.info("Deleting diff entities")
await self._delete_diff(
diff.deleted, diff.created + diff.modified, user_agent_type
)
await self._safe_delete(diff.deleted, kept_entities, user_agent_type)

async def upsert(
self, entities: list[Entity], user_agent_type: UserAgentType
Expand Down

This file was deleted.

7 changes: 4 additions & 3 deletions port_ocean/core/integrations/mixins/sync_raw.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,6 @@ async def sync_raw_all(
):
app_config = await self.port_app_config_handler.get_port_app_config()

entities_at_port = await ocean.port_client.search_entities(user_agent_type)

creation_results: list[tuple[list[Entity], list[Exception]]] = []

try:
Expand Down Expand Up @@ -369,8 +367,11 @@ async def sync_raw_all(

logger.error(message, exc_info=error_group)
else:
entities_at_port = await ocean.port_client.search_entities(
user_agent_type
)
logger.info(
f"Running resync diff calculation, number of entities at Port before resync: {len(entities_at_port)}, number of entities created during sync: {len(flat_created_entities)}"
f"Running resync diff calculation, number of entities found at Port: {len(entities_at_port)}, number of entities found during sync: {len(flat_created_entities)}"
)
await self.entities_state_applier.delete_diff(
{"before": entities_at_port, "after": flat_created_entities},
Expand Down
Loading

0 comments on commit 26e1439

Please sign in to comment.