From 0b52b8f3a00dbf09a58b733ccacad3b289bf9f7a Mon Sep 17 00:00:00 2001 From: hpal Date: Wed, 30 Oct 2024 22:06:03 +0000 Subject: [PATCH 01/28] [terraform-cloud] base rate limiting with aiolimiter --- integrations/terraform-cloud/client.py | 81 +++++++++++++++------ integrations/terraform-cloud/main.py | 66 ++++++++--------- integrations/terraform-cloud/poetry.lock | 15 +++- integrations/terraform-cloud/pyproject.toml | 1 + 4 files changed, 102 insertions(+), 61 deletions(-) diff --git a/integrations/terraform-cloud/client.py b/integrations/terraform-cloud/client.py index 7c50a41894..c034fb9eed 100644 --- a/integrations/terraform-cloud/client.py +++ b/integrations/terraform-cloud/client.py @@ -3,6 +3,9 @@ import httpx from loguru import logger from enum import StrEnum +from aiolimiter import AsyncLimiter +import time +import asyncio from port_ocean.context.event import event @@ -22,6 +25,9 @@ class CacheKeys(StrEnum): PAGE_SIZE = 100 +# Terraform's rate limit is 30 requests per second +RATE_LIMIT = 30 + class TerraformClient: def __init__(self, terraform_base_url: str, auth_token: str) -> None: @@ -33,6 +39,11 @@ def __init__(self, terraform_base_url: str, auth_token: str) -> None: self.api_url = f"{self.terraform_base_url}/api/v2" self.client = http_async_client self.client.headers.update(self.base_headers) + + # Initialize rate limiter for 30 requests per second + self.rate_limiter = AsyncLimiter(30, 1) # 30 requests per 1 second + self._remaining = RATE_LIMIT + self._reset_time = None async def send_api_request( self, @@ -41,32 +52,54 @@ async def send_api_request( query_params: Optional[dict[str, Any]] = None, json_data: Optional[dict[str, Any]] = None, ) -> dict[str, Any]: - logger.info(f"Requesting Terraform Cloud data for endpoint: {endpoint}") - try: - url = f"{self.api_url}/{endpoint}" - logger.info( - f"URL: {url}, Method: {method}, Params: {query_params}, Body: {json_data}" - ) - response = await self.client.request( - method=method, - url=url, - params=query_params, - json=json_data, - ) - response.raise_for_status() - - logger.info(f"Successfully retrieved data for endpoint: {endpoint}") + async with self.rate_limiter: + try: + url = f"{self.api_url}/{endpoint}" + response = await self.client.request( + method=method, + url=url, + params=query_params, + json=json_data, + ) - return response.json() + # Update rate limit info from headers + self._remaining = int(response.headers.get('x-ratelimit-remaining', RATE_LIMIT)) + reset_in = float(response.headers.get('x-ratelimit-reset', 1)) # Default to 1 second + self._reset_time = time.time() + reset_in + + logger.info( + f"Rate limit info - " + f"Limit: {response.headers.get('x-ratelimit-limit', RATE_LIMIT)}, " + f"Remaining: {self._remaining}, " + f"Reset: {reset_in}" + ) - except httpx.HTTPStatusError as e: - logger.error( - f"HTTP error on {endpoint}: {e.response.status_code} - {e.response.text}" - ) - raise - except httpx.HTTPError as e: - logger.error(f"HTTP error on {endpoint}: {str(e)}") - raise + if response.status_code == 429: + logger.warning( + f"Rate limited on {endpoint}. " + f"Headers: {dict(response.headers)}. " + f"Waiting {reset_in} seconds" + ) + await asyncio.sleep(reset_in) + return await self.send_api_request(endpoint, method, query_params, json_data) + + response.raise_for_status() + return response.json() + + except httpx.HTTPStatusError as e: + if e.response.status_code == 429: + reset_in = float(e.response.headers.get('x-ratelimit-reset', 1)) + logger.warning( + f"Rate limited on {endpoint}. " + f"Headers: {dict(e.response.headers)}. " + f"Waiting {reset_in} seconds" + ) + await asyncio.sleep(reset_in) + return await self.send_api_request(endpoint, method, query_params, json_data) + logger.error( + f"HTTP error on {endpoint}: {e.response.status_code} - {e.response.text}" + ) + raise async def get_paginated_resources( self, endpoint: str, params: Optional[dict[str, Any]] = None diff --git a/integrations/terraform-cloud/main.py b/integrations/terraform-cloud/main.py index 406582ace7..133cf7c4e4 100644 --- a/integrations/terraform-cloud/main.py +++ b/integrations/terraform-cloud/main.py @@ -19,7 +19,6 @@ class ObjectKind(StrEnum): SKIP_WEBHOOK_CREATION = False -SEMAPHORE_LIMIT = 30 def init_terraform_client() -> TerraformClient: @@ -40,40 +39,38 @@ def init_terraform_client() -> TerraformClient: async def enrich_state_versions_with_output_data( http_client: TerraformClient, state_versions: List[dict[str, Any]] ) -> list[dict[str, Any]]: - async with asyncio.BoundedSemaphore(SEMAPHORE_LIMIT): - tasks = [ - http_client.get_state_version_output(state_version["id"]) - for state_version in state_versions - ] + tasks = [ + http_client.get_state_version_output(state_version["id"]) + for state_version in state_versions + ] - output_batches = [] - for completed_task in asyncio.as_completed(tasks): - output = await completed_task - output_batches.append(output) + output_batches = [] + for completed_task in asyncio.as_completed(tasks): + output = await completed_task + output_batches.append(output) - enriched_state_versions = [ - {**state_version, "__output": output} - for state_version, output in zip(state_versions, output_batches) - ] + enriched_state_versions = [ + {**state_version, "__output": output} + for state_version, output in zip(state_versions, output_batches) + ] - return enriched_state_versions + return enriched_state_versions async def enrich_workspaces_with_tags( http_client: TerraformClient, workspaces: List[dict[str, Any]] ) -> list[dict[str, Any]]: async def get_tags_for_workspace(workspace: dict[str, Any]) -> dict[str, Any]: - async with asyncio.BoundedSemaphore(SEMAPHORE_LIMIT): - try: - tags = [] - async for tag_batch in http_client.get_workspace_tags(workspace["id"]): - tags.extend(tag_batch) - return {**workspace, "__tags": tags} - except Exception as e: - logger.warning( - f"Failed to fetch tags for workspace {workspace['id']}: {e}" - ) - return {**workspace, "__tags": []} + try: + tags = [] + async for tag_batch in http_client.get_workspace_tags(workspace["id"]): + tags.extend(tag_batch) + return {**workspace, "__tags": tags} + except Exception as e: + logger.warning( + f"Failed to fetch tags for workspace {workspace['id']}: {e}" + ) + return {**workspace, "__tags": []} tasks = [get_tags_for_workspace(workspace) for workspace in workspaces] enriched_workspaces = [await task for task in asyncio.as_completed(tasks)] @@ -84,15 +81,14 @@ async def get_tags_for_workspace(workspace: dict[str, Any]) -> dict[str, Any]: async def enrich_workspace_with_tags( http_client: TerraformClient, workspace: dict[str, Any] ) -> dict[str, Any]: - async with asyncio.BoundedSemaphore(SEMAPHORE_LIMIT): - try: - tags = [] - async for tag_batch in http_client.get_workspace_tags(workspace["id"]): - tags.extend(tag_batch) - return {**workspace, "__tags": tags} - except Exception as e: - logger.warning(f"Failed to fetch tags for workspace {workspace['id']}: {e}") - return {**workspace, "__tags": []} + try: + tags = [] + async for tag_batch in http_client.get_workspace_tags(workspace["id"]): + tags.extend(tag_batch) + return {**workspace, "__tags": tags} + except Exception as e: + logger.warning(f"Failed to fetch tags for workspace {workspace['id']}: {e}") + return {**workspace, "__tags": []} @ocean.on_resync(ObjectKind.ORGANIZATION) diff --git a/integrations/terraform-cloud/poetry.lock b/integrations/terraform-cloud/poetry.lock index 372ae8c178..7670203d8d 100644 --- a/integrations/terraform-cloud/poetry.lock +++ b/integrations/terraform-cloud/poetry.lock @@ -1,4 +1,15 @@ -# This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. + +[[package]] +name = "aiolimiter" +version = "1.1.0" +description = "asyncio rate limiter, a leaky bucket implementation" +optional = false +python-versions = ">=3.7,<4.0" +files = [ + {file = "aiolimiter-1.1.0-py3-none-any.whl", hash = "sha256:0b4997961fc58b8df40279e739f9cf0d3e255e63e9a44f64df567a8c17241e24"}, + {file = "aiolimiter-1.1.0.tar.gz", hash = "sha256:461cf02f82a29347340d031626c92853645c099cb5ff85577b831a7bd21132b5"}, +] [[package]] name = "aiostream" @@ -2022,4 +2033,4 @@ dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"] [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "7ea018ada575f3880e54e5e9e7ec7c034e0bf384ab4298eccf905b589dc23778" +content-hash = "92c080ff783c404a5cece85b54cd0b6ca8c06f46d9bf194f0966c20d5d8a6f73" diff --git a/integrations/terraform-cloud/pyproject.toml b/integrations/terraform-cloud/pyproject.toml index be83307d8e..e9e232184e 100644 --- a/integrations/terraform-cloud/pyproject.toml +++ b/integrations/terraform-cloud/pyproject.toml @@ -7,6 +7,7 @@ authors = ["Michael Armah "] [tool.poetry.dependencies] python = "^3.11" port_ocean = {version = "^0.12.7", extras = ["cli"]} +aiolimiter = "^1.1.0" [tool.poetry.group.dev.dependencies] # uncomment this if you want to debug the ocean core together with your integration From a00b0937a7640765ae482f0c585caffab000eaa8 Mon Sep 17 00:00:00 2001 From: hpal Date: Wed, 30 Oct 2024 22:35:30 +0000 Subject: [PATCH 02/28] update the var names --- integrations/terraform-cloud/client.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/integrations/terraform-cloud/client.py b/integrations/terraform-cloud/client.py index c034fb9eed..fbfdce43be 100644 --- a/integrations/terraform-cloud/client.py +++ b/integrations/terraform-cloud/client.py @@ -26,7 +26,8 @@ class CacheKeys(StrEnum): PAGE_SIZE = 100 # Terraform's rate limit is 30 requests per second -RATE_LIMIT = 30 +# We're using a buffer of 5 requests to account for any variability in request times +TERRAFORM_RATE_LIMIT = 25 class TerraformClient: @@ -40,9 +41,8 @@ def __init__(self, terraform_base_url: str, auth_token: str) -> None: self.client = http_async_client self.client.headers.update(self.base_headers) - # Initialize rate limiter for 30 requests per second - self.rate_limiter = AsyncLimiter(30, 1) # 30 requests per 1 second - self._remaining = RATE_LIMIT + self.rate_limiter = AsyncLimiter(TERRAFORM_RATE_LIMIT, 1) + self._remaining = TERRAFORM_RATE_LIMIT self._reset_time = None async def send_api_request( @@ -62,14 +62,13 @@ async def send_api_request( json=json_data, ) - # Update rate limit info from headers - self._remaining = int(response.headers.get('x-ratelimit-remaining', RATE_LIMIT)) + self._remaining = int(response.headers.get('x-ratelimit-remaining', TERRAFORM_RATE_LIMIT)) reset_in = float(response.headers.get('x-ratelimit-reset', 1)) # Default to 1 second self._reset_time = time.time() + reset_in logger.info( f"Rate limit info - " - f"Limit: {response.headers.get('x-ratelimit-limit', RATE_LIMIT)}, " + f"Limit: {response.headers.get('x-ratelimit-limit', TERRAFORM_RATE_LIMIT)}, " f"Remaining: {self._remaining}, " f"Reset: {reset_in}" ) From 977420b67493069a1c8506eac112c28bfd779af2 Mon Sep 17 00:00:00 2001 From: hpal Date: Wed, 30 Oct 2024 23:15:04 +0000 Subject: [PATCH 03/28] user process_in_queue to control concurrent operations at the application level --- integrations/terraform-cloud/main.py | 87 ++++++++++++++++------------ 1 file changed, 49 insertions(+), 38 deletions(-) diff --git a/integrations/terraform-cloud/main.py b/integrations/terraform-cloud/main.py index 133cf7c4e4..cba4363521 100644 --- a/integrations/terraform-cloud/main.py +++ b/integrations/terraform-cloud/main.py @@ -8,6 +8,7 @@ from client import TerraformClient from port_ocean.context.ocean import ocean from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE, RAW_RESULT +from port_ocean.utils.queue_utils import process_in_queue class ObjectKind(StrEnum): @@ -20,6 +21,9 @@ class ObjectKind(StrEnum): SKIP_WEBHOOK_CREATION = False +# Constants for rate limiting +MAX_CONCURRENCY = 25 # Keep slightly under the 30/sec limit to allow for overhead + def init_terraform_client() -> TerraformClient: """ @@ -39,22 +43,24 @@ def init_terraform_client() -> TerraformClient: async def enrich_state_versions_with_output_data( http_client: TerraformClient, state_versions: List[dict[str, Any]] ) -> list[dict[str, Any]]: - tasks = [ - http_client.get_state_version_output(state_version["id"]) - for state_version in state_versions - ] - - output_batches = [] - for completed_task in asyncio.as_completed(tasks): - output = await completed_task - output_batches.append(output) - - enriched_state_versions = [ - {**state_version, "__output": output} - for state_version, output in zip(state_versions, output_batches) - ] + async def fetch_output(state_version: dict[str, Any]) -> dict[str, Any]: + try: + output = await http_client.get_state_version_output(state_version["id"]) + return {**state_version, "__output": output} + except Exception as e: + logger.warning( + f"Failed to fetch output for state version {state_version['id']}: {e}" + ) + return {**state_version, "__output": {}} - return enriched_state_versions + # We can process many more items concurrently with 30 req/sec limit + enriched_versions = await process_in_queue( + state_versions, + fetch_output, + concurrency=MAX_CONCURRENCY + ) + + return enriched_versions async def enrich_workspaces_with_tags( @@ -72,9 +78,12 @@ async def get_tags_for_workspace(workspace: dict[str, Any]) -> dict[str, Any]: ) return {**workspace, "__tags": []} - tasks = [get_tags_for_workspace(workspace) for workspace in workspaces] - enriched_workspaces = [await task for task in asyncio.as_completed(tasks)] - + enriched_workspaces = await process_in_queue( + workspaces, + get_tags_for_workspace, + concurrency=MAX_CONCURRENCY + ) + return enriched_workspaces @@ -120,31 +129,33 @@ async def resync_workspaces(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(ObjectKind.RUN) async def resync_runs(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: - terraform_client = init_terraform_client() - - async def fetch_runs_for_workspace( - workspace: dict[str, Any] - ) -> List[List[Dict[str, Any]]]: - return [ - run - async for run in terraform_client.get_paginated_runs_for_workspace( - workspace["id"] - ) - ] + BATCH_SIZE = 10 - async def fetch_runs_for_all_workspaces() -> ASYNC_GENERATOR_RESYNC_TYPE: + terraform_client = init_terraform_client() + + async def fetch_runs_for_workspace(workspace: dict[str, Any]) -> List[dict[str, Any]]: + all_runs = [] + async for runs in terraform_client.get_paginated_runs_for_workspace(workspace["id"]): + all_runs.extend(runs) + return all_runs + + async def process_workspaces() -> ASYNC_GENERATOR_RESYNC_TYPE: async for workspaces in terraform_client.get_paginated_workspaces(): logger.info( - f"Received {len(workspaces)} batch workspaces... fetching its associated {kind}" + f"Processing batch of {len(workspaces)} workspaces for {kind}" ) + + runs_by_workspace = await process_in_queue( + workspaces, + fetch_runs_for_workspace, + concurrency=BATCH_SIZE + ) + + for workspace_runs in runs_by_workspace: + if workspace_runs: + yield workspace_runs - tasks = [fetch_runs_for_workspace(workspace) for workspace in workspaces] - for completed_task in asyncio.as_completed(tasks): - workspace_runs = await completed_task - for runs in workspace_runs: - yield runs - - async for run_batch in fetch_runs_for_all_workspaces(): + async for run_batch in process_workspaces(): yield run_batch From 104caa64ac6e35ffb56f46f56c45ae77b75c0a60 Mon Sep 17 00:00:00 2001 From: hpal Date: Wed, 30 Oct 2024 23:39:29 +0000 Subject: [PATCH 04/28] Update main.py --- integrations/terraform-cloud/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/terraform-cloud/main.py b/integrations/terraform-cloud/main.py index cba4363521..0f4ff3aafd 100644 --- a/integrations/terraform-cloud/main.py +++ b/integrations/terraform-cloud/main.py @@ -142,7 +142,7 @@ async def fetch_runs_for_workspace(workspace: dict[str, Any]) -> List[dict[str, async def process_workspaces() -> ASYNC_GENERATOR_RESYNC_TYPE: async for workspaces in terraform_client.get_paginated_workspaces(): logger.info( - f"Processing batch of {len(workspaces)} workspaces for {kind}" + f"Getting {kind}s for {len(workspaces)} workspaces" ) runs_by_workspace = await process_in_queue( From 75412c4dd94dbc6fb8945c6d10540d74ff4a64e9 Mon Sep 17 00:00:00 2001 From: hpal Date: Thu, 31 Oct 2024 01:10:54 +0000 Subject: [PATCH 05/28] add token bucket; add comments why --- integrations/terraform-cloud/client.py | 74 ++++++++++++++----- integrations/terraform-cloud/main.py | 44 +++++------ integrations/terraform-cloud/rate_limiting.py | 32 ++++++++ 3 files changed, 107 insertions(+), 43 deletions(-) create mode 100644 integrations/terraform-cloud/rate_limiting.py diff --git a/integrations/terraform-cloud/client.py b/integrations/terraform-cloud/client.py index fbfdce43be..3c7eba0a9b 100644 --- a/integrations/terraform-cloud/client.py +++ b/integrations/terraform-cloud/client.py @@ -1,14 +1,41 @@ -from typing import Any, AsyncGenerator, Optional -from port_ocean.utils import http_async_client +""" +Terraform Cloud API Client with dual rate limiting strategy: + +1. Token Bucket (self.burst_limiter): + - Handles burst capacity for concurrent operations + - Allows short bursts above base rate limit + - Essential for concurrent workspace operations (tags, runs, state versions) + - First line of defense against rate limits + +2. AsyncLimiter (self.rate_limiter): + - Time-based rate limiting + - Ensures steady-state request rate + - Maintains long-term compliance with API limits + - Second layer of protection + +This dual approach is necessary because: +- Workspace operations can trigger multiple concurrent requests +- Some operations (like fetching runs) require pagination +- Need to handle both burst capacity and steady-state rate +- Terraform's API has both rate (30 req/sec) and concurrency constraints +# https://developer.hashicorp.com/terraform/enterprise/application-administration/general#api-rate-limiting +""" + +import asyncio +import time import httpx -from loguru import logger + from enum import StrEnum +from typing import Any, AsyncGenerator, Optional from aiolimiter import AsyncLimiter -import time -import asyncio +from loguru import logger from port_ocean.context.event import event +from port_ocean.utils import http_async_client + +from rate_limiting import TokenBucket +# Constants TERRAFORM_WEBHOOK_EVENTS = [ "run:applying", "run:completed", @@ -24,10 +51,7 @@ class CacheKeys(StrEnum): PAGE_SIZE = 100 - -# Terraform's rate limit is 30 requests per second -# We're using a buffer of 5 requests to account for any variability in request times -TERRAFORM_RATE_LIMIT = 25 +TERRAFORM_RATE_LIMIT = 25 # Buffer below 30/sec limit class TerraformClient: @@ -40,10 +64,14 @@ def __init__(self, terraform_base_url: str, auth_token: str) -> None: self.api_url = f"{self.terraform_base_url}/api/v2" self.client = http_async_client self.client.headers.update(self.base_headers) - + self.rate_limiter = AsyncLimiter(TERRAFORM_RATE_LIMIT, 1) + self.burst_limiter = TokenBucket.create( + rate=25, # Refill rate (tokens/sec) slightly lower than terraform's 30 reqs/sec limit + capacity=50, # Extra burst capacity for workspace enrichment operations + ) self._remaining = TERRAFORM_RATE_LIMIT - self._reset_time = None + self._reset_time = time.time() async def send_api_request( self, @@ -52,6 +80,10 @@ async def send_api_request( query_params: Optional[dict[str, Any]] = None, json_data: Optional[dict[str, Any]] = None, ) -> dict[str, Any]: + # First handle burst capacity + await self.burst_limiter.acquire() + + # Then handle overall rate limiting async with self.rate_limiter: try: url = f"{self.api_url}/{endpoint}" @@ -62,10 +94,14 @@ async def send_api_request( json=json_data, ) - self._remaining = int(response.headers.get('x-ratelimit-remaining', TERRAFORM_RATE_LIMIT)) - reset_in = float(response.headers.get('x-ratelimit-reset', 1)) # Default to 1 second + self._remaining = int( + response.headers.get("x-ratelimit-remaining", TERRAFORM_RATE_LIMIT) + ) + reset_in = float( + response.headers.get("x-ratelimit-reset", 1) + ) # Default to 1 second self._reset_time = time.time() + reset_in - + logger.info( f"Rate limit info - " f"Limit: {response.headers.get('x-ratelimit-limit', TERRAFORM_RATE_LIMIT)}, " @@ -80,21 +116,25 @@ async def send_api_request( f"Waiting {reset_in} seconds" ) await asyncio.sleep(reset_in) - return await self.send_api_request(endpoint, method, query_params, json_data) + return await self.send_api_request( + endpoint, method, query_params, json_data + ) response.raise_for_status() return response.json() except httpx.HTTPStatusError as e: if e.response.status_code == 429: - reset_in = float(e.response.headers.get('x-ratelimit-reset', 1)) + reset_in = float(e.response.headers.get("x-ratelimit-reset", 1)) logger.warning( f"Rate limited on {endpoint}. " f"Headers: {dict(e.response.headers)}. " f"Waiting {reset_in} seconds" ) await asyncio.sleep(reset_in) - return await self.send_api_request(endpoint, method, query_params, json_data) + return await self.send_api_request( + endpoint, method, query_params, json_data + ) logger.error( f"HTTP error on {endpoint}: {e.response.status_code} - {e.response.text}" ) diff --git a/integrations/terraform-cloud/main.py b/integrations/terraform-cloud/main.py index 0f4ff3aafd..481134222c 100644 --- a/integrations/terraform-cloud/main.py +++ b/integrations/terraform-cloud/main.py @@ -1,7 +1,6 @@ -import asyncio from asyncio import gather from enum import StrEnum -from typing import Any, List, Dict +from typing import Any, List from loguru import logger @@ -53,13 +52,10 @@ async def fetch_output(state_version: dict[str, Any]) -> dict[str, Any]: ) return {**state_version, "__output": {}} - # We can process many more items concurrently with 30 req/sec limit enriched_versions = await process_in_queue( - state_versions, - fetch_output, - concurrency=MAX_CONCURRENCY + state_versions, fetch_output, concurrency=MAX_CONCURRENCY ) - + return enriched_versions @@ -73,17 +69,13 @@ async def get_tags_for_workspace(workspace: dict[str, Any]) -> dict[str, Any]: tags.extend(tag_batch) return {**workspace, "__tags": tags} except Exception as e: - logger.warning( - f"Failed to fetch tags for workspace {workspace['id']}: {e}" - ) + logger.warning(f"Failed to fetch tags for workspace {workspace['id']}: {e}") return {**workspace, "__tags": []} enriched_workspaces = await process_in_queue( - workspaces, - get_tags_for_workspace, - concurrency=MAX_CONCURRENCY + workspaces, get_tags_for_workspace, concurrency=MAX_CONCURRENCY ) - + return enriched_workspaces @@ -129,28 +121,28 @@ async def resync_workspaces(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(ObjectKind.RUN) async def resync_runs(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: - BATCH_SIZE = 10 + BATCH_SIZE = 25 terraform_client = init_terraform_client() - - async def fetch_runs_for_workspace(workspace: dict[str, Any]) -> List[dict[str, Any]]: + + async def fetch_runs_for_workspace( + workspace: dict[str, Any] + ) -> List[dict[str, Any]]: all_runs = [] - async for runs in terraform_client.get_paginated_runs_for_workspace(workspace["id"]): + async for runs in terraform_client.get_paginated_runs_for_workspace( + workspace["id"] + ): all_runs.extend(runs) return all_runs async def process_workspaces() -> ASYNC_GENERATOR_RESYNC_TYPE: async for workspaces in terraform_client.get_paginated_workspaces(): - logger.info( - f"Getting {kind}s for {len(workspaces)} workspaces" - ) - + logger.info(f"Getting {kind}s for {len(workspaces)} workspaces") + runs_by_workspace = await process_in_queue( - workspaces, - fetch_runs_for_workspace, - concurrency=BATCH_SIZE + workspaces, fetch_runs_for_workspace, concurrency=BATCH_SIZE ) - + for workspace_runs in runs_by_workspace: if workspace_runs: yield workspace_runs diff --git a/integrations/terraform-cloud/rate_limiting.py b/integrations/terraform-cloud/rate_limiting.py new file mode 100644 index 0000000000..88cbefdb48 --- /dev/null +++ b/integrations/terraform-cloud/rate_limiting.py @@ -0,0 +1,32 @@ +from dataclasses import dataclass +import time +import asyncio + + +@dataclass +class TokenBucket: + capacity: int # Maximum tokens + rate: float # Tokens per second + tokens: float # Current tokens + last_update: float # Last token update timestamp + + @classmethod + def create(cls, rate: int, capacity: int) -> "TokenBucket": + return cls( + capacity=capacity, rate=rate, tokens=capacity, last_update=time.time() + ) + + def _refill(self) -> None: + now = time.time() + delta = now - self.last_update + self.tokens = min(self.capacity, self.tokens + delta * self.rate) + self.last_update = now + + async def acquire(self) -> None: + while True: + self._refill() + if self.tokens >= 1: + self.tokens -= 1 + return + # Wait for token refill + await asyncio.sleep(1 / self.rate) From 28f50d3ab6b29b16f915ee6a633f91a889104aa1 Mon Sep 17 00:00:00 2001 From: hpal Date: Thu, 31 Oct 2024 16:06:33 +0000 Subject: [PATCH 06/28] simplify impl --- integrations/terraform-cloud/client.py | 33 ------------- integrations/terraform-cloud/main.py | 49 ++++++++++--------- integrations/terraform-cloud/rate_limiting.py | 32 ------------ 3 files changed, 25 insertions(+), 89 deletions(-) delete mode 100644 integrations/terraform-cloud/rate_limiting.py diff --git a/integrations/terraform-cloud/client.py b/integrations/terraform-cloud/client.py index 3c7eba0a9b..977882dfd8 100644 --- a/integrations/terraform-cloud/client.py +++ b/integrations/terraform-cloud/client.py @@ -1,26 +1,3 @@ -""" -Terraform Cloud API Client with dual rate limiting strategy: - -1. Token Bucket (self.burst_limiter): - - Handles burst capacity for concurrent operations - - Allows short bursts above base rate limit - - Essential for concurrent workspace operations (tags, runs, state versions) - - First line of defense against rate limits - -2. AsyncLimiter (self.rate_limiter): - - Time-based rate limiting - - Ensures steady-state request rate - - Maintains long-term compliance with API limits - - Second layer of protection - -This dual approach is necessary because: -- Workspace operations can trigger multiple concurrent requests -- Some operations (like fetching runs) require pagination -- Need to handle both burst capacity and steady-state rate -- Terraform's API has both rate (30 req/sec) and concurrency constraints -# https://developer.hashicorp.com/terraform/enterprise/application-administration/general#api-rate-limiting -""" - import asyncio import time import httpx @@ -33,8 +10,6 @@ from port_ocean.context.event import event from port_ocean.utils import http_async_client -from rate_limiting import TokenBucket - # Constants TERRAFORM_WEBHOOK_EVENTS = [ "run:applying", @@ -66,10 +41,6 @@ def __init__(self, terraform_base_url: str, auth_token: str) -> None: self.client.headers.update(self.base_headers) self.rate_limiter = AsyncLimiter(TERRAFORM_RATE_LIMIT, 1) - self.burst_limiter = TokenBucket.create( - rate=25, # Refill rate (tokens/sec) slightly lower than terraform's 30 reqs/sec limit - capacity=50, # Extra burst capacity for workspace enrichment operations - ) self._remaining = TERRAFORM_RATE_LIMIT self._reset_time = time.time() @@ -80,10 +51,6 @@ async def send_api_request( query_params: Optional[dict[str, Any]] = None, json_data: Optional[dict[str, Any]] = None, ) -> dict[str, Any]: - # First handle burst capacity - await self.burst_limiter.acquire() - - # Then handle overall rate limiting async with self.rate_limiter: try: url = f"{self.api_url}/{endpoint}" diff --git a/integrations/terraform-cloud/main.py b/integrations/terraform-cloud/main.py index 481134222c..4d7c10c41c 100644 --- a/integrations/terraform-cloud/main.py +++ b/integrations/terraform-cloud/main.py @@ -1,14 +1,18 @@ +import asyncio +import functools from asyncio import gather from enum import StrEnum from typing import Any, List - from loguru import logger from client import TerraformClient from port_ocean.context.ocean import ocean from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE, RAW_RESULT from port_ocean.utils.queue_utils import process_in_queue - +from port_ocean.utils.async_iterators import ( + stream_async_iterators_tasks, + semaphore_async_iterator, +) class ObjectKind(StrEnum): WORKSPACE = "workspace" @@ -22,6 +26,7 @@ class ObjectKind(StrEnum): # Constants for rate limiting MAX_CONCURRENCY = 25 # Keep slightly under the 30/sec limit to allow for overhead +semaphore = asyncio.BoundedSemaphore(MAX_CONCURRENCY) def init_terraform_client() -> TerraformClient: @@ -121,34 +126,30 @@ async def resync_workspaces(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(ObjectKind.RUN) async def resync_runs(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: - BATCH_SIZE = 25 - terraform_client = init_terraform_client() - async def fetch_runs_for_workspace( - workspace: dict[str, Any] - ) -> List[dict[str, Any]]: - all_runs = [] - async for runs in terraform_client.get_paginated_runs_for_workspace( - workspace["id"] - ): - all_runs.extend(runs) - return all_runs + async def fetch_runs_for_workspace(workspace_id: str) -> ASYNC_GENERATOR_RESYNC_TYPE: + async for runs in terraform_client.get_paginated_runs_for_workspace(workspace_id): + yield runs async def process_workspaces() -> ASYNC_GENERATOR_RESYNC_TYPE: async for workspaces in terraform_client.get_paginated_workspaces(): logger.info(f"Getting {kind}s for {len(workspaces)} workspaces") - - runs_by_workspace = await process_in_queue( - workspaces, fetch_runs_for_workspace, concurrency=BATCH_SIZE - ) - - for workspace_runs in runs_by_workspace: - if workspace_runs: - yield workspace_runs - - async for run_batch in process_workspaces(): - yield run_batch + + tasks = [ + semaphore_async_iterator( + semaphore, + functools.partial(fetch_runs_for_workspace, workspace["id"]) + ) + for workspace in workspaces + ] + + if tasks: + async for run_batch in stream_async_iterators_tasks(*tasks): + yield run_batch + + async for runs in process_workspaces(): + yield runs @ocean.on_resync(ObjectKind.STATE_VERSION) diff --git a/integrations/terraform-cloud/rate_limiting.py b/integrations/terraform-cloud/rate_limiting.py deleted file mode 100644 index 88cbefdb48..0000000000 --- a/integrations/terraform-cloud/rate_limiting.py +++ /dev/null @@ -1,32 +0,0 @@ -from dataclasses import dataclass -import time -import asyncio - - -@dataclass -class TokenBucket: - capacity: int # Maximum tokens - rate: float # Tokens per second - tokens: float # Current tokens - last_update: float # Last token update timestamp - - @classmethod - def create(cls, rate: int, capacity: int) -> "TokenBucket": - return cls( - capacity=capacity, rate=rate, tokens=capacity, last_update=time.time() - ) - - def _refill(self) -> None: - now = time.time() - delta = now - self.last_update - self.tokens = min(self.capacity, self.tokens + delta * self.rate) - self.last_update = now - - async def acquire(self) -> None: - while True: - self._refill() - if self.tokens >= 1: - self.tokens -= 1 - return - # Wait for token refill - await asyncio.sleep(1 / self.rate) From 34849d1593d2d056c6eb243665727a7541a8282c Mon Sep 17 00:00:00 2001 From: hpal Date: Thu, 31 Oct 2024 16:07:14 +0000 Subject: [PATCH 07/28] Update main.py --- integrations/terraform-cloud/main.py | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/integrations/terraform-cloud/main.py b/integrations/terraform-cloud/main.py index 4d7c10c41c..b2347dbbfb 100644 --- a/integrations/terraform-cloud/main.py +++ b/integrations/terraform-cloud/main.py @@ -14,6 +14,7 @@ semaphore_async_iterator, ) + class ObjectKind(StrEnum): WORKSPACE = "workspace" RUN = "run" @@ -128,22 +129,26 @@ async def resync_workspaces(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: async def resync_runs(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: terraform_client = init_terraform_client() - async def fetch_runs_for_workspace(workspace_id: str) -> ASYNC_GENERATOR_RESYNC_TYPE: - async for runs in terraform_client.get_paginated_runs_for_workspace(workspace_id): + async def fetch_runs_for_workspace( + workspace_id: str, + ) -> ASYNC_GENERATOR_RESYNC_TYPE: + async for runs in terraform_client.get_paginated_runs_for_workspace( + workspace_id + ): yield runs async def process_workspaces() -> ASYNC_GENERATOR_RESYNC_TYPE: async for workspaces in terraform_client.get_paginated_workspaces(): logger.info(f"Getting {kind}s for {len(workspaces)} workspaces") - + tasks = [ semaphore_async_iterator( semaphore, - functools.partial(fetch_runs_for_workspace, workspace["id"]) + functools.partial(fetch_runs_for_workspace, workspace["id"]), ) for workspace in workspaces ] - + if tasks: async for run_batch in stream_async_iterators_tasks(*tasks): yield run_batch From e32a608798a2f6245606fcaf1dd09fb3359a4e0b Mon Sep 17 00:00:00 2001 From: Tom Tankilevitch <59158507+Tankilevitch@users.noreply.github.com> Date: Thu, 31 Oct 2024 21:36:16 +0200 Subject: [PATCH 08/28] Update integrations/terraform-cloud/client.py --- integrations/terraform-cloud/client.py | 1 - 1 file changed, 1 deletion(-) diff --git a/integrations/terraform-cloud/client.py b/integrations/terraform-cloud/client.py index 977882dfd8..a8e90a2030 100644 --- a/integrations/terraform-cloud/client.py +++ b/integrations/terraform-cloud/client.py @@ -10,7 +10,6 @@ from port_ocean.context.event import event from port_ocean.utils import http_async_client -# Constants TERRAFORM_WEBHOOK_EVENTS = [ "run:applying", "run:completed", From fd6e74c13f9e6e4bf991dfedb86f40fc9f490067 Mon Sep 17 00:00:00 2001 From: hpal Date: Fri, 1 Nov 2024 19:16:03 +0000 Subject: [PATCH 09/28] Remove unncessary code --- integrations/terraform-cloud/client.py | 28 +++++++++----------------- 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/integrations/terraform-cloud/client.py b/integrations/terraform-cloud/client.py index 977882dfd8..711ce61fde 100644 --- a/integrations/terraform-cloud/client.py +++ b/integrations/terraform-cloud/client.py @@ -1,5 +1,4 @@ import asyncio -import time import httpx from enum import StrEnum @@ -26,7 +25,8 @@ class CacheKeys(StrEnum): PAGE_SIZE = 100 -TERRAFORM_RATE_LIMIT = 25 # Buffer below 30/sec limit +NO_OF_REQUESTS = 25 +NO_OF_SECONDS = 1 class TerraformClient: @@ -40,9 +40,7 @@ def __init__(self, terraform_base_url: str, auth_token: str) -> None: self.client = http_async_client self.client.headers.update(self.base_headers) - self.rate_limiter = AsyncLimiter(TERRAFORM_RATE_LIMIT, 1) - self._remaining = TERRAFORM_RATE_LIMIT - self._reset_time = time.time() + self.rate_limiter = AsyncLimiter(NO_OF_REQUESTS, NO_OF_SECONDS) async def send_api_request( self, @@ -61,28 +59,22 @@ async def send_api_request( json=json_data, ) - self._remaining = int( - response.headers.get("x-ratelimit-remaining", TERRAFORM_RATE_LIMIT) - ) - reset_in = float( - response.headers.get("x-ratelimit-reset", 1) - ) # Default to 1 second - self._reset_time = time.time() + reset_in - logger.info( f"Rate limit info - " - f"Limit: {response.headers.get('x-ratelimit-limit', TERRAFORM_RATE_LIMIT)}, " - f"Remaining: {self._remaining}, " - f"Reset: {reset_in}" + f"Limit: {response.headers.get('x-ratelimit-limit', NO_OF_REQUESTS)}, " + f"Remaining: {response.headers.get('x-ratelimit-remaining', NO_OF_REQUESTS)}, " + f"Reset: {response.headers.get('x-ratelimit-reset', 1)}" ) if response.status_code == 429: logger.warning( f"Rate limited on {endpoint}. " f"Headers: {dict(response.headers)}. " - f"Waiting {reset_in} seconds" + f"Waiting {response.headers.get('x-ratelimit-reset', 1)} seconds" + ) + await asyncio.sleep( + float(response.headers.get("x-ratelimit-reset", 1)) ) - await asyncio.sleep(reset_in) return await self.send_api_request( endpoint, method, query_params, json_data ) From a0912e78eac213375b56ac39515c2273af8b385b Mon Sep 17 00:00:00 2001 From: hpal Date: Fri, 1 Nov 2024 19:16:55 +0000 Subject: [PATCH 10/28] Add concurrent limiti to runs due to timeouts --- integrations/terraform-cloud/main.py | 77 ++++++++++++---------------- 1 file changed, 34 insertions(+), 43 deletions(-) diff --git a/integrations/terraform-cloud/main.py b/integrations/terraform-cloud/main.py index b2347dbbfb..254b4c8185 100644 --- a/integrations/terraform-cloud/main.py +++ b/integrations/terraform-cloud/main.py @@ -1,5 +1,3 @@ -import asyncio -import functools from asyncio import gather from enum import StrEnum from typing import Any, List @@ -9,10 +7,6 @@ from port_ocean.context.ocean import ocean from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE, RAW_RESULT from port_ocean.utils.queue_utils import process_in_queue -from port_ocean.utils.async_iterators import ( - stream_async_iterators_tasks, - semaphore_async_iterator, -) class ObjectKind(StrEnum): @@ -25,10 +19,6 @@ class ObjectKind(StrEnum): SKIP_WEBHOOK_CREATION = False -# Constants for rate limiting -MAX_CONCURRENCY = 25 # Keep slightly under the 30/sec limit to allow for overhead -semaphore = asyncio.BoundedSemaphore(MAX_CONCURRENCY) - def init_terraform_client() -> TerraformClient: """ @@ -58,11 +48,10 @@ async def fetch_output(state_version: dict[str, Any]) -> dict[str, Any]: ) return {**state_version, "__output": {}} - enriched_versions = await process_in_queue( - state_versions, fetch_output, concurrency=MAX_CONCURRENCY + enriched_versions = await gather( + *[fetch_output(state_version) for state_version in state_versions] ) - - return enriched_versions + return list(enriched_versions) async def enrich_workspaces_with_tags( @@ -78,11 +67,10 @@ async def get_tags_for_workspace(workspace: dict[str, Any]) -> dict[str, Any]: logger.warning(f"Failed to fetch tags for workspace {workspace['id']}: {e}") return {**workspace, "__tags": []} - enriched_workspaces = await process_in_queue( - workspaces, get_tags_for_workspace, concurrency=MAX_CONCURRENCY + enriched_workspaces = await gather( + *[get_tags_for_workspace(workspace) for workspace in workspaces] ) - - return enriched_workspaces + return list(enriched_workspaces) async def enrich_workspace_with_tags( @@ -128,33 +116,36 @@ async def resync_workspaces(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(ObjectKind.RUN) async def resync_runs(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: terraform_client = init_terraform_client() + CONCURRENCY = 15 # Process 5 workspaces concurrently async def fetch_runs_for_workspace( - workspace_id: str, - ) -> ASYNC_GENERATOR_RESYNC_TYPE: - async for runs in terraform_client.get_paginated_runs_for_workspace( - workspace_id - ): - yield runs - - async def process_workspaces() -> ASYNC_GENERATOR_RESYNC_TYPE: - async for workspaces in terraform_client.get_paginated_workspaces(): - logger.info(f"Getting {kind}s for {len(workspaces)} workspaces") - - tasks = [ - semaphore_async_iterator( - semaphore, - functools.partial(fetch_runs_for_workspace, workspace["id"]), - ) - for workspace in workspaces - ] - - if tasks: - async for run_batch in stream_async_iterators_tasks(*tasks): - yield run_batch - - async for runs in process_workspaces(): - yield runs + workspace: dict[str, Any] + ) -> list[dict[str, Any]]: + try: + all_runs = [] + async for runs in terraform_client.get_paginated_runs_for_workspace( + workspace["id"] + ): + all_runs.extend(runs) + return all_runs + except Exception as e: + logger.error( + f"Error fetching runs for workspace {workspace['id']}: {str(e)}" + ) + return [] + + async for workspaces in terraform_client.get_paginated_workspaces(): + logger.info(f"Getting {kind}s for {len(workspaces)} workspaces") + + # Use process_in_queue to control concurrency + runs_per_workspace = await process_in_queue( + workspaces, fetch_runs_for_workspace, concurrency=CONCURRENCY + ) + + # Yield non-empty results + for runs in runs_per_workspace: + if runs: # Only yield if runs were found + yield runs @ocean.on_resync(ObjectKind.STATE_VERSION) From e26fbad50ad9dbea165a60cc58700fe83055757b Mon Sep 17 00:00:00 2001 From: hpal Date: Mon, 4 Nov 2024 18:15:28 +0000 Subject: [PATCH 11/28] Update main.py --- integrations/terraform-cloud/main.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/integrations/terraform-cloud/main.py b/integrations/terraform-cloud/main.py index 254b4c8185..ee86a8bc39 100644 --- a/integrations/terraform-cloud/main.py +++ b/integrations/terraform-cloud/main.py @@ -116,7 +116,7 @@ async def resync_workspaces(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(ObjectKind.RUN) async def resync_runs(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: terraform_client = init_terraform_client() - CONCURRENCY = 15 # Process 5 workspaces concurrently + WORKSPACE_CONCURRENCY = 15 async def fetch_runs_for_workspace( workspace: dict[str, Any] @@ -137,15 +137,13 @@ async def fetch_runs_for_workspace( async for workspaces in terraform_client.get_paginated_workspaces(): logger.info(f"Getting {kind}s for {len(workspaces)} workspaces") - # Use process_in_queue to control concurrency runs_per_workspace = await process_in_queue( - workspaces, fetch_runs_for_workspace, concurrency=CONCURRENCY + workspaces, fetch_runs_for_workspace, + concurrency=WORKSPACE_CONCURRENCY ) - # Yield non-empty results for runs in runs_per_workspace: - if runs: # Only yield if runs were found - yield runs + yield runs @ocean.on_resync(ObjectKind.STATE_VERSION) From f232a7eadf80356776bcc4f1956ed4e4ebe48fbc Mon Sep 17 00:00:00 2001 From: hpal Date: Mon, 4 Nov 2024 18:18:41 +0000 Subject: [PATCH 12/28] fix liniting --- integrations/terraform-cloud/main.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/integrations/terraform-cloud/main.py b/integrations/terraform-cloud/main.py index ee86a8bc39..8767e9c27a 100644 --- a/integrations/terraform-cloud/main.py +++ b/integrations/terraform-cloud/main.py @@ -138,8 +138,7 @@ async def fetch_runs_for_workspace( logger.info(f"Getting {kind}s for {len(workspaces)} workspaces") runs_per_workspace = await process_in_queue( - workspaces, fetch_runs_for_workspace, - concurrency=WORKSPACE_CONCURRENCY + workspaces, fetch_runs_for_workspace, concurrency=WORKSPACE_CONCURRENCY ) for runs in runs_per_workspace: From 0c9d209f75dc9235c421e3364a721da1af3df15c Mon Sep 17 00:00:00 2001 From: hpal Date: Mon, 4 Nov 2024 18:27:17 +0000 Subject: [PATCH 13/28] Add changelog --- integrations/terraform-cloud/CHANGELOG.md | 5 +++++ integrations/terraform-cloud/pyproject.toml | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/integrations/terraform-cloud/CHANGELOG.md b/integrations/terraform-cloud/CHANGELOG.md index bfd41cf7c3..1897f295f2 100644 --- a/integrations/terraform-cloud/CHANGELOG.md +++ b/integrations/terraform-cloud/CHANGELOG.md @@ -6,6 +6,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## 0.1.75 (2024-11-05) + +### Improvements + +- Implemented Terraform API rate limiting to prevent resync failures that caused stale data in Port. Previously, exceeding the API's request limit interrupted resyncs, preventing the deletion of outdated entities. ## 0.1.74 (2024-10-23) diff --git a/integrations/terraform-cloud/pyproject.toml b/integrations/terraform-cloud/pyproject.toml index e9e232184e..fbcc30a95b 100644 --- a/integrations/terraform-cloud/pyproject.toml +++ b/integrations/terraform-cloud/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "terraform-cloud" -version = "0.1.74" +version = "0.1.75" description = "Terraform Cloud Integration for Port" authors = ["Michael Armah "] From 349424f267cb8abe4b0e4261a8238db90df0129a Mon Sep 17 00:00:00 2001 From: hpal Date: Wed, 6 Nov 2024 16:33:33 +0000 Subject: [PATCH 14/28] Update poetry.lock --- integrations/terraform-cloud/poetry.lock | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integrations/terraform-cloud/poetry.lock b/integrations/terraform-cloud/poetry.lock index cbf918bf5f..a94b1b3048 100644 --- a/integrations/terraform-cloud/poetry.lock +++ b/integrations/terraform-cloud/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand. [[package]] name = "aiolimiter" @@ -1644,4 +1644,4 @@ dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"] [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "68de7278e7deb5874d80635d160841ae51a686261ed101e1070175f271ef1f33" +content-hash = "9f197a11b6f67d9a3501cdf902f3aec1b766e9c252258bc5c486175d627f5d21" From 0baa218f099d4ae87e559856b658d1a5031f6444 Mon Sep 17 00:00:00 2001 From: hpal Date: Wed, 6 Nov 2024 16:50:13 +0000 Subject: [PATCH 15/28] Remove extra retry logic The base http client already contains retry logic for the 429 errors --- integrations/terraform-cloud/client.py | 64 ++++++-------------------- 1 file changed, 15 insertions(+), 49 deletions(-) diff --git a/integrations/terraform-cloud/client.py b/integrations/terraform-cloud/client.py index d56bc03e41..659575539b 100644 --- a/integrations/terraform-cloud/client.py +++ b/integrations/terraform-cloud/client.py @@ -1,6 +1,3 @@ -import asyncio -import httpx - from enum import StrEnum from typing import Any, AsyncGenerator, Optional from aiolimiter import AsyncLimiter @@ -49,54 +46,23 @@ async def send_api_request( json_data: Optional[dict[str, Any]] = None, ) -> dict[str, Any]: async with self.rate_limiter: - try: - url = f"{self.api_url}/{endpoint}" - response = await self.client.request( - method=method, - url=url, - params=query_params, - json=json_data, - ) - - logger.info( - f"Rate limit info - " - f"Limit: {response.headers.get('x-ratelimit-limit', NO_OF_REQUESTS)}, " - f"Remaining: {response.headers.get('x-ratelimit-remaining', NO_OF_REQUESTS)}, " - f"Reset: {response.headers.get('x-ratelimit-reset', 1)}" - ) - - if response.status_code == 429: - logger.warning( - f"Rate limited on {endpoint}. " - f"Headers: {dict(response.headers)}. " - f"Waiting {response.headers.get('x-ratelimit-reset', 1)} seconds" - ) - await asyncio.sleep( - float(response.headers.get("x-ratelimit-reset", 1)) - ) - return await self.send_api_request( - endpoint, method, query_params, json_data - ) + url = f"{self.api_url}/{endpoint}" + response = await self.client.request( + method=method, + url=url, + params=query_params, + json=json_data, + ) - response.raise_for_status() - return response.json() + logger.info( + f"Rate limit info - " + f"Limit: {response.headers.get('x-ratelimit-limit', NO_OF_REQUESTS)}, " + f"Remaining: {response.headers.get('x-ratelimit-remaining', NO_OF_REQUESTS)}, " + f"Reset: {response.headers.get('x-ratelimit-reset', 1)}" + ) - except httpx.HTTPStatusError as e: - if e.response.status_code == 429: - reset_in = float(e.response.headers.get("x-ratelimit-reset", 1)) - logger.warning( - f"Rate limited on {endpoint}. " - f"Headers: {dict(e.response.headers)}. " - f"Waiting {reset_in} seconds" - ) - await asyncio.sleep(reset_in) - return await self.send_api_request( - endpoint, method, query_params, json_data - ) - logger.error( - f"HTTP error on {endpoint}: {e.response.status_code} - {e.response.text}" - ) - raise + response.raise_for_status() + return response.json() async def get_paginated_resources( self, endpoint: str, params: Optional[dict[str, Any]] = None From 7ed7220e88c114f179203b15482e8e46bf7c811b Mon Sep 17 00:00:00 2001 From: hpal Date: Thu, 7 Nov 2024 11:20:42 +0000 Subject: [PATCH 16/28] Bump version --- integrations/terraform-cloud/CHANGELOG.md | 2 +- integrations/terraform-cloud/client.py | 14 +++----------- integrations/terraform-cloud/pyproject.toml | 2 +- 3 files changed, 5 insertions(+), 13 deletions(-) diff --git a/integrations/terraform-cloud/CHANGELOG.md b/integrations/terraform-cloud/CHANGELOG.md index 703cf9fb0c..154f7dbd15 100644 --- a/integrations/terraform-cloud/CHANGELOG.md +++ b/integrations/terraform-cloud/CHANGELOG.md @@ -6,7 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## 0.1.75 (2024-11-05) +## 0.1.76 (2024-11-07) ### Improvements diff --git a/integrations/terraform-cloud/client.py b/integrations/terraform-cloud/client.py index 659575539b..c962654c58 100644 --- a/integrations/terraform-cloud/client.py +++ b/integrations/terraform-cloud/client.py @@ -21,8 +21,8 @@ class CacheKeys(StrEnum): PAGE_SIZE = 100 -NO_OF_REQUESTS = 25 -NO_OF_SECONDS = 1 +NUMBER_OF_REQUESTS = 25 +NUMBER_OF_SECONDS = 1 class TerraformClient: @@ -36,7 +36,7 @@ def __init__(self, terraform_base_url: str, auth_token: str) -> None: self.client = http_async_client self.client.headers.update(self.base_headers) - self.rate_limiter = AsyncLimiter(NO_OF_REQUESTS, NO_OF_SECONDS) + self.rate_limiter = AsyncLimiter(NUMBER_OF_REQUESTS, NUMBER_OF_SECONDS) async def send_api_request( self, @@ -53,14 +53,6 @@ async def send_api_request( params=query_params, json=json_data, ) - - logger.info( - f"Rate limit info - " - f"Limit: {response.headers.get('x-ratelimit-limit', NO_OF_REQUESTS)}, " - f"Remaining: {response.headers.get('x-ratelimit-remaining', NO_OF_REQUESTS)}, " - f"Reset: {response.headers.get('x-ratelimit-reset', 1)}" - ) - response.raise_for_status() return response.json() diff --git a/integrations/terraform-cloud/pyproject.toml b/integrations/terraform-cloud/pyproject.toml index ecb4c8c0d4..69fc39b524 100644 --- a/integrations/terraform-cloud/pyproject.toml +++ b/integrations/terraform-cloud/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "terraform-cloud" -version = "0.1.75" +version = "0.1.76" description = "Terraform Cloud Integration for Port" authors = ["Michael Armah "] From 6a37db30d0a026578db3ecae589ec43a00a4e1fe Mon Sep 17 00:00:00 2001 From: hpal Date: Thu, 21 Nov 2024 12:28:23 +0000 Subject: [PATCH 17/28] refactor(terraform): optimize run sync with controlled concurrency Terraform Cloud API allows 30 requests/second per user. While the current code already uses as_completed() for concurrent processing, it lacks batch size control. This change adds request limiting while maintaining the efficient async processing: Before: - Creates concurrent tasks for ALL workspaces in a batch at once - Could potentially create hundreds of simultaneous requests After: - Processes workspaces in controlled batches of 25 - Maintains efficient as_completed() processing within batches - Flattened, simpler generator structure - Streams runs immediately when available Key improvements: - Predictable API usage with batch size control - Same responsive processing within controlled batches This retains the efficient concurrent processing while adding safety limits for API usage. --- integrations/terraform-cloud/main.py | 43 +++++++++++++--------------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/integrations/terraform-cloud/main.py b/integrations/terraform-cloud/main.py index 8767e9c27a..426323410d 100644 --- a/integrations/terraform-cloud/main.py +++ b/integrations/terraform-cloud/main.py @@ -1,4 +1,5 @@ from asyncio import gather +import asyncio from enum import StrEnum from typing import Any, List from loguru import logger @@ -6,7 +7,6 @@ from client import TerraformClient from port_ocean.context.ocean import ocean from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE, RAW_RESULT -from port_ocean.utils.queue_utils import process_in_queue class ObjectKind(StrEnum): @@ -107,6 +107,7 @@ async def resync_workspaces(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: terraform_client = init_terraform_client() async for workspaces in terraform_client.get_paginated_workspaces(): logger.info(f"Received {len(workspaces)} batch {kind}s") + yield workspaces enriched_workspace_batch = await enrich_workspaces_with_tags( terraform_client, workspaces ) @@ -116,33 +117,29 @@ async def resync_workspaces(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(ObjectKind.RUN) async def resync_runs(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: terraform_client = init_terraform_client() - WORKSPACE_CONCURRENCY = 15 + BATCH_SIZE = 25 # Stay safely under 30 req/sec limit - async def fetch_runs_for_workspace( - workspace: dict[str, Any] - ) -> list[dict[str, Any]]: - try: - all_runs = [] - async for runs in terraform_client.get_paginated_runs_for_workspace( - workspace["id"] - ): - all_runs.extend(runs) - return all_runs - except Exception as e: - logger.error( - f"Error fetching runs for workspace {workspace['id']}: {str(e)}" - ) - return [] + async def process_workspace(workspace: dict[str, Any]) -> List[dict[str, Any]]: + runs = [] + async for run_batch in terraform_client.get_paginated_runs_for_workspace( + workspace["id"] + ): + if run_batch: + runs.extend(run_batch) + return runs async for workspaces in terraform_client.get_paginated_workspaces(): - logger.info(f"Getting {kind}s for {len(workspaces)} workspaces") + logger.info(f"Processing batch of {len(workspaces)} workspaces") - runs_per_workspace = await process_in_queue( - workspaces, fetch_runs_for_workspace, concurrency=WORKSPACE_CONCURRENCY - ) + # Process in batches to stay under rate limit + for i in range(0, len(workspaces), BATCH_SIZE): + batch = workspaces[i : i + BATCH_SIZE] + tasks = [process_workspace(w) for w in batch] - for runs in runs_per_workspace: - yield runs + for completed_task in asyncio.as_completed(tasks): + runs = await completed_task + if runs: + yield runs @ocean.on_resync(ObjectKind.STATE_VERSION) From d38581e17ac894f10ae8f791317bd589c0d8580f Mon Sep 17 00:00:00 2001 From: hpal Date: Thu, 21 Nov 2024 18:42:50 +0000 Subject: [PATCH 18/28] Update main.py --- integrations/terraform-cloud/main.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/integrations/terraform-cloud/main.py b/integrations/terraform-cloud/main.py index 426323410d..f59a76ea5b 100644 --- a/integrations/terraform-cloud/main.py +++ b/integrations/terraform-cloud/main.py @@ -107,7 +107,6 @@ async def resync_workspaces(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: terraform_client = init_terraform_client() async for workspaces in terraform_client.get_paginated_workspaces(): logger.info(f"Received {len(workspaces)} batch {kind}s") - yield workspaces enriched_workspace_batch = await enrich_workspaces_with_tags( terraform_client, workspaces ) @@ -134,7 +133,7 @@ async def process_workspace(workspace: dict[str, Any]) -> List[dict[str, Any]]: # Process in batches to stay under rate limit for i in range(0, len(workspaces), BATCH_SIZE): batch = workspaces[i : i + BATCH_SIZE] - tasks = [process_workspace(w) for w in batch] + tasks = [process_workspace(workspace) for workspace in batch] for completed_task in asyncio.as_completed(tasks): runs = await completed_task From d643e8a8c6e66f6b636ca977f81dbe807b98ceef Mon Sep 17 00:00:00 2001 From: hpal Date: Fri, 22 Nov 2024 17:37:39 +0000 Subject: [PATCH 19/28] [Terraform] Reinstate exception handling in client --- integrations/terraform-cloud/client.py | 63 ++++++++++++++++++++---- integrations/terraform-cloud/poetry.lock | 2 +- 2 files changed, 54 insertions(+), 11 deletions(-) diff --git a/integrations/terraform-cloud/client.py b/integrations/terraform-cloud/client.py index c962654c58..b1f17ea474 100644 --- a/integrations/terraform-cloud/client.py +++ b/integrations/terraform-cloud/client.py @@ -2,6 +2,9 @@ from typing import Any, AsyncGenerator, Optional from aiolimiter import AsyncLimiter from loguru import logger +import asyncio +import random +import httpx from port_ocean.context.event import event from port_ocean.utils import http_async_client @@ -45,16 +48,56 @@ async def send_api_request( query_params: Optional[dict[str, Any]] = None, json_data: Optional[dict[str, Any]] = None, ) -> dict[str, Any]: - async with self.rate_limiter: - url = f"{self.api_url}/{endpoint}" - response = await self.client.request( - method=method, - url=url, - params=query_params, - json=json_data, - ) - response.raise_for_status() - return response.json() + url = f"{self.api_url}/{endpoint}" + + try: + async with self.rate_limiter: + logger.debug( + f"Requesting {method} {url} with params: {query_params} and body: {json_data}" + ) + + response = await self.client.request( + method=method, + url=url, + params=query_params, + json=json_data, + ) + + # Extract rate limit info from headers + rate_limit = response.headers.get("x-ratelimit-limit") + rate_limit_remaining = response.headers.get("x-ratelimit-remaining") + rate_limit_reset = response.headers.get("x-ratelimit-reset") + + logger.debug( + f"Rate Limit: {rate_limit}, " + f"Remaining: {rate_limit_remaining}, " + f"Reset in: {rate_limit_reset} seconds" + ) + + response.raise_for_status() + + pagination_meta = response.json().get("meta", {}).get("pagination", {}) + logger.debug( + f"Successfully fetched {endpoint} with pagination info: {pagination_meta}" + ) + + return response.json() + + except httpx.HTTPStatusError as e: + if e.response.status_code == 429: + rate_limit_reset = e.response.headers.get("x-ratelimit-reset") + reset_time = float(rate_limit_reset or 1.0) + wait_time = reset_time + random.uniform(0, 1) # Add jitter + logger.warning( + f"Rate limit exceeded. Waiting for {wait_time:.2f} seconds" + ) + await asyncio.sleep(wait_time) + logger.error(f"HTTP error for {url}: {str(e)}") + raise + + except Exception as e: + logger.error(f"Request failed for {url}: {str(e)}") + raise async def get_paginated_resources( self, endpoint: str, params: Optional[dict[str, Any]] = None diff --git a/integrations/terraform-cloud/poetry.lock b/integrations/terraform-cloud/poetry.lock index 3d1958c242..4d7993ee32 100644 --- a/integrations/terraform-cloud/poetry.lock +++ b/integrations/terraform-cloud/poetry.lock @@ -1671,4 +1671,4 @@ dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"] [metadata] lock-version = "2.0" python-versions = "^3.12" -content-hash = "1ff8f5a65bbb7f6fe2d6433c6c00d7fb5c39b1e6d079ae8fd5b0370488bf3f49" +content-hash = "2824362c31f605cee00c7ba74ebca8bf72b8a41bb5c0a4a5dae99faf00bab43f" From 591ee15273c9d7b75723f161d89601a6213c416b Mon Sep 17 00:00:00 2001 From: hpal Date: Mon, 25 Nov 2024 13:27:19 +0000 Subject: [PATCH 20/28] Update the logging and add constants --- integrations/terraform-cloud/client.py | 39 ++++++++++++-------------- 1 file changed, 18 insertions(+), 21 deletions(-) diff --git a/integrations/terraform-cloud/client.py b/integrations/terraform-cloud/client.py index b1f17ea474..10924c92ac 100644 --- a/integrations/terraform-cloud/client.py +++ b/integrations/terraform-cloud/client.py @@ -26,6 +26,9 @@ class CacheKeys(StrEnum): PAGE_SIZE = 100 NUMBER_OF_REQUESTS = 25 NUMBER_OF_SECONDS = 1 +DEFAULT_RATE_LIMIT_RESET = 1.0 +JITTER_MIN_SECONDS = 0 +JITTER_MAX_SECONDS = 1 class TerraformClient: @@ -63,33 +66,27 @@ async def send_api_request( json=json_data, ) - # Extract rate limit info from headers - rate_limit = response.headers.get("x-ratelimit-limit") - rate_limit_remaining = response.headers.get("x-ratelimit-remaining") - rate_limit_reset = response.headers.get("x-ratelimit-reset") - - logger.debug( - f"Rate Limit: {rate_limit}, " - f"Remaining: {rate_limit_remaining}, " - f"Reset in: {rate_limit_reset} seconds" - ) - response.raise_for_status() - - pagination_meta = response.json().get("meta", {}).get("pagination", {}) - logger.debug( - f"Successfully fetched {endpoint} with pagination info: {pagination_meta}" - ) - return response.json() except httpx.HTTPStatusError as e: if e.response.status_code == 429: + rate_limit = e.response.headers.get("x-ratelimit-limit") + rate_limit_remaining = e.response.headers.get("x-ratelimit-remaining") rate_limit_reset = e.response.headers.get("x-ratelimit-reset") - reset_time = float(rate_limit_reset or 1.0) - wait_time = reset_time + random.uniform(0, 1) # Add jitter - logger.warning( - f"Rate limit exceeded. Waiting for {wait_time:.2f} seconds" + + reset_time = float(rate_limit_reset or DEFAULT_RATE_LIMIT_RESET) + wait_time = reset_time + random.uniform( + JITTER_MIN_SECONDS, JITTER_MAX_SECONDS + ) + + logger.info( + "Rate limit reached, waiting before retry", + wait_time=f"{wait_time:.2f}", + endpoint=endpoint, + rate_limit=rate_limit, + rate_limit_remaining=rate_limit_remaining, + rate_limit_reset=rate_limit_reset, ) await asyncio.sleep(wait_time) logger.error(f"HTTP error for {url}: {str(e)}") From 2f575018bfc72374b340e2462c8ac5d32dbf0f2e Mon Sep 17 00:00:00 2001 From: hpal Date: Mon, 25 Nov 2024 13:39:38 +0000 Subject: [PATCH 21/28] Bump version --- integrations/terraform-cloud/CHANGELOG.md | 3 ++- integrations/terraform-cloud/pyproject.toml | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/integrations/terraform-cloud/CHANGELOG.md b/integrations/terraform-cloud/CHANGELOG.md index 61db2db6c6..53582817a3 100644 --- a/integrations/terraform-cloud/CHANGELOG.md +++ b/integrations/terraform-cloud/CHANGELOG.md @@ -6,7 +6,8 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## 0.1.76 (2024-11-07) + +## 0.1.81 (2024-11-25) ### Improvements diff --git a/integrations/terraform-cloud/pyproject.toml b/integrations/terraform-cloud/pyproject.toml index 45755c102d..6360ea5e10 100644 --- a/integrations/terraform-cloud/pyproject.toml +++ b/integrations/terraform-cloud/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "terraform-cloud" -version = "0.1.80" +version = "0.1.81" description = "Terraform Cloud Integration for Port" authors = ["Michael Armah "] From 0fc7ffe80e1c686693fd0808574bce52e0c37dda Mon Sep 17 00:00:00 2001 From: hpal Date: Mon, 25 Nov 2024 15:06:38 +0000 Subject: [PATCH 22/28] Update client.py --- integrations/terraform-cloud/client.py | 15 +-------------- 1 file changed, 1 insertion(+), 14 deletions(-) diff --git a/integrations/terraform-cloud/client.py b/integrations/terraform-cloud/client.py index 10924c92ac..a6fcb699f7 100644 --- a/integrations/terraform-cloud/client.py +++ b/integrations/terraform-cloud/client.py @@ -2,8 +2,6 @@ from typing import Any, AsyncGenerator, Optional from aiolimiter import AsyncLimiter from loguru import logger -import asyncio -import random import httpx from port_ocean.context.event import event @@ -26,9 +24,6 @@ class CacheKeys(StrEnum): PAGE_SIZE = 100 NUMBER_OF_REQUESTS = 25 NUMBER_OF_SECONDS = 1 -DEFAULT_RATE_LIMIT_RESET = 1.0 -JITTER_MIN_SECONDS = 0 -JITTER_MAX_SECONDS = 1 class TerraformClient: @@ -75,21 +70,13 @@ async def send_api_request( rate_limit_remaining = e.response.headers.get("x-ratelimit-remaining") rate_limit_reset = e.response.headers.get("x-ratelimit-reset") - reset_time = float(rate_limit_reset or DEFAULT_RATE_LIMIT_RESET) - wait_time = reset_time + random.uniform( - JITTER_MIN_SECONDS, JITTER_MAX_SECONDS - ) - logger.info( - "Rate limit reached, waiting before retry", - wait_time=f"{wait_time:.2f}", + "Rate limit reached", endpoint=endpoint, rate_limit=rate_limit, rate_limit_remaining=rate_limit_remaining, rate_limit_reset=rate_limit_reset, ) - await asyncio.sleep(wait_time) - logger.error(f"HTTP error for {url}: {str(e)}") raise except Exception as e: From 61b7f8bbe8e621c39a4fb7e805a4bc52b9d7dc48 Mon Sep 17 00:00:00 2001 From: hpal Date: Mon, 25 Nov 2024 15:58:19 +0000 Subject: [PATCH 23/28] Update poetry.lock --- integrations/terraform-cloud/poetry.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/terraform-cloud/poetry.lock b/integrations/terraform-cloud/poetry.lock index f86a2ea0e0..e087faa12f 100644 --- a/integrations/terraform-cloud/poetry.lock +++ b/integrations/terraform-cloud/poetry.lock @@ -1671,4 +1671,4 @@ dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"] [metadata] lock-version = "2.0" python-versions = "^3.12" -content-hash = "2cb72bcdbd016cc3a80717d66a23b2ed853a5bfa87268e17f6d32a8ee1b187b4" +content-hash = "9e18a6343103e1c0c6f3b38965d09ddc4ebfee6d3741f355ec1de7a5fe4810f0" From e0814381157f901f438b70dc34e5ff93375aa875 Mon Sep 17 00:00:00 2001 From: hpal Date: Mon, 25 Nov 2024 17:45:28 +0000 Subject: [PATCH 24/28] fix: remove redundant rate limit handling --- integrations/terraform-cloud/client.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/integrations/terraform-cloud/client.py b/integrations/terraform-cloud/client.py index a6fcb699f7..2a632213cc 100644 --- a/integrations/terraform-cloud/client.py +++ b/integrations/terraform-cloud/client.py @@ -2,7 +2,6 @@ from typing import Any, AsyncGenerator, Optional from aiolimiter import AsyncLimiter from loguru import logger -import httpx from port_ocean.context.event import event from port_ocean.utils import http_async_client @@ -64,21 +63,6 @@ async def send_api_request( response.raise_for_status() return response.json() - except httpx.HTTPStatusError as e: - if e.response.status_code == 429: - rate_limit = e.response.headers.get("x-ratelimit-limit") - rate_limit_remaining = e.response.headers.get("x-ratelimit-remaining") - rate_limit_reset = e.response.headers.get("x-ratelimit-reset") - - logger.info( - "Rate limit reached", - endpoint=endpoint, - rate_limit=rate_limit, - rate_limit_remaining=rate_limit_remaining, - rate_limit_reset=rate_limit_reset, - ) - raise - except Exception as e: logger.error(f"Request failed for {url}: {str(e)}") raise From 17233608a09bece5ec4256e58c49ad7eb2006379 Mon Sep 17 00:00:00 2001 From: hpal Date: Mon, 25 Nov 2024 18:49:16 +0000 Subject: [PATCH 25/28] Update changelog --- integrations/terraform-cloud/CHANGELOG.md | 5 +++++ integrations/terraform-cloud/client.py | 1 + integrations/terraform-cloud/pyproject.toml | 2 +- 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/integrations/terraform-cloud/CHANGELOG.md b/integrations/terraform-cloud/CHANGELOG.md index c900d6397a..619056a241 100644 --- a/integrations/terraform-cloud/CHANGELOG.md +++ b/integrations/terraform-cloud/CHANGELOG.md @@ -6,6 +6,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## 0.1.83 (2024-11-25) + +### Improvements + +- Adds rate limit handling to the client ## 0.1.82 (2024-11-25) diff --git a/integrations/terraform-cloud/client.py b/integrations/terraform-cloud/client.py index 2a632213cc..89732dadc3 100644 --- a/integrations/terraform-cloud/client.py +++ b/integrations/terraform-cloud/client.py @@ -61,6 +61,7 @@ async def send_api_request( ) response.raise_for_status() + logger.debug(f"Successfully retrieved data for endpoint: {endpoint}") return response.json() except Exception as e: diff --git a/integrations/terraform-cloud/pyproject.toml b/integrations/terraform-cloud/pyproject.toml index 247a71418a..f2afd5b1bf 100644 --- a/integrations/terraform-cloud/pyproject.toml +++ b/integrations/terraform-cloud/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "terraform-cloud" -version = "0.1.82" +version = "0.1.83" description = "Terraform Cloud Integration for Port" authors = ["Michael Armah "] From 7537a1d680959002ce5f241eea61a33d6fa4eee4 Mon Sep 17 00:00:00 2001 From: hpal Date: Tue, 26 Nov 2024 08:13:23 +0000 Subject: [PATCH 26/28] Add detailed log for exceptions --- integrations/terraform-cloud/CHANGELOG.md | 2 +- integrations/terraform-cloud/client.py | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/integrations/terraform-cloud/CHANGELOG.md b/integrations/terraform-cloud/CHANGELOG.md index 619056a241..ba03db958e 100644 --- a/integrations/terraform-cloud/CHANGELOG.md +++ b/integrations/terraform-cloud/CHANGELOG.md @@ -6,7 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## 0.1.83 (2024-11-25) +## 0.1.83 (2024-11-26) ### Improvements diff --git a/integrations/terraform-cloud/client.py b/integrations/terraform-cloud/client.py index 89732dadc3..b49882d686 100644 --- a/integrations/terraform-cloud/client.py +++ b/integrations/terraform-cloud/client.py @@ -65,7 +65,14 @@ async def send_api_request( return response.json() except Exception as e: - logger.error(f"Request failed for {url}: {str(e)}") + logger.error( + "Request failed", + error=str(e), + url=url, + method=method, + params=query_params, + body=json_data, + ) raise async def get_paginated_resources( From b2d4a1d066e410848dc9b18f1bdec4c504cdcf53 Mon Sep 17 00:00:00 2001 From: hpal Date: Tue, 26 Nov 2024 14:19:21 +0000 Subject: [PATCH 27/28] Revert changelog --- integrations/terraform-cloud/CHANGELOG.md | 5 ----- integrations/terraform-cloud/pyproject.toml | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/integrations/terraform-cloud/CHANGELOG.md b/integrations/terraform-cloud/CHANGELOG.md index ba03db958e..c900d6397a 100644 --- a/integrations/terraform-cloud/CHANGELOG.md +++ b/integrations/terraform-cloud/CHANGELOG.md @@ -6,11 +6,6 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## 0.1.83 (2024-11-26) - -### Improvements - -- Adds rate limit handling to the client ## 0.1.82 (2024-11-25) diff --git a/integrations/terraform-cloud/pyproject.toml b/integrations/terraform-cloud/pyproject.toml index f2afd5b1bf..247a71418a 100644 --- a/integrations/terraform-cloud/pyproject.toml +++ b/integrations/terraform-cloud/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "terraform-cloud" -version = "0.1.83" +version = "0.1.82" description = "Terraform Cloud Integration for Port" authors = ["Michael Armah "] From 34e164f3bdb72bcd284e6fc54cabb88417d4d865 Mon Sep 17 00:00:00 2001 From: hpal Date: Tue, 26 Nov 2024 14:23:18 +0000 Subject: [PATCH 28/28] Add back removed title --- integrations/terraform-cloud/CHANGELOG.md | 8 ++++++++ integrations/terraform-cloud/pyproject.toml | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/integrations/terraform-cloud/CHANGELOG.md b/integrations/terraform-cloud/CHANGELOG.md index c900d6397a..6dbb3911d1 100644 --- a/integrations/terraform-cloud/CHANGELOG.md +++ b/integrations/terraform-cloud/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 +## 0.1.83 (2024-11-26) + +### Improvements + +- Added rate limit handling to the client + ## 0.1.82 (2024-11-25) @@ -17,6 +23,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## 0.1.81 (2024-11-25) +### Improvements + - Bumped ocean version to ^0.14.2 ## 0.1.80 (2024-11-21) diff --git a/integrations/terraform-cloud/pyproject.toml b/integrations/terraform-cloud/pyproject.toml index 247a71418a..f2afd5b1bf 100644 --- a/integrations/terraform-cloud/pyproject.toml +++ b/integrations/terraform-cloud/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "terraform-cloud" -version = "0.1.82" +version = "0.1.83" description = "Terraform Cloud Integration for Port" authors = ["Michael Armah "]