diff --git a/CHANGELOG.md b/CHANGELOG.md index 25e5a8353e..293b640a8b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,17 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm +## 0.5.11 (2024-04-11) + + +### Improvements + +- Improved the handling of integration entities by adding retries and running it after the upsert to prevent blocking the resync + +### Features + +- Added a way to enable request retries for any request even if its request method is not part of the retryable methods + ## 0.5.10 (2024-04-10) diff --git a/port_ocean/clients/port/authentication.py b/port_ocean/clients/port/authentication.py index a7e7856900..60f9c170cb 100644 --- a/port_ocean/clients/port/authentication.py +++ b/port_ocean/clients/port/authentication.py @@ -49,7 +49,9 @@ async def _get_token(self, client_id: str, client_secret: str) -> TokenResponse: credentials = {"clientId": client_id, "clientSecret": client_secret} response = await self.client.post( - f"{self.api_url}/auth/access_token", json=credentials + f"{self.api_url}/auth/access_token", + json=credentials, + extensions={"retryable": True}, ) handle_status_code(response) return TokenResponse(**response.json()) diff --git a/port_ocean/clients/port/mixins/entities.py b/port_ocean/clients/port/mixins/entities.py index af406c41f5..36400367e8 100644 --- a/port_ocean/clients/port/mixins/entities.py +++ b/port_ocean/clients/port/mixins/entities.py @@ -133,21 +133,6 @@ async def batch_delete_entities( return_exceptions=True, ) - async def validate_entity_exist(self, identifier: str, blueprint: str) -> None: - logger.info(f"Validating entity {identifier} of blueprint {blueprint} exists") - - response = await self.client.get( - f"{self.auth.api_url}/blueprints/{blueprint}/entities/{identifier}", - headers=await self.auth.headers(), - ) - if response.is_error: - logger.error( - f"Error validating " - f"entity: {identifier} of " - f"blueprint: {blueprint}" - ) - handle_status_code(response) - async def search_entities(self, user_agent_type: UserAgentType) -> list[Entity]: query = { "combinator": "and", @@ -174,43 +159,7 @@ async def search_entities(self, user_agent_type: UserAgentType) -> list[Entity]: "exclude_calculated_properties": "true", "include": ["blueprint", "identifier"], }, + extensions={"retryable": True}, ) handle_status_code(response) return [Entity.parse_obj(result) for result in response.json()["entities"]] - - async def search_dependent_entities(self, entity: Entity) -> list[Entity]: - body = { - "combinator": "and", - "rules": [ - { - "operator": "relatedTo", - "blueprint": entity.blueprint, - "value": entity.identifier, - "direction": "downstream", - } - ], - } - - logger.info(f"Searching dependent entity with body {body}") - response = await self.client.post( - f"{self.auth.api_url}/entities/search", - headers=await self.auth.headers(), - json=body, - ) - handle_status_code(response) - - return [Entity.parse_obj(result) for result in response.json()["entities"]] - - async def validate_entity_payload( - self, entity: Entity, merge: bool, create_missing_related_entities: bool - ) -> None: - logger.info(f"Validating entity {entity.identifier}") - await self.upsert_entity( - entity, - { - "merge": merge, - "create_missing_related_entities": create_missing_related_entities, - "delete_dependent_entities": False, - "validation_only": True, - }, - ) diff --git a/port_ocean/clients/port/retry_transport.py b/port_ocean/clients/port/retry_transport.py index 7654585bc6..d2d8d7e2d7 100644 --- a/port_ocean/clients/port/retry_transport.py +++ b/port_ocean/clients/port/retry_transport.py @@ -15,11 +15,6 @@ def __init__(self, port_client: "PortClient", **kwargs: Any) -> None: super().__init__(**kwargs) self.port_client = port_client - def _is_retryable_method(self, request: httpx.Request) -> bool: - return super()._is_retryable_method(request) or request.url.path.endswith( - "/auth/access_token" - ) - async def _handle_unauthorized(self, response: httpx.Response) -> None: token = await self.port_client.auth.token response.headers["Authorization"] = f"Bearer {token}" diff --git a/port_ocean/core/handlers/entities_state_applier/port/applier.py b/port_ocean/core/handlers/entities_state_applier/port/applier.py index 8e27d60e47..ae5571ce6e 100644 --- a/port_ocean/core/handlers/entities_state_applier/port/applier.py +++ b/port_ocean/core/handlers/entities_state_applier/port/applier.py @@ -1,6 +1,3 @@ -import asyncio -from itertools import chain - from loguru import logger from port_ocean.clients.port.types import UserAgentType @@ -14,14 +11,9 @@ from port_ocean.core.handlers.entities_state_applier.port.order_by_entities_dependencies import ( order_by_entities_dependencies, ) -from port_ocean.core.handlers.entities_state_applier.port.validate_entity_relations import ( - validate_entity_relations, -) -from port_ocean.core.handlers.entity_processor.base import EntityPortDiff from port_ocean.core.models import Entity from port_ocean.core.ocean_types import EntityDiff -from port_ocean.core.utils import is_same_entity, get_unique, get_port_diff -from port_ocean.exceptions.core import RelationValidationException +from port_ocean.core.utils import is_same_entity, get_port_diff class HttpEntitiesStateApplier(BaseEntitiesStateApplier): @@ -32,63 +24,17 @@ class HttpEntitiesStateApplier(BaseEntitiesStateApplier): through HTTP requests. """ - async def _validate_delete_dependent_entities(self, entities: list[Entity]) -> None: - logger.info("Validated deleted entities") - if not event.port_app_config.delete_dependent_entities: - dependent_entities = await asyncio.gather( - *( - self.context.port_client.search_dependent_entities(entity) - for entity in entities - ) - ) - new_dependent_entities = get_unique( - [ - entity - for entity in chain.from_iterable(dependent_entities) - if not any(is_same_entity(item, entity) for item in entities) - ] - ) - - if new_dependent_entities: - raise RelationValidationException( - f"Must enable delete_dependent_entities flag or delete all dependent entities: " - f" {[(dep.blueprint, dep.identifier) for dep in new_dependent_entities]}" - ) - - async def _validate_entity_diff(self, diff: EntityPortDiff) -> None: - config = event.port_app_config - await self._validate_delete_dependent_entities(diff.deleted) - modified_or_created_entities = diff.modified + diff.created - - if modified_or_created_entities and not config.create_missing_related_entities: - logger.info("Validating modified or created entities") - - await asyncio.gather( - *( - self.context.port_client.validate_entity_payload( - entity, - config.enable_merge_entity, - create_missing_related_entities=config.create_missing_related_entities, - ) - for entity in modified_or_created_entities - ) - ) - - if not event.port_app_config.delete_dependent_entities: - logger.info("Validating no relation blocks the operation") - await validate_entity_relations(diff, self.context.port_client) - - async def _delete_diff( + async def _safe_delete( self, entities_to_delete: list[Entity], - created_entities: list[Entity], + entities_to_protect: list[Entity], user_agent_type: UserAgentType, ) -> None: if not entities_to_delete: return related_entities = await get_related_entities( - created_entities, self.context.port_client + entities_to_protect, self.context.port_client ) allowed_entities_to_delete = [] @@ -98,7 +44,8 @@ async def _delete_diff( is_same_entity(entity, entity_to_delete) for entity in related_entities ) is_part_of_created = any( - is_same_entity(entity, entity_to_delete) for entity in created_entities + is_same_entity(entity, entity_to_delete) + for entity in entities_to_protect ) if is_part_of_related: if event.port_app_config.create_missing_related_entities: @@ -119,21 +66,14 @@ async def apply_diff( user_agent_type: UserAgentType, ) -> None: diff = get_port_diff(entities["before"], entities["after"]) + kept_entities = diff.created + diff.modified logger.info( f"Updating entity diff (created: {len(diff.created)}, deleted: {len(diff.deleted)}, modified: {len(diff.modified)})" ) - await self._validate_entity_diff(diff) + await self.upsert(kept_entities, user_agent_type) - logger.info("Upserting new entities") - await self.upsert(diff.created, user_agent_type) - logger.info("Upserting modified entities") - await self.upsert(diff.modified, user_agent_type) - - logger.info("Deleting diff entities") - await self._delete_diff( - diff.deleted, diff.created + diff.modified, user_agent_type - ) + await self._safe_delete(diff.deleted, kept_entities, user_agent_type) async def delete_diff( self, @@ -145,15 +85,13 @@ async def delete_diff( if not diff.deleted: return + kept_entities = diff.created + diff.modified + logger.info( - f"Updating entity diff (created: {len(diff.created)}, deleted: {len(diff.deleted)}, modified: {len(diff.modified)})" + f"Determining entities to delete ({len(diff.deleted)}/{len(kept_entities)})" ) - await self._validate_entity_diff(diff) - logger.info("Deleting diff entities") - await self._delete_diff( - diff.deleted, diff.created + diff.modified, user_agent_type - ) + await self._safe_delete(diff.deleted, kept_entities, user_agent_type) async def upsert( self, entities: list[Entity], user_agent_type: UserAgentType diff --git a/port_ocean/core/handlers/entities_state_applier/port/validate_entity_relations.py b/port_ocean/core/handlers/entities_state_applier/port/validate_entity_relations.py deleted file mode 100644 index a500cf907b..0000000000 --- a/port_ocean/core/handlers/entities_state_applier/port/validate_entity_relations.py +++ /dev/null @@ -1,40 +0,0 @@ -import asyncio - -from port_ocean.clients.port.client import PortClient -from port_ocean.core.handlers.entities_state_applier.port.get_related_entities import ( - get_related_entities, -) -from port_ocean.core.handlers.entity_processor.base import EntityPortDiff -from port_ocean.core.utils import is_same_entity -from port_ocean.exceptions.core import RelationValidationException - - -async def validate_entity_relations( - diff: EntityPortDiff, port_client: PortClient -) -> None: - modified_or_created_entities = diff.modified + diff.created - related_entities = await get_related_entities( - modified_or_created_entities, port_client - ) - - required_entities = [] - - for entity in related_entities: - if any(is_same_entity(item, entity) for item in diff.deleted): - raise RelationValidationException( - f"Cant delete entity {entity} of blueprint {entity.blueprint} " - f"because it was specified as relation target of entity {entity} " - f"of blueprint {entity.blueprint}" - ) - - if not any( - is_same_entity(item, entity) for item in modified_or_created_entities - ): - required_entities.append(entity) - - await asyncio.gather( - *( - port_client.validate_entity_exist(item.identifier, item.blueprint) - for item in required_entities - ) - ) diff --git a/port_ocean/core/integrations/mixins/sync_raw.py b/port_ocean/core/integrations/mixins/sync_raw.py index 61c7c28b2d..67d60f0976 100644 --- a/port_ocean/core/integrations/mixins/sync_raw.py +++ b/port_ocean/core/integrations/mixins/sync_raw.py @@ -333,8 +333,6 @@ async def sync_raw_all( ): app_config = await self.port_app_config_handler.get_port_app_config() - entities_at_port = await ocean.port_client.search_entities(user_agent_type) - creation_results: list[tuple[list[Entity], list[Exception]]] = [] try: @@ -369,8 +367,11 @@ async def sync_raw_all( logger.error(message, exc_info=error_group) else: + entities_at_port = await ocean.port_client.search_entities( + user_agent_type + ) logger.info( - f"Running resync diff calculation, number of entities at Port before resync: {len(entities_at_port)}, number of entities created during sync: {len(flat_created_entities)}" + f"Running resync diff calculation, number of entities found at Port: {len(entities_at_port)}, number of entities found during sync: {len(flat_created_entities)}" ) await self.entities_state_applier.delete_diff( {"before": entities_at_port, "after": flat_created_entities}, diff --git a/port_ocean/helpers/retry.py b/port_ocean/helpers/retry.py index a7953b8a9a..98ecffff34 100644 --- a/port_ocean/helpers/retry.py +++ b/port_ocean/helpers/retry.py @@ -179,12 +179,35 @@ def close(self) -> None: transport.close() def _is_retryable_method(self, request: httpx.Request) -> bool: - return request.method in self._retryable_methods + return request.method in self._retryable_methods or request.extensions.get( + "retryable", False + ) def _should_retry(self, response: httpx.Response) -> bool: return response.status_code in self._retry_status_codes - def _log_failure( + def _log_error( + self, + request: httpx.Request, + error: Exception | None, + ) -> None: + if not self._logger: + return + + if isinstance(error, httpx.ConnectTimeout): + self._logger.error( + f"Request {request.method} {request.url} failed to connect: {str(error)}" + ) + elif isinstance(error, httpx.TimeoutException): + self._logger.error( + f"Request {request.method} {request.url} failed with a timeout exception: {str(error)}" + ) + elif isinstance(error, httpx.HTTPError): + self._logger.error( + f"Request {request.method} {request.url} failed with an HTTP error: {str(error)}" + ) + + def _log_before_retry( self, request: httpx.Request, sleep_time: float, @@ -249,7 +272,7 @@ async def _retry_operation_async( while True: if attempts_made > 0: sleep_time = self._calculate_sleep(attempts_made, {}) - self._log_failure(request, sleep_time, response, error) + self._log_before_retry(request, sleep_time, response, error) await asyncio.sleep(sleep_time) error = None @@ -262,9 +285,20 @@ async def _retry_operation_async( ): return response await response.aclose() + except httpx.ConnectTimeout as e: + error = e + if remaining_attempts < 1: + self._log_error(request, error) + raise + except httpx.TimeoutException as e: + error = e + if remaining_attempts < 1: + self._log_error(request, error) + raise except httpx.HTTPError as e: error = e if remaining_attempts < 1: + self._log_error(request, error) raise attempts_made += 1 remaining_attempts -= 1 @@ -281,7 +315,7 @@ def _retry_operation( while True: if attempts_made > 0: sleep_time = self._calculate_sleep(attempts_made, {}) - self._log_failure(request, sleep_time, response, error) + self._log_before_retry(request, sleep_time, response, error) time.sleep(sleep_time) error = None @@ -292,9 +326,20 @@ def _retry_operation( if remaining_attempts < 1 or not self._should_retry(response): return response response.close() + except httpx.ConnectTimeout as e: + error = e + if remaining_attempts < 1: + self._log_error(request, error) + raise + except httpx.TimeoutException as e: + error = e + if remaining_attempts < 1: + self._log_error(request, error) + raise except httpx.HTTPError as e: error = e if remaining_attempts < 1: + self._log_error(request, error) raise attempts_made += 1 remaining_attempts -= 1 diff --git a/pyproject.toml b/pyproject.toml index 931647ccbe..a4bf43de6a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "port-ocean" -version = "0.5.10" +version = "0.5.11" description = "Port Ocean is a CLI tool for managing your Port projects." readme = "README.md" homepage = "https://app.getport.io"