Skip to content

Commit

Permalink
[PR-FIX] Unify naming and add backoff retry mechanism
Browse files Browse the repository at this point in the history
  • Loading branch information
erikzaadi committed Jan 7, 2025
1 parent 78cf234 commit f3d696d
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 26 deletions.
41 changes: 26 additions & 15 deletions port_ocean/clients/port/mixins/integrations.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,18 @@
from loguru import logger
from port_ocean.clients.port.authentication import PortAuthentication
from port_ocean.clients.port.utils import handle_status_code
from port_ocean.exceptions.defaults import DefaultsProvisionFailed
from port_ocean.log.sensetive import sensitive_log_filter

if TYPE_CHECKING:
from port_ocean.core.handlers.port_app_config.models import PortAppConfig


INTEGRATION_POLLING_INTERVAL_INITIAL_SECONDS = 5
INTEGRATION_POLLING_INTERVAL_BACKOFF_FACTOR = 1.5
INTEGRATION_POLLING_RETRY_LIMIT = 30


class LogAttributes(TypedDict):
ingestUrl: str

Expand Down Expand Up @@ -53,23 +59,30 @@ async def get_log_attributes(self) -> LogAttributes:

async def _poll_integration_until_default_provisioning_is_complete(
self,
interval=15,
) -> Dict[str, Any]:
response = await self._get_current_integration()
response_json = response.json()
config = response_json.get("integration", {}).get("config", {})
if config != {}:
return response_json
attempts = 0
current_interval_seconds = INTEGRATION_POLLING_INTERVAL_INITIAL_SECONDS

logger.info(
f"Still waiting for integration config to be ready, waiting {interval} seconds"
)
await asyncio.sleep(interval)
while attempts < INTEGRATION_POLLING_RETRY_LIMIT:
logger.info(
f"Fetching created integration and validating config, attempt {attempts+1}/{INTEGRATION_POLLING_RETRY_LIMIT}"
)
response = await self._get_current_integration()
response_json = response.json()
config = response_json.get("integration", {}).get("config", {})
if config != {}:
return response_json

logger.info(
f"Integration config is still being provisioned, retrying in {current_interval_seconds} seconds"
)
await asyncio.sleep(current_interval_seconds)

# TODO: Ensure that get_integration isn't cached
result = await self._poll_integration_until_default_provisioning_is_complete()
current_interval_seconds = (
current_interval_seconds * INTEGRATION_POLLING_INTERVAL_BACKOFF_FACTOR
)

return result
raise DefaultsProvisionFailed(INTEGRATION_POLLING_RETRY_LIMIT)

async def create_integration(
self,
Expand All @@ -79,8 +92,6 @@ async def create_integration(
use_provisioned_defaults: Optional[bool] = False,
) -> dict:
logger.info(f"Creating integration with id: {self.integration_identifier}")
if use_provisioned_defaults:
logger.info("Creating integration with `use_provisioned_defaults`")
headers = await self.auth.headers()
json = {
"installationId": self.integration_identifier,
Expand Down
8 changes: 4 additions & 4 deletions port_ocean/clients/port/mixins/organization.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,18 @@ def __init__(
self.auth = auth
self.client = client

async def _get_organization_feature_toggles(self) -> httpx.Response:
logger.info("Fetching organization feature toggles")
async def _get_organization_feature_flags(self) -> httpx.Response:
logger.info("Fetching organization feature flags")

response = await self.client.get(
f"{self.auth.api_url}/organization",
headers=await self.auth.headers(),
)
return response

async def get_organization_feature_toggles(
async def get_organization_feature_flags(
self, should_raise: bool = True, should_log: bool = True
) -> List[str]:
response = await self._get_organization_feature_toggles()
response = await self._get_organization_feature_flags()
handle_status_code(response, should_raise, should_log)
return response.json().get("organization", {}).get("featureFlags", [])
18 changes: 11 additions & 7 deletions port_ocean/core/defaults/initialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
AbortDefaultCreationError,
)

ORG_USE_PROVISIONED_DEFAULTS_FEATURE_TOGGLE = "USE_PROVISIONED_DEFAULTS"
ORG_USE_PROVISIONED_DEFAULTS_FEATURE_FLAG = "USE_PROVISIONED_DEFAULTS"


def deconstruct_blueprints_to_creation_steps(
Expand Down Expand Up @@ -204,17 +204,21 @@ async def _initialize_defaults(
defaults = get_port_integration_defaults(
config_class, integration_config.resources_path
)
if not defaults:
logger.warning("No defaults found. Skipping initialization...")
return None

if integration_config.use_provisioned_defaults:
logger.info("`use_provisioned_defaults` set, verifying org feature toggle")
org_feature_toggles = await port_client.get_organization_feature_toggles()
if ORG_USE_PROVISIONED_DEFAULTS_FEATURE_TOGGLE not in org_feature_toggles:
org_feature_flags = await port_client.get_organization_feature_flags()
if ORG_USE_PROVISIONED_DEFAULTS_FEATURE_FLAG not in org_feature_flags:
logger.info(
"Although `use_provisioned_defaults` was set, it was not enabled in the organizations feature flags, disabling"
)
integration_config.use_provisioned_defaults = False

if defaults.port_app_config:
if not integration_config.use_provisioned_defaults and not defaults:
logger.warning("No defaults found. Skipping initialization...")
return None

if defaults.port_app_config or integration_config.use_provisioned_defaults:
await _initialize_required_integration_settings(
port_client, defaults.port_app_config, integration_config
)
Expand Down
10 changes: 10 additions & 0 deletions port_ocean/exceptions/port_defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,13 @@ def __init__(

class UnsupportedDefaultFileType(BaseOceanException):
pass


class DefaultsProvisionFailed(BaseOceanException):
def __init__(
self,
retries,
):
super().__init__(
f"Failed to retrieve integration config after {retries} attempts"
)

0 comments on commit f3d696d

Please sign in to comment.