Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Integration][Terraform-Cloud] Add Rate Limiting Improvements #1110

Merged
Merged
Show file tree
Hide file tree
Changes from 30 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
0b52b8f
[terraform-cloud] base rate limiting with aiolimiter
phalbert Oct 30, 2024
a00b093
update the var names
phalbert Oct 30, 2024
977420b
user process_in_queue to control concurrent operations at the applica…
phalbert Oct 30, 2024
104caa6
Update main.py
phalbert Oct 30, 2024
75412c4
add token bucket; add comments why
phalbert Oct 31, 2024
81ea493
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Oct 31, 2024
28f50d3
simplify impl
phalbert Oct 31, 2024
34849d1
Update main.py
phalbert Oct 31, 2024
e32a608
Update integrations/terraform-cloud/client.py
Tankilevitch Oct 31, 2024
fd6e74c
Remove unncessary code
phalbert Nov 1, 2024
a0912e7
Add concurrent limiti to runs due to timeouts
phalbert Nov 1, 2024
c9ef642
Merge branch 'PORT-10857-adhere-to-terraform-cloud-api-rate-limits' o…
phalbert Nov 1, 2024
01a0e47
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 2, 2024
e26fbad
Update main.py
phalbert Nov 4, 2024
fa813ca
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 4, 2024
f232a7e
fix liniting
phalbert Nov 4, 2024
0c9d209
Add changelog
phalbert Nov 4, 2024
d216749
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
mk-armah Nov 5, 2024
ceb3a9a
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 6, 2024
349424f
Update poetry.lock
phalbert Nov 6, 2024
0baa218
Remove extra retry logic
phalbert Nov 6, 2024
7ed7220
Bump version
phalbert Nov 7, 2024
d8e1103
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 7, 2024
6a37db3
refactor(terraform): optimize run sync with controlled concurrency
phalbert Nov 21, 2024
38dbf5d
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 21, 2024
899e628
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 21, 2024
083264e
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 21, 2024
348c95f
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 21, 2024
d38581e
Update main.py
phalbert Nov 21, 2024
d643e8a
[Terraform] Reinstate exception handling in client
phalbert Nov 22, 2024
591ee15
Update the logging and add constants
phalbert Nov 25, 2024
ceaafb6
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 25, 2024
2f57501
Bump version
phalbert Nov 25, 2024
0fc7ffe
Update client.py
phalbert Nov 25, 2024
cd737bd
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 25, 2024
710f508
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 25, 2024
61b7f8b
Update poetry.lock
phalbert Nov 25, 2024
e081438
fix: remove redundant rate limit handling
phalbert Nov 25, 2024
9dc7fba
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
Tankilevitch Nov 25, 2024
1723360
Update changelog
phalbert Nov 25, 2024
b22e709
Merge branch 'PORT-10857-adhere-to-terraform-cloud-api-rate-limits' o…
phalbert Nov 25, 2024
7537a1d
Add detailed log for exceptions
phalbert Nov 26, 2024
b2d4a1d
Revert changelog
phalbert Nov 26, 2024
34e164f
Add back removed title
phalbert Nov 26, 2024
9c85134
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
Tankilevitch Nov 26, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions integrations/terraform-cloud/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

<!-- towncrier release notes start -->
## 0.1.76 (2024-11-07)

### Improvements

- Implemented Terraform API rate limiting to prevent resync failures that caused stale data in Port. Previously, exceeding the API's request limit interrupted resyncs, preventing the deletion of outdated entities.

## 0.1.80 (2024-11-21)

Expand Down
75 changes: 53 additions & 22 deletions integrations/terraform-cloud/client.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from enum import StrEnum
from typing import Any, AsyncGenerator, Optional
from port_ocean.utils import http_async_client
import httpx
from aiolimiter import AsyncLimiter
from loguru import logger
from enum import StrEnum
import asyncio
import random
import httpx

from port_ocean.context.event import event
from port_ocean.utils import http_async_client

