Skip to content

Commit

Permalink
[GCP] Add quota dependencies (#984)
Browse files Browse the repository at this point in the history
# Description

What - Fixed quota library not existing + got some environment data from
using environment variables

Why - Enable easier Terraform experience 

How - added environment variables checking if configuration from user
wasn't sent

## Type of change

Please leave one option from the following and delete the rest:

- [x] Bug fix (non-breaking change which fixes an issue)
- [ ] New feature (non-breaking change which adds functionality)
- [ ] New Integration (non-breaking change which adds a new integration)
- [ ] Breaking change (fix or feature that would cause existing
functionality to not work as expected)
- [ ] Non-breaking change (fix of existing functionality that will not
change current behavior)
- [ ] Documentation (added/updated documentation)

<h4> All tests should be run against the port production
environment(using a testing org). </h4>


### Integration testing checklist

- [x] Integration able to create all default resources from scratch
- [x] Resync able to create entities
- [x] Resync able to update entities
- [ ] Resync able to detect and delete entities
- [x] Resync finishes successfully
- [ ] If new resource kind is added or updated in the integration, add
example raw data, mapping and expected result to the `examples` folder
in the integration directory.
- [ ] If resource kind is updated, run the integration with the example
data and check if the expected result is achieved
- [ ] If new resource kind is added or updated, validate that
live-events for that resource are working as expected
- [ ] Docs PR link [here](#)
  • Loading branch information
matan84 authored Sep 4, 2024
1 parent 4919d9b commit eb6ac81
Show file tree
Hide file tree
Showing 4 changed files with 112 additions and 27 deletions.
15 changes: 14 additions & 1 deletion integrations/gcp/gcp_core/helpers/ratelimiter/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Any, Optional, TYPE_CHECKING, final

from aiolimiter import AsyncLimiter
from google.cloud.cloudquotas_v1 import CloudQuotasAsyncClient, GetQuotaInfoRequest # type: ignore
from google.cloud.cloudquotas_v1 import CloudQuotasAsyncClient, GetQuotaInfoRequest
from google.api_core.exceptions import GoogleAPICallError
from loguru import logger
from enum import Enum
Expand Down Expand Up @@ -116,6 +116,13 @@ class GCPResourceRateLimiter(GCPResourceQuota):

time_period: float = _DEFAULT_RATE_LIMIT_TIME_PERIOD

async def default_rate_limiter(self) -> AsyncLimiter:
quota = int(max(round(self._default_quota * _PERCENTAGE_OF_QUOTA, 1), 1))
logger.info(
f"Using default values: The Integration will utilize {_PERCENTAGE_OF_QUOTA * 100}% of the quota, which equates to {quota} for rate limiting."
)
return AsyncLimiter(max_rate=quota, time_period=self.time_period)

@cache_coroutine_result()
async def register(self, container_id: str, *arg: Optional[Any]) -> AsyncLimiter:
quota = await self._get_quota(container_id, *arg)
Expand Down Expand Up @@ -147,6 +154,12 @@ class ResourceBoundedSemaphore(GCPResourceRateLimiter):

default_maximum_limit: int = MAXIMUM_CONCURRENT_REQUESTS

async def default_semaphore(self) -> asyncio.BoundedSemaphore:
logger.info(
f"The integration will process {self.default_maximum_limit} at a time based on the default maximum limit."
)
return asyncio.BoundedSemaphore(self.default_maximum_limit)

@cache_coroutine_result()
async def semaphore(
self, container_id: str, *args: Any
Expand Down
94 changes: 74 additions & 20 deletions integrations/gcp/gcp_core/utils.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,31 @@
import enum
import base64
import os
import typing
from collections.abc import MutableSequence
from typing import Any, TypedDict, Tuple

from loguru import logger
import proto # type: ignore
from port_ocean.context.event import event
from port_ocean.core.handlers.port_app_config.models import ResourceConfig

from gcp_core.overrides import GCPCloudResourceConfig
from port_ocean.context.ocean import ocean
import json

from pathlib import Path
from gcp_core.helpers.ratelimiter.overrides import (
SearchAllResourcesQpmPerProject,
PubSubAdministratorPerMinutePerProject,
)

search_all_resources_qpm_per_project = SearchAllResourcesQpmPerProject()
pubsub_administrator_per_minute_per_project = PubSubAdministratorPerMinutePerProject()

EXTRA_PROJECT_FIELD = "__project"
DEFAULT_CREDENTIALS_FILE_PATH = (
f"{Path.home()}/.config/gcloud/application_default_credentials.json"
)

if typing.TYPE_CHECKING:
from aiolimiter import AsyncLimiter
Expand Down Expand Up @@ -72,36 +78,84 @@ def get_current_resource_config() -> (


def get_credentials_json() -> str:
b64_credentials = ocean.integration_config["encoded_adc_configuration"]
credentials_json = base64.b64decode(b64_credentials).decode("utf-8")
credentials_json = ""
if ocean.integration_config.get("encoded_adc_configuration"):
b64_credentials = ocean.integration_config["encoded_adc_configuration"]
credentials_json = base64.b64decode(b64_credentials).decode("utf-8")
else:
try:
file_path: str = (
os.environ.get("GOOGLE_APPLICATION_CREDENTIALS")
or DEFAULT_CREDENTIALS_FILE_PATH
)
with open(file_path, "r", encoding="utf-8") as file:
credentials_json = file.read()
except FileNotFoundError as e:
raise FileNotFoundError(
f"Couldn't find the google credentials file. Please set the GOOGLE_APPLICATION_CREDENTIALS environment variable properly. Error: {str(e)}"
)
return credentials_json


def get_service_account_project_id() -> str:
"get project id associated with service account"
default_credentials = json.loads(get_credentials_json())
project_id = default_credentials["project_id"]
return project_id
try:
default_credentials = json.loads(get_credentials_json())
project_id = default_credentials["quota_project_id"]
return project_id
except FileNotFoundError as e:
gcp_project_env = os.getenv("GCP_PROJECT")
if isinstance(gcp_project_env, str):
return gcp_project_env
else:
raise ValueError(
f"Couldn't figure out the service account's project id. You can specify it usign the GCP_PROJECT environment variable. Error: {str(e)}"
)
except KeyError as e:
raise ValueError(
f"Couldn't figure out the service account's project id. Key: {str(e)} doesn't exist in the credentials file."
)
except Exception as e:
raise ValueError(
f"Couldn't figure out the service account's project id. Error: {str(e)}"
)
raise ValueError("Couldn't figure out the service account's project id.")


async def resolve_request_controllers(
kind: str,
) -> Tuple["AsyncLimiter", "BoundedSemaphore"]:
service_account_project_id = get_service_account_project_id()

if kind == AssetTypesWithSpecialHandling.TOPIC:
topic_rate_limiter = await pubsub_administrator_per_minute_per_project.limiter(
try:
service_account_project_id = get_service_account_project_id()

if kind == AssetTypesWithSpecialHandling.TOPIC:
topic_rate_limiter = (
await pubsub_administrator_per_minute_per_project.limiter(
service_account_project_id
)
)
topic_semaphore = (
await pubsub_administrator_per_minute_per_project.semaphore(
service_account_project_id
)
)
return (topic_rate_limiter, topic_semaphore)

asset_rate_limiter = await search_all_resources_qpm_per_project.limiter(
service_account_project_id
)
topic_semaphore = await pubsub_administrator_per_minute_per_project.semaphore(
asset_semaphore = await search_all_resources_qpm_per_project.semaphore(
service_account_project_id
)
return (topic_rate_limiter, topic_semaphore)

asset_rate_limiter = await search_all_resources_qpm_per_project.limiter(
service_account_project_id
)
asset_semaphore = await search_all_resources_qpm_per_project.semaphore(
service_account_project_id
)
return (asset_rate_limiter, asset_semaphore)
return (asset_rate_limiter, asset_semaphore)
except Exception as e:
logger.warning(
f"Failed to compute quota dynamically due to error. Will use default values. Error: {str(e)}"
)
default_rate_limiter = (
await search_all_resources_qpm_per_project.default_rate_limiter()
)
default_semaphore = (
await search_all_resources_qpm_per_project.default_semaphore()
)
return (default_rate_limiter, default_semaphore)
27 changes: 22 additions & 5 deletions integrations/gcp/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion integrations/gcp/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ port_ocean = {version = "^0.10.7", extras = ["cli"]}
google-cloud-asset = "^3.25.1"
google-cloud-pubsub = "^2.21.1"
google-cloud-resource-manager = "^1.12.3"
google-cloud-quotas = "^0.1.10"
aiolimiter = "^1.1.0"
types-requests = "^2.32.0.20240712"

Expand Down Expand Up @@ -115,4 +116,4 @@ exclude = '''
[tool.pytest.ini_options]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
addopts = "-vv -n auto ./tests"
addopts = "-vv -n auto ./tests"

0 comments on commit eb6ac81

Please sign in to comment.