diff --git a/integrations/terraform-cloud/CHANGELOG.md b/integrations/terraform-cloud/CHANGELOG.md index a9fcb35eb2..6dbb3911d1 100644 --- a/integrations/terraform-cloud/CHANGELOG.md +++ b/integrations/terraform-cloud/CHANGELOG.md @@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 +## 0.1.83 (2024-11-26) + +### Improvements + +- Added rate limit handling to the client + ## 0.1.82 (2024-11-25) @@ -17,12 +23,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## 0.1.81 (2024-11-25) - ### Improvements - Bumped ocean version to ^0.14.2 - ## 0.1.80 (2024-11-21) diff --git a/integrations/terraform-cloud/client.py b/integrations/terraform-cloud/client.py index 7c50a41894..b49882d686 100644 --- a/integrations/terraform-cloud/client.py +++ b/integrations/terraform-cloud/client.py @@ -1,10 +1,10 @@ +from enum import StrEnum from typing import Any, AsyncGenerator, Optional -from port_ocean.utils import http_async_client -import httpx +from aiolimiter import AsyncLimiter from loguru import logger -from enum import StrEnum from port_ocean.context.event import event +from port_ocean.utils import http_async_client TERRAFORM_WEBHOOK_EVENTS = [ "run:applying", @@ -21,6 +21,8 @@ class CacheKeys(StrEnum): PAGE_SIZE = 100 +NUMBER_OF_REQUESTS = 25 +NUMBER_OF_SECONDS = 1 class TerraformClient: @@ -34,6 +36,8 @@ def __init__(self, terraform_base_url: str, auth_token: str) -> None: self.client = http_async_client self.client.headers.update(self.base_headers) + self.rate_limiter = AsyncLimiter(NUMBER_OF_REQUESTS, NUMBER_OF_SECONDS) + async def send_api_request( self, endpoint: str, @@ -41,32 +45,35 @@ async def send_api_request( query_params: Optional[dict[str, Any]] = None, json_data: Optional[dict[str, Any]] = None, ) -> dict[str, Any]: - logger.info(f"Requesting Terraform Cloud data for endpoint: {endpoint}") + url = f"{self.api_url}/{endpoint}" + try: - url = f"{self.api_url}/{endpoint}" - logger.info( - f"URL: {url}, Method: {method}, Params: {query_params}, Body: {json_data}" - ) - response = await self.client.request( - method=method, - url=url, - params=query_params, - json=json_data, - ) - response.raise_for_status() + async with self.rate_limiter: + logger.debug( + f"Requesting {method} {url} with params: {query_params} and body: {json_data}" + ) - logger.info(f"Successfully retrieved data for endpoint: {endpoint}") + response = await self.client.request( + method=method, + url=url, + params=query_params, + json=json_data, + ) - return response.json() + response.raise_for_status() + logger.debug(f"Successfully retrieved data for endpoint: {endpoint}") + return response.json() - except httpx.HTTPStatusError as e: + except Exception as e: logger.error( - f"HTTP error on {endpoint}: {e.response.status_code} - {e.response.text}" + "Request failed", + error=str(e), + url=url, + method=method, + params=query_params, + body=json_data, ) raise - except httpx.HTTPError as e: - logger.error(f"HTTP error on {endpoint}: {str(e)}") - raise async def get_paginated_resources( self, endpoint: str, params: Optional[dict[str, Any]] = None diff --git a/integrations/terraform-cloud/main.py b/integrations/terraform-cloud/main.py index 406582ace7..f59a76ea5b 100644 --- a/integrations/terraform-cloud/main.py +++ b/integrations/terraform-cloud/main.py @@ -1,8 +1,7 @@ -import asyncio from asyncio import gather +import asyncio from enum import StrEnum -from typing import Any, List, Dict - +from typing import Any, List from loguru import logger from client import TerraformClient @@ -19,7 +18,6 @@ class ObjectKind(StrEnum): SKIP_WEBHOOK_CREATION = False -SEMAPHORE_LIMIT = 30 def init_terraform_client() -> TerraformClient: @@ -40,51 +38,26 @@ def init_terraform_client() -> TerraformClient: async def enrich_state_versions_with_output_data( http_client: TerraformClient, state_versions: List[dict[str, Any]] ) -> list[dict[str, Any]]: - async with asyncio.BoundedSemaphore(SEMAPHORE_LIMIT): - tasks = [ - http_client.get_state_version_output(state_version["id"]) - for state_version in state_versions - ] - - output_batches = [] - for completed_task in asyncio.as_completed(tasks): - output = await completed_task - output_batches.append(output) - - enriched_state_versions = [ - {**state_version, "__output": output} - for state_version, output in zip(state_versions, output_batches) - ] + async def fetch_output(state_version: dict[str, Any]) -> dict[str, Any]: + try: + output = await http_client.get_state_version_output(state_version["id"]) + return {**state_version, "__output": output} + except Exception as e: + logger.warning( + f"Failed to fetch output for state version {state_version['id']}: {e}" + ) + return {**state_version, "__output": {}} - return enriched_state_versions + enriched_versions = await gather( + *[fetch_output(state_version) for state_version in state_versions] + ) + return list(enriched_versions) async def enrich_workspaces_with_tags( http_client: TerraformClient, workspaces: List[dict[str, Any]] ) -> list[dict[str, Any]]: async def get_tags_for_workspace(workspace: dict[str, Any]) -> dict[str, Any]: - async with asyncio.BoundedSemaphore(SEMAPHORE_LIMIT): - try: - tags = [] - async for tag_batch in http_client.get_workspace_tags(workspace["id"]): - tags.extend(tag_batch) - return {**workspace, "__tags": tags} - except Exception as e: - logger.warning( - f"Failed to fetch tags for workspace {workspace['id']}: {e}" - ) - return {**workspace, "__tags": []} - - tasks = [get_tags_for_workspace(workspace) for workspace in workspaces] - enriched_workspaces = [await task for task in asyncio.as_completed(tasks)] - - return enriched_workspaces - - -async def enrich_workspace_with_tags( - http_client: TerraformClient, workspace: dict[str, Any] -) -> dict[str, Any]: - async with asyncio.BoundedSemaphore(SEMAPHORE_LIMIT): try: tags = [] async for tag_batch in http_client.get_workspace_tags(workspace["id"]): @@ -94,6 +67,24 @@ async def enrich_workspace_with_tags( logger.warning(f"Failed to fetch tags for workspace {workspace['id']}: {e}") return {**workspace, "__tags": []} + enriched_workspaces = await gather( + *[get_tags_for_workspace(workspace) for workspace in workspaces] + ) + return list(enriched_workspaces) + + +async def enrich_workspace_with_tags( + http_client: TerraformClient, workspace: dict[str, Any] +) -> dict[str, Any]: + try: + tags = [] + async for tag_batch in http_client.get_workspace_tags(workspace["id"]): + tags.extend(tag_batch) + return {**workspace, "__tags": tags} + except Exception as e: + logger.warning(f"Failed to fetch tags for workspace {workspace['id']}: {e}") + return {**workspace, "__tags": []} + @ocean.on_resync(ObjectKind.ORGANIZATION) async def resync_organizations(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @@ -125,32 +116,30 @@ async def resync_workspaces(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(ObjectKind.RUN) async def resync_runs(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: terraform_client = init_terraform_client() + BATCH_SIZE = 25 # Stay safely under 30 req/sec limit - async def fetch_runs_for_workspace( - workspace: dict[str, Any] - ) -> List[List[Dict[str, Any]]]: - return [ - run - async for run in terraform_client.get_paginated_runs_for_workspace( - workspace["id"] - ) - ] + async def process_workspace(workspace: dict[str, Any]) -> List[dict[str, Any]]: + runs = [] + async for run_batch in terraform_client.get_paginated_runs_for_workspace( + workspace["id"] + ): + if run_batch: + runs.extend(run_batch) + return runs - async def fetch_runs_for_all_workspaces() -> ASYNC_GENERATOR_RESYNC_TYPE: - async for workspaces in terraform_client.get_paginated_workspaces(): - logger.info( - f"Received {len(workspaces)} batch workspaces... fetching its associated {kind}" - ) + async for workspaces in terraform_client.get_paginated_workspaces(): + logger.info(f"Processing batch of {len(workspaces)} workspaces") + + # Process in batches to stay under rate limit + for i in range(0, len(workspaces), BATCH_SIZE): + batch = workspaces[i : i + BATCH_SIZE] + tasks = [process_workspace(workspace) for workspace in batch] - tasks = [fetch_runs_for_workspace(workspace) for workspace in workspaces] for completed_task in asyncio.as_completed(tasks): - workspace_runs = await completed_task - for runs in workspace_runs: + runs = await completed_task + if runs: yield runs - async for run_batch in fetch_runs_for_all_workspaces(): - yield run_batch - @ocean.on_resync(ObjectKind.STATE_VERSION) async def resync_state_versions(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: diff --git a/integrations/terraform-cloud/poetry.lock b/integrations/terraform-cloud/poetry.lock index 4fd4015b90..e087faa12f 100644 --- a/integrations/terraform-cloud/poetry.lock +++ b/integrations/terraform-cloud/poetry.lock @@ -1,5 +1,16 @@ # This file is automatically @generated by Poetry 1.8.4 and should not be changed by hand. +[[package]] +name = "aiolimiter" +version = "1.1.0" +description = "asyncio rate limiter, a leaky bucket implementation" +optional = false +python-versions = ">=3.7,<4.0" +files = [ + {file = "aiolimiter-1.1.0-py3-none-any.whl", hash = "sha256:0b4997961fc58b8df40279e739f9cf0d3e255e63e9a44f64df567a8c17241e24"}, + {file = "aiolimiter-1.1.0.tar.gz", hash = "sha256:461cf02f82a29347340d031626c92853645c099cb5ff85577b831a7bd21132b5"}, +] + [[package]] name = "aiostream" version = "0.6.4" @@ -1660,4 +1671,4 @@ dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"] [metadata] lock-version = "2.0" python-versions = "^3.12" -content-hash = "2cb72bcdbd016cc3a80717d66a23b2ed853a5bfa87268e17f6d32a8ee1b187b4" +content-hash = "9e18a6343103e1c0c6f3b38965d09ddc4ebfee6d3741f355ec1de7a5fe4810f0" diff --git a/integrations/terraform-cloud/pyproject.toml b/integrations/terraform-cloud/pyproject.toml index 9558e5e9bb..f2afd5b1bf 100644 --- a/integrations/terraform-cloud/pyproject.toml +++ b/integrations/terraform-cloud/pyproject.toml @@ -1,12 +1,13 @@ [tool.poetry] name = "terraform-cloud" -version = "0.1.82" +version = "0.1.83" description = "Terraform Cloud Integration for Port" authors = ["Michael Armah "] [tool.poetry.dependencies] python = "^3.12" port_ocean = {version = "^0.14.3", extras = ["cli"]} +aiolimiter = "^1.1.0" [tool.poetry.group.dev.dependencies] # uncomment this if you want to debug the ocean core together with your integration