From ab1b54a616ee36a1abd8963bac67d6cb60318a63 Mon Sep 17 00:00:00 2001 From: Shalev Avhar <51760613+shalev007@users.noreply.github.com> Date: Mon, 25 Nov 2024 12:05:24 +0200 Subject: [PATCH] [Core] Support the reduction of Port rate limit in the integrations (#1155) # Description What - I updated the retry mechanism to ensure the total retry time is capped at 5 minutes, with a maximum backoff time of 5 minutes between retries. Why - This change is needed to handle the reduced port rate limit in integrations more effectively, minimizing the risk of overwhelming the system while still ensuring retries are reasonable. How - I adjusted the retry logic to distribute retries over the 5-minute window, scaling backoff times to align with the reduced rate limit without exceeding the time limit. ## Type of change Please leave one option from the following and delete the rest: - [ ] Bug fix (non-breaking change which fixes an issue) - [ ] New feature (non-breaking change which adds functionality) - [ ] New Integration (non-breaking change which adds a new integration) - [ ] Breaking change (fix or feature that would cause existing functionality to not work as expected) - [X] Non-breaking change (fix of existing functionality that will not change current behavior) - [ ] Documentation (added/updated documentation)

All tests should be run against the Port production environment (using a testing org).

### Core testing checklist - [ ] Integration able to create all default resources from scratch - [ ] Resync finishes successfully - [ ] Resync able to create entities - [ ] Resync able to update entities - [ ] Resync able to detect and delete entities - [ ] Scheduled resync able to abort existing resync and start a new one - [ ] Tested with at least 2 integrations from scratch - [ ] Tested with Kafka and Polling event listeners - [ ] Tested deletion of entities that don't pass the selector ### Integration testing checklist - [ ] Integration able to create all default resources from scratch - [ ] Resync able to create entities - [ ] Resync able to update entities - [ ] Resync able to detect and delete entities - [ ] Resync finishes successfully - [ ] If new resource kind is added or updated in the integration, add example raw data, mapping and expected result to the `examples` folder in the integration directory. - [ ] If resource kind is updated, run the integration with the example data and check if the expected result is achieved - [ ] If new resource kind is added or updated, validate that live-events for that resource are working as expected - [ ] Docs PR link [here](#) ### Preflight checklist - [ ] Handled rate limiting - [ ] Handled pagination - [ ] Implemented the code in async - [ ] Support Multi account ## Screenshots Include screenshots from your environment showing how the resources of the integration will look. ## API Documentation Provide links to the API documentation used for this integration. --------- Co-authored-by: Shalev Avhar --- port_ocean/clients/port/utils.py | 10 ++++++++-- port_ocean/helpers/retry.py | 12 ++++++------ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/port_ocean/clients/port/utils.py b/port_ocean/clients/port/utils.py index a2c3782c84..70917c55b9 100644 --- a/port_ocean/clients/port/utils.py +++ b/port_ocean/clients/port/utils.py @@ -16,7 +16,7 @@ # period of time, before raising an exception. 
# The max_connections value can't be too high, as it will cause the application to run out of memory. # The max_keepalive_connections can't be too high, as it will cause the application to run out of available connections. -PORT_HTTP_MAX_CONNECTIONS_LIMIT = 200 +PORT_HTTP_MAX_CONNECTIONS_LIMIT = 100 PORT_HTTP_MAX_KEEP_ALIVE_CONNECTIONS = 50 PORT_HTTP_TIMEOUT = 60.0 @@ -28,13 +28,19 @@ _http_client: LocalStack[httpx.AsyncClient] = LocalStack() +FIVE_MINUETS = 60 * 5 + def _get_http_client_context(port_client: "PortClient") -> httpx.AsyncClient: client = _http_client.top if client is None: client = OceanAsyncClient( TokenRetryTransport, - transport_kwargs={"port_client": port_client}, + transport_kwargs={ + "port_client": port_client, + "max_backoff_wait": FIVE_MINUETS, + "base_delay": 0.3, + }, timeout=PORT_HTTPX_TIMEOUT, limits=PORT_HTTPX_LIMITS, ) diff --git a/port_ocean/helpers/retry.py b/port_ocean/helpers/retry.py index 077c6e0ffb..9585c9439c 100644 --- a/port_ocean/helpers/retry.py +++ b/port_ocean/helpers/retry.py @@ -55,14 +55,14 @@ class RetryTransport(httpx.AsyncBaseTransport, httpx.BaseTransport): HTTPStatus.GATEWAY_TIMEOUT, ] ) - MAX_BACKOFF_WAIT = 60 + MAX_BACKOFF_WAIT_IN_SECONDS = 60 def __init__( self, wrapped_transport: Union[httpx.BaseTransport, httpx.AsyncBaseTransport], max_attempts: int = 10, - max_backoff_wait: float = MAX_BACKOFF_WAIT, - backoff_factor: float = 0.1, + max_backoff_wait: float = MAX_BACKOFF_WAIT_IN_SECONDS, + base_delay: float = 0.1, jitter_ratio: float = 0.1, respect_retry_after_header: bool = True, retryable_methods: Iterable[str] | None = None, @@ -81,7 +81,7 @@ def __init__( max_backoff_wait (float, optional): The maximum amount of time (in seconds) to wait before retrying a request. Defaults to 60. - backoff_factor (float, optional): + base_delay (float, optional): The factor by which the waiting time will be multiplied in each retry attempt. Defaults to 0.1. 
jitter_ratio (float, optional): @@ -105,7 +105,7 @@ def __init__( ) self._max_attempts = max_attempts - self._backoff_factor = backoff_factor + self._base_delay = base_delay self._respect_retry_after_header = respect_retry_after_header self._retryable_methods = ( frozenset(retryable_methods) @@ -255,7 +255,7 @@ def _calculate_sleep( except ValueError: pass - backoff = self._backoff_factor * (2 ** (attempts_made - 1)) + backoff = self._base_delay * (2 ** (attempts_made - 1)) jitter = (backoff * self._jitter_ratio) * random.choice([1, -1]) total_backoff = backoff + jitter return min(total_backoff, self._max_backoff_wait)