[Integration][Terraform-Cloud] Add Rate Limiting Improvements #1110

Merged
Changes from 8 commits
45 commits
0b52b8f
[terraform-cloud] base rate limiting with aiolimiter
phalbert Oct 30, 2024
a00b093
update the var names
phalbert Oct 30, 2024
977420b
use process_in_queue to control concurrent operations at the applica…
phalbert Oct 30, 2024
104caa6
Update main.py
phalbert Oct 30, 2024
75412c4
add token bucket; add comments why
phalbert Oct 31, 2024
81ea493
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Oct 31, 2024
28f50d3
simplify impl
phalbert Oct 31, 2024
34849d1
Update main.py
phalbert Oct 31, 2024
e32a608
Update integrations/terraform-cloud/client.py
Tankilevitch Oct 31, 2024
fd6e74c
Remove unnecessary code
phalbert Nov 1, 2024
a0912e7
Add concurrent limit to runs due to timeouts
phalbert Nov 1, 2024
c9ef642
Merge branch 'PORT-10857-adhere-to-terraform-cloud-api-rate-limits' o…
phalbert Nov 1, 2024
01a0e47
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 2, 2024
e26fbad
Update main.py
phalbert Nov 4, 2024
fa813ca
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 4, 2024
f232a7e
fix linting
phalbert Nov 4, 2024
0c9d209
Add changelog
phalbert Nov 4, 2024
d216749
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
mk-armah Nov 5, 2024
ceb3a9a
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 6, 2024
349424f
Update poetry.lock
phalbert Nov 6, 2024
0baa218
Remove extra retry logic
phalbert Nov 6, 2024
7ed7220
Bump version
phalbert Nov 7, 2024
d8e1103
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 7, 2024
6a37db3
refactor(terraform): optimize run sync with controlled concurrency
phalbert Nov 21, 2024
38dbf5d
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 21, 2024
899e628
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 21, 2024
083264e
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 21, 2024
348c95f
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 21, 2024
d38581e
Update main.py
phalbert Nov 21, 2024
d643e8a
[Terraform] Reinstate exception handling in client
phalbert Nov 22, 2024
591ee15
Update the logging and add constants
phalbert Nov 25, 2024
ceaafb6
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 25, 2024
2f57501
Bump version
phalbert Nov 25, 2024
0fc7ffe
Update client.py
phalbert Nov 25, 2024
cd737bd
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 25, 2024
710f508
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
phalbert Nov 25, 2024
61b7f8b
Update poetry.lock
phalbert Nov 25, 2024
e081438
fix: remove redundant rate limit handling
phalbert Nov 25, 2024
9dc7fba
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
Tankilevitch Nov 25, 2024
1723360
Update changelog
phalbert Nov 25, 2024
b22e709
Merge branch 'PORT-10857-adhere-to-terraform-cloud-api-rate-limits' o…
phalbert Nov 25, 2024
7537a1d
Add detailed log for exceptions
phalbert Nov 26, 2024
b2d4a1d
Revert changelog
phalbert Nov 26, 2024
34e164f
Add back removed title
phalbert Nov 26, 2024
9c85134
Merge branch 'main' into PORT-10857-adhere-to-terraform-cloud-api-rat…
Tankilevitch Nov 26, 2024
91 changes: 65 additions & 26 deletions integrations/terraform-cloud/client.py
@@ -1,11 +1,16 @@
from typing import Any, AsyncGenerator, Optional
from port_ocean.utils import http_async_client
import asyncio
import time
import httpx
from loguru import logger

from enum import StrEnum
from typing import Any, AsyncGenerator, Optional
from aiolimiter import AsyncLimiter
from loguru import logger

from port_ocean.context.event import event
from port_ocean.utils import http_async_client

# Constants
TERRAFORM_WEBHOOK_EVENTS = [
"run:applying",
"run:completed",
@@ -21,6 +26,7 @@ class CacheKeys(StrEnum):


PAGE_SIZE = 100
TERRAFORM_RATE_LIMIT = 25 # Buffer below 30/sec limit


class TerraformClient:
@@ -34,39 +40,72 @@ def __init__(self, terraform_base_url: str, auth_token: str) -> None:
self.client = http_async_client
self.client.headers.update(self.base_headers)

self.rate_limiter = AsyncLimiter(TERRAFORM_RATE_LIMIT, 1)
self._remaining = TERRAFORM_RATE_LIMIT
self._reset_time = time.time()

async def send_api_request(
self,
endpoint: str,
method: str = "GET",
query_params: Optional[dict[str, Any]] = None,
json_data: Optional[dict[str, Any]] = None,
) -> dict[str, Any]:
logger.info(f"Requesting Terraform Cloud data for endpoint: {endpoint}")
try:
url = f"{self.api_url}/{endpoint}"
logger.info(
f"URL: {url}, Method: {method}, Params: {query_params}, Body: {json_data}"
)
response = await self.client.request(
method=method,
url=url,
params=query_params,
json=json_data,
)
response.raise_for_status()
async with self.rate_limiter:
try:
url = f"{self.api_url}/{endpoint}"
response = await self.client.request(
method=method,
url=url,
params=query_params,
json=json_data,
)

logger.info(f"Successfully retrieved data for endpoint: {endpoint}")
self._remaining = int(
response.headers.get("x-ratelimit-remaining", TERRAFORM_RATE_LIMIT)
)
reset_in = float(
response.headers.get("x-ratelimit-reset", 1)
) # Default to 1 second
self._reset_time = time.time() + reset_in

return response.json()
logger.info(
f"Rate limit info - "
f"Limit: {response.headers.get('x-ratelimit-limit', TERRAFORM_RATE_LIMIT)}, "
f"Remaining: {self._remaining}, "
f"Reset: {reset_in}"
)

except httpx.HTTPStatusError as e:
logger.error(
f"HTTP error on {endpoint}: {e.response.status_code} - {e.response.text}"
)
raise
except httpx.HTTPError as e:
logger.error(f"HTTP error on {endpoint}: {str(e)}")
raise
if response.status_code == 429:
logger.warning(
f"Rate limited on {endpoint}. "
f"Headers: {dict(response.headers)}. "
f"Waiting {reset_in} seconds"
)
await asyncio.sleep(reset_in)
return await self.send_api_request(
endpoint, method, query_params, json_data
)

response.raise_for_status()
return response.json()

except httpx.HTTPStatusError as e:
if e.response.status_code == 429:
reset_in = float(e.response.headers.get("x-ratelimit-reset", 1))
logger.warning(
f"Rate limited on {endpoint}. "
f"Headers: {dict(e.response.headers)}. "
f"Waiting {reset_in} seconds"
)
await asyncio.sleep(reset_in)
return await self.send_api_request(
endpoint, method, query_params, json_data
)
logger.error(
f"HTTP error on {endpoint}: {e.response.status_code} - {e.response.text}"
)
raise

async def get_paginated_resources(
self, endpoint: str, params: Optional[dict[str, Any]] = None
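
The 429 handling in `send_api_request` above retries by calling itself after sleeping for the `x-ratelimit-reset` interval, with no cap on the number of retries. A minimal sketch of a bounded alternative follows. It is an illustration only, not the code merged in this PR: `FakeResponse`, `request_with_retry`, the `max_retries` parameter and the injectable `sleep` are hypothetical names introduced here so the example runs without `httpx`.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Awaitable, Callable


@dataclass
class FakeResponse:
    # Hypothetical stand-in for httpx.Response; only the fields used below.
    status_code: int
    headers: dict[str, str] = field(default_factory=dict)
    body: dict = field(default_factory=dict)


async def request_with_retry(
    send: Callable[[], Awaitable[FakeResponse]],
    max_retries: int = 3,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
) -> dict:
    """Call `send`, retrying on HTTP 429 at most `max_retries` times."""
    for attempt in range(max_retries + 1):
        response = await send()
        if response.status_code != 429:
            return response.body
        if attempt == max_retries:
            break  # out of retries; do not sleep again
        # Same default as the diff: wait 1 second when the header is absent.
        reset_in = float(response.headers.get("x-ratelimit-reset", 1))
        await sleep(reset_in)
    raise RuntimeError(f"still rate limited after {max_retries} retries")
```

A loop keeps the call stack flat and makes the give-up condition explicit; whether a cap is desirable for a long-running resync is a judgment call for the integration.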
125 changes: 65 additions & 60 deletions integrations/terraform-cloud/main.py
@@ -1,13 +1,18 @@
import asyncio
import functools
from asyncio import gather
from enum import StrEnum
from typing import Any, List, Dict

from typing import Any, List
from loguru import logger

from client import TerraformClient
from port_ocean.context.ocean import ocean
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE, RAW_RESULT
from port_ocean.utils.queue_utils import process_in_queue
from port_ocean.utils.async_iterators import (
stream_async_iterators_tasks,
semaphore_async_iterator,
)


class ObjectKind(StrEnum):
@@ -19,7 +24,10 @@ class ObjectKind(StrEnum):


SKIP_WEBHOOK_CREATION = False
SEMAPHORE_LIMIT = 30

# Constants for rate limiting
MAX_CONCURRENCY = 25 # Keep slightly under the 30/sec limit to allow for overhead
semaphore = asyncio.BoundedSemaphore(MAX_CONCURRENCY)


def init_terraform_client() -> TerraformClient:
@@ -40,51 +48,27 @@ def init_terraform_client() -> TerraformClient:
async def enrich_state_versions_with_output_data(
http_client: TerraformClient, state_versions: List[dict[str, Any]]
) -> list[dict[str, Any]]:
async with asyncio.BoundedSemaphore(SEMAPHORE_LIMIT):
tasks = [
http_client.get_state_version_output(state_version["id"])
for state_version in state_versions
]

output_batches = []
for completed_task in asyncio.as_completed(tasks):
output = await completed_task
output_batches.append(output)
async def fetch_output(state_version: dict[str, Any]) -> dict[str, Any]:
try:
output = await http_client.get_state_version_output(state_version["id"])
return {**state_version, "__output": output}
except Exception as e:
logger.warning(
f"Failed to fetch output for state version {state_version['id']}: {e}"
)
return {**state_version, "__output": {}}

enriched_state_versions = [
{**state_version, "__output": output}
for state_version, output in zip(state_versions, output_batches)
]
enriched_versions = await process_in_queue(
state_versions, fetch_output, concurrency=MAX_CONCURRENCY
)

return enriched_state_versions
return enriched_versions


async def enrich_workspaces_with_tags(
http_client: TerraformClient, workspaces: List[dict[str, Any]]
) -> list[dict[str, Any]]:
async def get_tags_for_workspace(workspace: dict[str, Any]) -> dict[str, Any]:
async with asyncio.BoundedSemaphore(SEMAPHORE_LIMIT):
try:
tags = []
async for tag_batch in http_client.get_workspace_tags(workspace["id"]):
tags.extend(tag_batch)
return {**workspace, "__tags": tags}
except Exception as e:
logger.warning(
f"Failed to fetch tags for workspace {workspace['id']}: {e}"
)
return {**workspace, "__tags": []}

tasks = [get_tags_for_workspace(workspace) for workspace in workspaces]
enriched_workspaces = [await task for task in asyncio.as_completed(tasks)]

return enriched_workspaces


async def enrich_workspace_with_tags(
http_client: TerraformClient, workspace: dict[str, Any]
) -> dict[str, Any]:
async with asyncio.BoundedSemaphore(SEMAPHORE_LIMIT):
try:
tags = []
async for tag_batch in http_client.get_workspace_tags(workspace["id"]):
@@ -94,6 +78,25 @@ async def enrich_workspace_with_tags(
logger.warning(f"Failed to fetch tags for workspace {workspace['id']}: {e}")
return {**workspace, "__tags": []}

enriched_workspaces = await process_in_queue(
workspaces, get_tags_for_workspace, concurrency=MAX_CONCURRENCY
)

return enriched_workspaces


async def enrich_workspace_with_tags(
http_client: TerraformClient, workspace: dict[str, Any]
) -> dict[str, Any]:
try:
tags = []
async for tag_batch in http_client.get_workspace_tags(workspace["id"]):
tags.extend(tag_batch)
return {**workspace, "__tags": tags}
except Exception as e:
logger.warning(f"Failed to fetch tags for workspace {workspace['id']}: {e}")
return {**workspace, "__tags": []}


@ocean.on_resync(ObjectKind.ORGANIZATION)
async def resync_organizations(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
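
The enrichment functions above replace per-call `asyncio.BoundedSemaphore(SEMAPHORE_LIMIT)` instances, which never limited anything because each call created a fresh semaphore, with `process_in_queue(..., concurrency=MAX_CONCURRENCY)`. The idea of a fixed pool of workers draining a queue can be sketched with the standard library alone. This is not the `port_ocean` implementation: `map_with_concurrency` is a hypothetical helper, and returning results in input order is a choice made for this sketch.

```python
import asyncio
from typing import Any, Awaitable, Callable, TypeVar

T = TypeVar("T")
R = TypeVar("R")


async def map_with_concurrency(
    items: list[T],
    func: Callable[[T], Awaitable[R]],
    concurrency: int,
) -> list[R]:
    """Apply `func` to every item with at most `concurrency` calls in flight."""
    queue: asyncio.Queue = asyncio.Queue()
    for index, item in enumerate(items):
        queue.put_nowait((index, item))
    results: list[Any] = [None] * len(items)

    async def worker() -> None:
        # Each worker pulls the next pending item until the queue is empty.
        while True:
            try:
                index, item = queue.get_nowait()
            except asyncio.QueueEmpty:
                return
            results[index] = await func(item)

    workers = [
        asyncio.create_task(worker())
        for _ in range(min(concurrency, len(items)))
    ]
    await asyncio.gather(*workers)
    return results
```

As in the diff, the callback passed in is expected to catch its own exceptions and return a fallback value; an uncaught exception here would propagate out of `gather`.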
@@ -127,29 +130,31 @@ async def resync_runs(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
terraform_client = init_terraform_client()

async def fetch_runs_for_workspace(
workspace: dict[str, Any]
) -> List[List[Dict[str, Any]]]:
return [
run
async for run in terraform_client.get_paginated_runs_for_workspace(
workspace["id"]
)
]

async def fetch_runs_for_all_workspaces() -> ASYNC_GENERATOR_RESYNC_TYPE:
workspace_id: str,
) -> ASYNC_GENERATOR_RESYNC_TYPE:
async for runs in terraform_client.get_paginated_runs_for_workspace(
workspace_id
):
yield runs

async def process_workspaces() -> ASYNC_GENERATOR_RESYNC_TYPE:
async for workspaces in terraform_client.get_paginated_workspaces():
logger.info(
f"Received {len(workspaces)} batch workspaces... fetching its associated {kind}"
)
logger.info(f"Getting {kind}s for {len(workspaces)} workspaces")

tasks = [
semaphore_async_iterator(
semaphore,
functools.partial(fetch_runs_for_workspace, workspace["id"]),
)
for workspace in workspaces
]

tasks = [fetch_runs_for_workspace(workspace) for workspace in workspaces]
for completed_task in asyncio.as_completed(tasks):
workspace_runs = await completed_task
for runs in workspace_runs:
yield runs
if tasks:
async for run_batch in stream_async_iterators_tasks(*tasks):
yield run_batch

async for run_batch in fetch_runs_for_all_workspaces():
yield run_batch
async for runs in process_workspaces():
yield runs


@ocean.on_resync(ObjectKind.STATE_VERSION)
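
The reworked `resync_runs` above wraps each per-workspace generator in `semaphore_async_iterator` and merges them with `stream_async_iterators_tasks`, so run batches are yielded as soon as any workspace produces one while only a bounded number of workspaces are paginated at once. The sketch below shows the same pattern with the standard library only. `stream_with_limit` and `_DONE` are hypothetical names, and the `port_ocean` helpers may be implemented differently.

```python
import asyncio
from typing import Any, AsyncIterator, Callable

_DONE = object()  # sentinel: one producer has finished


async def stream_with_limit(
    factories: list[Callable[[], AsyncIterator[Any]]],
    limit: int,
) -> AsyncIterator[Any]:
    """Merge async generators, running at most `limit` of them at a time."""
    semaphore = asyncio.BoundedSemaphore(limit)
    queue: asyncio.Queue = asyncio.Queue()

    async def drain(factory: Callable[[], AsyncIterator[Any]]) -> None:
        try:
            # The generator is only created and iterated while holding a slot.
            async with semaphore:
                async for batch in factory():
                    await queue.put(batch)
        finally:
            await queue.put(_DONE)

    tasks = [asyncio.create_task(drain(factory)) for factory in factories]
    remaining = len(tasks)
    try:
        while remaining:
            item = await queue.get()
            if item is _DONE:
                remaining -= 1
            else:
                yield item
        await asyncio.gather(*tasks)  # re-raise a producer error, if any
    finally:
        for task in tasks:
            task.cancel()  # no-op for finished tasks; stops producers on early exit
```

Passing zero-argument factories mirrors the `functools.partial(fetch_runs_for_workspace, workspace["id"])` call in the diff: the generator is not started until a semaphore slot is free.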
15 changes: 13 additions & 2 deletions integrations/terraform-cloud/poetry.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions integrations/terraform-cloud/pyproject.toml
@@ -7,6 +7,7 @@ authors = ["Michael Armah <[email protected]>"]
[tool.poetry.dependencies]
python = "^3.11"
port_ocean = {version = "^0.12.7", extras = ["cli"]}
aiolimiter = "^1.1.0"

[tool.poetry.group.dev.dependencies]
# uncomment this if you want to debug the ocean core together with your integration
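
The `aiolimiter` dependency added above backs `AsyncLimiter(TERRAFORM_RATE_LIMIT, 1)` in `client.py`, which allows 25 acquisitions per 1-second period. For readers unfamiliar with the leaky-bucket idea behind that kind of limiter, here is a rough standard-library sketch. `SimpleRateLimiter` is a hypothetical class written for illustration, with an injectable clock and sleep so it can be tested deterministically; it is not how `aiolimiter` is implemented.

```python
import asyncio
import time
from typing import Awaitable, Callable


class SimpleRateLimiter:
    """Allow at most `max_rate` acquisitions per `period` seconds (leaky bucket)."""

    def __init__(
        self,
        max_rate: float,
        period: float = 1.0,
        clock: Callable[[], float] = time.monotonic,
        sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
    ) -> None:
        self.max_rate = max_rate
        self.period = period
        self._clock = clock
        self._sleep = sleep
        self._level = 0.0  # how full the bucket currently is
        self._last = clock()

    def _leak(self) -> None:
        # Drain capacity in proportion to the time elapsed since the last check.
        now = self._clock()
        drained = (now - self._last) * self.max_rate / self.period
        self._level = max(0.0, self._level - drained)
        self._last = now

    async def acquire(self) -> None:
        while True:
            self._leak()
            if self._level + 1 <= self.max_rate:
                self._level += 1
                return
            # Wait just long enough for the overflow to drain, then re-check.
            overflow = self._level + 1 - self.max_rate
            await self._sleep(overflow * self.period / self.max_rate)

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(self, *exc: object) -> None:
        return None
```

Used as `async with limiter:` around each request, a limiter like this caps the request rate, whereas the semaphore in `main.py` caps how many operations are in flight; the PR relies on both.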