From 1a40313c436c09394c0e65f4fea5084a0cb52019 Mon Sep 17 00:00:00 2001 From: erikzaadi Date: Tue, 7 Jan 2025 09:35:33 +0200 Subject: [PATCH] [PR-FIX] Unify naming and add backoff retry mechanism --- .../clients/port/mixins/integrations.py | 41 ++++++++++++------- .../clients/port/mixins/organization.py | 8 ++-- port_ocean/core/defaults/initialize.py | 18 ++++---- port_ocean/exceptions/port_defaults.py | 10 +++++ 4 files changed, 51 insertions(+), 26 deletions(-) diff --git a/port_ocean/clients/port/mixins/integrations.py b/port_ocean/clients/port/mixins/integrations.py index 3ff99f08dc..e98387e2c2 100644 --- a/port_ocean/clients/port/mixins/integrations.py +++ b/port_ocean/clients/port/mixins/integrations.py @@ -6,12 +6,18 @@ from loguru import logger from port_ocean.clients.port.authentication import PortAuthentication from port_ocean.clients.port.utils import handle_status_code +from port_ocean.exceptions.defaults import DefaultsProvisionFailed from port_ocean.log.sensetive import sensitive_log_filter if TYPE_CHECKING: from port_ocean.core.handlers.port_app_config.models import PortAppConfig +INTEGRATION_POLLING_INTERVAL_INITIAL_SECONDS = 5 +INTEGRATION_POLLING_INTERVAL_BACKOFF_FACTOR = 1.5 +INTEGRATION_POLLING_RETRY_LIMIT = 30 + + class LogAttributes(TypedDict): ingestUrl: str @@ -53,23 +59,30 @@ async def get_log_attributes(self) -> LogAttributes: async def _poll_integration_until_default_provisioning_is_complete( self, - interval=15, ) -> Dict[str, Any]: - response = await self._get_current_integration() - response_json = response.json() - config = response_json.get("integration", {}).get("config", {}) - if config != {}: - return response_json + attempts = 0 + current_interval_seconds = INTEGRATION_POLLING_INTERVAL_INITIAL_SECONDS - logger.info( - f"Still waiting for integration config to be ready, waiting {interval} seconds" - ) - await asyncio.sleep(interval) + while attempts < INTEGRATION_POLLING_RETRY_LIMIT: + logger.info( + f"Fetching created integration and validating config, attempt {attempts+1}/{INTEGRATION_POLLING_RETRY_LIMIT}" + ) + response = await self._get_current_integration() + response_json = response.json() + config = response_json.get("integration", {}).get("config", {}) + if config != {}: + return response_json + + logger.info( + f"Integration config is still being provisioned, retrying in {current_interval_seconds} seconds" + ) + await asyncio.sleep(current_interval_seconds) - # TODO: Ensure that get_integration isn't cached - result = await self._poll_integration_until_default_provisioning_is_complete() + current_interval_seconds = ( + current_interval_seconds * INTEGRATION_POLLING_INTERVAL_BACKOFF_FACTOR + ) - return result + raise DefaultsProvisionFailed(INTEGRATION_POLLING_RETRY_LIMIT) async def create_integration( self, @@ -79,8 +92,6 @@ async def create_integration( use_provisioned_defaults: Optional[bool] = False, ) -> dict: logger.info(f"Creating integration with id: {self.integration_identifier}") - if use_provisioned_defaults: - logger.info("Creating integration with `use_provisioned_defaults`") headers = await self.auth.headers() json = { "installationId": self.integration_identifier, diff --git a/port_ocean/clients/port/mixins/organization.py b/port_ocean/clients/port/mixins/organization.py index b5cb8eb5fe..faf1b4fb03 100644 --- a/port_ocean/clients/port/mixins/organization.py +++ b/port_ocean/clients/port/mixins/organization.py @@ -14,8 +14,8 @@ def __init__( self.auth = auth self.client = client - async def _get_organization_feature_toggles(self) -> httpx.Response: - logger.info("Fetching organization feature toggles") + async def _get_organization_feature_flags(self) -> httpx.Response: + logger.info("Fetching organization feature flags") response = await self.client.get( f"{self.auth.api_url}/organization", @@ -23,9 +23,9 @@ async def _get_organization_feature_toggles(self) -> httpx.Response: ) return response - async def get_organization_feature_toggles( + async def get_organization_feature_flags( self, should_raise: bool = True, should_log: bool = True ) -> List[str]: - response = await self._get_organization_feature_toggles() + response = await self._get_organization_feature_flags() handle_status_code(response, should_raise, should_log) return response.json().get("organization", {}).get("featureFlags", []) diff --git a/port_ocean/core/defaults/initialize.py b/port_ocean/core/defaults/initialize.py index a257b65624..428fcdc15a 100644 --- a/port_ocean/core/defaults/initialize.py +++ b/port_ocean/core/defaults/initialize.py @@ -19,7 +19,7 @@ AbortDefaultCreationError, ) -ORG_USE_PROVISIONED_DEFAULTS_FEATURE_TOGGLE = "USE_PROVISIONED_DEFAULTS" +ORG_USE_PROVISIONED_DEFAULTS_FEATURE_FLAG = "USE_PROVISIONED_DEFAULTS" def deconstruct_blueprints_to_creation_steps( @@ -204,17 +204,21 @@ async def _initialize_defaults( defaults = get_port_integration_defaults( config_class, integration_config.resources_path ) - if not defaults: - logger.warning("No defaults found. Skipping initialization...") - return None if integration_config.use_provisioned_defaults: logger.info("`use_provisioned_defaults` set, verifying org feature toggle") - org_feature_toggles = await port_client.get_organization_feature_toggles() - if ORG_USE_PROVISIONED_DEFAULTS_FEATURE_TOGGLE not in org_feature_toggles: + org_feature_flags = await port_client.get_organization_feature_flags() + if ORG_USE_PROVISIONED_DEFAULTS_FEATURE_FLAG not in org_feature_flags: + logger.info( + "Although `use_provisioned_defaults` was set, it was not enabled in the organizations feature flags, disabling" + ) integration_config.use_provisioned_defaults = False - if defaults.port_app_config: + if not integration_config.use_provisioned_defaults and not defaults: + logger.warning("No defaults found. Skipping initialization...") + return None + + if defaults.port_app_config or integration_config.use_provisioned_defaults: await _initialize_required_integration_settings( port_client, defaults.port_app_config, integration_config ) diff --git a/port_ocean/exceptions/port_defaults.py b/port_ocean/exceptions/port_defaults.py index 38ebe3c612..35f93b858d 100644 --- a/port_ocean/exceptions/port_defaults.py +++ b/port_ocean/exceptions/port_defaults.py @@ -14,3 +14,13 @@ def __init__( class UnsupportedDefaultFileType(BaseOceanException): pass + + +class DefaultsProvisionFailed(BaseOceanException): + def __init__( + self, + retries, + ): + super().__init__( + f"Failed to retrieve integration config after {retries} attempts" + )