TERRAFORM_WEBHOOK_EVENTS = [
"run:applying",
Expand All @@ -21,6 +24,8 @@ class CacheKeys(StrEnum):


PAGE_SIZE = 100
NUMBER_OF_REQUESTS = 25
NUMBER_OF_SECONDS = 1


class TerraformClient:
Expand All @@ -34,38 +39,64 @@ def __init__(self, terraform_base_url: str, auth_token: str) -> None:
self.client = http_async_client
self.client.headers.update(self.base_headers)

self.rate_limiter = AsyncLimiter(NUMBER_OF_REQUESTS, NUMBER_OF_SECONDS)

async def send_api_request(
self,
endpoint: str,
method: str = "GET",
query_params: Optional[dict[str, Any]] = None,
json_data: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
logger.info(f"Requesting Terraform Cloud data for endpoint: {endpoint}")
url = f"{self.api_url}/{endpoint}"

try:
phalbert marked this conversation as resolved.
Show resolved Hide resolved
url = f"{self.api_url}/{endpoint}"
logger.info(
f"URL: {url}, Method: {method}, Params: {query_params}, Body: {json_data}"
)
phalbert marked this conversation as resolved.
Show resolved Hide resolved
response = await self.client.request(
method=method,
url=url,
params=query_params,
json=json_data,
)
response.raise_for_status()
async with self.rate_limiter:
logger.debug(
f"Requesting {method} {url} with params: {query_params} and body: {json_data}"
)

response = await self.client.request(
method=method,
url=url,
params=query_params,
json=json_data,
)

logger.info(f"Successfully retrieved data for endpoint: {endpoint}")
phalbert marked this conversation as resolved.
Show resolved Hide resolved
phalbert marked this conversation as resolved.
Show resolved Hide resolved
# Extract rate limit info from headers
rate_limit = response.headers.get("x-ratelimit-limit")
rate_limit_remaining = response.headers.get("x-ratelimit-remaining")
rate_limit_reset = response.headers.get("x-ratelimit-reset")

return response.json()
logger.debug(
f"Rate Limit: {rate_limit}, "
f"Remaining: {rate_limit_remaining}, "
f"Reset in: {rate_limit_reset} seconds"
)
phalbert marked this conversation as resolved.
Show resolved Hide resolved

response.raise_for_status()

pagination_meta = response.json().get("meta", {}).get("pagination", {})
logger.debug(
f"Successfully fetched {endpoint} with pagination info: {pagination_meta}"
)

return response.json()

except httpx.HTTPStatusError as e:
logger.error(
f"HTTP error on {endpoint}: {e.response.status_code} - {e.response.text}"
)
if e.response.status_code == 429:
rate_limit_reset = e.response.headers.get("x-ratelimit-reset")
reset_time = float(rate_limit_reset or 1.0)
wait_time = reset_time + random.uniform(0, 1) # Add jitter
phalbert marked this conversation as resolved.
Show resolved Hide resolved
logger.warning(
f"Rate limit exceeded. Waiting for {wait_time:.2f} seconds"
)
phalbert marked this conversation as resolved.
Show resolved Hide resolved
await asyncio.sleep(wait_time)
logger.error(f"HTTP error for {url}: {str(e)}")
raise
except httpx.HTTPError as e:
logger.error(f"HTTP error on {endpoint}: {str(e)}")

except Exception as e:
logger.error(f"Request failed for {url}: {str(e)}")
raise

async def get_paginated_resources(
Expand Down
113 changes: 51 additions & 62 deletions integrations/terraform-cloud/main.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
import asyncio
from asyncio import gather
import asyncio
from enum import StrEnum
from typing import Any, List, Dict

from typing import Any, List
from loguru import logger

from client import TerraformClient
Expand All @@ -19,7 +18,6 @@ class ObjectKind(StrEnum):


SKIP_WEBHOOK_CREATION = False
SEMAPHORE_LIMIT = 30


def init_terraform_client() -> TerraformClient:
Expand All @@ -40,51 +38,26 @@ def init_terraform_client() -> TerraformClient:
async def enrich_state_versions_with_output_data(
http_client: TerraformClient, state_versions: List[dict[str, Any]]
) -> list[dict[str, Any]]:
async with asyncio.BoundedSemaphore(SEMAPHORE_LIMIT):
tasks = [
http_client.get_state_version_output(state_version["id"])
for state_version in state_versions
]

output_batches = []
for completed_task in asyncio.as_completed(tasks):
output = await completed_task
output_batches.append(output)

enriched_state_versions = [
{**state_version, "__output": output}
for state_version, output in zip(state_versions, output_batches)
]
async def fetch_output(state_version: dict[str, Any]) -> dict[str, Any]:
try:
output = await http_client.get_state_version_output(state_version["id"])
return {**state_version, "__output": output}
except Exception as e:
logger.warning(
f"Failed to fetch output for state version {state_version['id']}: {e}"
)
return {**state_version, "__output": {}}

return enriched_state_versions
enriched_versions = await gather(
*[fetch_output(state_version) for state_version in state_versions]
)
return list(enriched_versions)


async def enrich_workspaces_with_tags(
http_client: TerraformClient, workspaces: List[dict[str, Any]]
) -> list[dict[str, Any]]:
async def get_tags_for_workspace(workspace: dict[str, Any]) -> dict[str, Any]:
async with asyncio.BoundedSemaphore(SEMAPHORE_LIMIT):
try:
tags = []
async for tag_batch in http_client.get_workspace_tags(workspace["id"]):
tags.extend(tag_batch)
return {**workspace, "__tags": tags}
except Exception as e:
logger.warning(
f"Failed to fetch tags for workspace {workspace['id']}: {e}"
)
return {**workspace, "__tags": []}

tasks = [get_tags_for_workspace(workspace) for workspace in workspaces]
enriched_workspaces = [await task for task in asyncio.as_completed(tasks)]

return enriched_workspaces


async def enrich_workspace_with_tags(
http_client: TerraformClient, workspace: dict[str, Any]
) -> dict[str, Any]:
async with asyncio.BoundedSemaphore(SEMAPHORE_LIMIT):
try:
tags = []
async for tag_batch in http_client.get_workspace_tags(workspace["id"]):
Expand All @@ -94,6 +67,24 @@ async def enrich_workspace_with_tags(
logger.warning(f"Failed to fetch tags for workspace {workspace['id']}: {e}")
return {**workspace, "__tags": []}

enriched_workspaces = await gather(
*[get_tags_for_workspace(workspace) for workspace in workspaces]
)
return list(enriched_workspaces)


async def enrich_workspace_with_tags(
http_client: TerraformClient, workspace: dict[str, Any]
) -> dict[str, Any]:
try:
tags = []
async for tag_batch in http_client.get_workspace_tags(workspace["id"]):
tags.extend(tag_batch)
return {**workspace, "__tags": tags}
except Exception as e:
logger.warning(f"Failed to fetch tags for workspace {workspace['id']}: {e}")
return {**workspace, "__tags": []}


@ocean.on_resync(ObjectKind.ORGANIZATION)
async def resync_organizations(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
Expand Down Expand Up @@ -125,32 +116,30 @@ async def resync_workspaces(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
@ocean.on_resync(ObjectKind.RUN)
async def resync_runs(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
terraform_client = init_terraform_client()
BATCH_SIZE = 25 # Stay safely under 30 req/sec limit

async def fetch_runs_for_workspace(
workspace: dict[str, Any]
) -> List[List[Dict[str, Any]]]:
return [
run
async for run in terraform_client.get_paginated_runs_for_workspace(
workspace["id"]
)
]
async def process_workspace(workspace: dict[str, Any]) -> List[dict[str, Any]]:
runs = []
async for run_batch in terraform_client.get_paginated_runs_for_workspace(
workspace["id"]
):
if run_batch:
runs.extend(run_batch)
return runs

async def fetch_runs_for_all_workspaces() -> ASYNC_GENERATOR_RESYNC_TYPE:
async for workspaces in terraform_client.get_paginated_workspaces():
logger.info(
f"Received {len(workspaces)} batch workspaces... fetching its associated {kind}"
)
async for workspaces in terraform_client.get_paginated_workspaces():
logger.info(f"Processing batch of {len(workspaces)} workspaces")

# Process in batches to stay under rate limit
for i in range(0, len(workspaces), BATCH_SIZE):
batch = workspaces[i : i + BATCH_SIZE]
tasks = [process_workspace(workspace) for workspace in batch]

tasks = [fetch_runs_for_workspace(workspace) for workspace in workspaces]
for completed_task in asyncio.as_completed(tasks):
workspace_runs = await completed_task
for runs in workspace_runs:
runs = await completed_task
if runs:
yield runs

async for run_batch in fetch_runs_for_all_workspaces():
yield run_batch


@ocean.on_resync(ObjectKind.STATE_VERSION)
async def resync_state_versions(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
Expand Down
13 changes: 12 additions & 1 deletion integrations/terraform-cloud/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions integrations/terraform-cloud/pyproject.toml
phalbert marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ authors = ["Michael Armah <[email protected]>"]
[tool.poetry.dependencies]
python = "^3.12"
port_ocean = {version = "^0.14.1", extras = ["cli"]}
aiolimiter = "^1.1.0"

[tool.poetry.group.dev.dependencies]
# uncomment this if you want to debug the ocean core together with your integration
Expand Down