diff --git a/integrations/gcp/gcp_core/helpers/ratelimiter/base.py b/integrations/gcp/gcp_core/helpers/ratelimiter/base.py index 147be4353c..e1d5c049e7 100644 --- a/integrations/gcp/gcp_core/helpers/ratelimiter/base.py +++ b/integrations/gcp/gcp_core/helpers/ratelimiter/base.py @@ -2,7 +2,7 @@ from typing import Any, Optional, TYPE_CHECKING, final from aiolimiter import AsyncLimiter -from google.cloud.cloudquotas_v1 import CloudQuotasAsyncClient, GetQuotaInfoRequest # type: ignore +from google.cloud.cloudquotas_v1 import CloudQuotasAsyncClient, GetQuotaInfoRequest from google.api_core.exceptions import GoogleAPICallError from loguru import logger from enum import Enum @@ -116,6 +116,13 @@ class GCPResourceRateLimiter(GCPResourceQuota): time_period: float = _DEFAULT_RATE_LIMIT_TIME_PERIOD + async def default_rate_limiter(self) -> AsyncLimiter: + quota = int(max(round(self._default_quota * _PERCENTAGE_OF_QUOTA, 1), 1)) + logger.info( + f"Using default values: The Integration will utilize {_PERCENTAGE_OF_QUOTA * 100}% of the quota, which equates to {quota} for rate limiting." + ) + return AsyncLimiter(max_rate=quota, time_period=self.time_period) + @cache_coroutine_result() async def register(self, container_id: str, *arg: Optional[Any]) -> AsyncLimiter: quota = await self._get_quota(container_id, *arg) @@ -147,6 +154,12 @@ class ResourceBoundedSemaphore(GCPResourceRateLimiter): default_maximum_limit: int = MAXIMUM_CONCURRENT_REQUESTS + async def default_semaphore(self) -> asyncio.BoundedSemaphore: + logger.info( + f"The integration will process {self.default_maximum_limit} at a time based on the default maximum limit." + ) + return asyncio.BoundedSemaphore(self.default_maximum_limit) + @cache_coroutine_result() async def semaphore( self, container_id: str, *args: Any diff --git a/integrations/gcp/gcp_core/utils.py b/integrations/gcp/gcp_core/utils.py index c771cb6d33..c99af2aa57 100644 --- a/integrations/gcp/gcp_core/utils.py +++ b/integrations/gcp/gcp_core/utils.py @@ -1,9 +1,11 @@ import enum import base64 +import os import typing from collections.abc import MutableSequence from typing import Any, TypedDict, Tuple +from loguru import logger import proto # type: ignore from port_ocean.context.event import event from port_ocean.core.handlers.port_app_config.models import ResourceConfig @@ -11,7 +13,7 @@ from gcp_core.overrides import GCPCloudResourceConfig from port_ocean.context.ocean import ocean import json - +from pathlib import Path from gcp_core.helpers.ratelimiter.overrides import ( SearchAllResourcesQpmPerProject, PubSubAdministratorPerMinutePerProject, @@ -19,7 +21,11 @@ search_all_resources_qpm_per_project = SearchAllResourcesQpmPerProject() pubsub_administrator_per_minute_per_project = PubSubAdministratorPerMinutePerProject() + EXTRA_PROJECT_FIELD = "__project" +DEFAULT_CREDENTIALS_FILE_PATH = ( + f"{Path.home()}/.config/gcloud/application_default_credentials.json" +) if typing.TYPE_CHECKING: from aiolimiter import AsyncLimiter @@ -72,36 +78,84 @@ def get_current_resource_config() -> ( def get_credentials_json() -> str: - b64_credentials = ocean.integration_config["encoded_adc_configuration"] - credentials_json = base64.b64decode(b64_credentials).decode("utf-8") + credentials_json = "" + if ocean.integration_config.get("encoded_adc_configuration"): + b64_credentials = ocean.integration_config["encoded_adc_configuration"] + credentials_json = base64.b64decode(b64_credentials).decode("utf-8") + else: + try: + file_path: str = ( + os.environ.get("GOOGLE_APPLICATION_CREDENTIALS") + or DEFAULT_CREDENTIALS_FILE_PATH + ) + with open(file_path, "r", encoding="utf-8") as file: + credentials_json = file.read() + except FileNotFoundError as e: + raise FileNotFoundError( + f"Couldn't find the google credentials file. Please set the GOOGLE_APPLICATION_CREDENTIALS environment variable properly. Error: {str(e)}" + ) return credentials_json def get_service_account_project_id() -> str: "get project id associated with service account" - default_credentials = json.loads(get_credentials_json()) - project_id = default_credentials["project_id"] - return project_id + try: + default_credentials = json.loads(get_credentials_json()) + project_id = default_credentials["quota_project_id"] + return project_id + except FileNotFoundError as e: + gcp_project_env = os.getenv("GCP_PROJECT") + if isinstance(gcp_project_env, str): + return gcp_project_env + else: + raise ValueError( + f"Couldn't figure out the service account's project id. You can specify it usign the GCP_PROJECT environment variable. Error: {str(e)}" + ) + except KeyError as e: + raise ValueError( + f"Couldn't figure out the service account's project id. Key: {str(e)} doesn't exist in the credentials file." + ) + except Exception as e: + raise ValueError( + f"Couldn't figure out the service account's project id. Error: {str(e)}" + ) + raise ValueError("Couldn't figure out the service account's project id.") async def resolve_request_controllers( kind: str, ) -> Tuple["AsyncLimiter", "BoundedSemaphore"]: - service_account_project_id = get_service_account_project_id() - - if kind == AssetTypesWithSpecialHandling.TOPIC: - topic_rate_limiter = await pubsub_administrator_per_minute_per_project.limiter( + try: + service_account_project_id = get_service_account_project_id() + + if kind == AssetTypesWithSpecialHandling.TOPIC: + topic_rate_limiter = ( + await pubsub_administrator_per_minute_per_project.limiter( + service_account_project_id + ) + ) + topic_semaphore = ( + await pubsub_administrator_per_minute_per_project.semaphore( + service_account_project_id + ) + ) + return (topic_rate_limiter, topic_semaphore) + + asset_rate_limiter = await search_all_resources_qpm_per_project.limiter( service_account_project_id ) - topic_semaphore = await pubsub_administrator_per_minute_per_project.semaphore( + asset_semaphore = await search_all_resources_qpm_per_project.semaphore( service_account_project_id ) - return (topic_rate_limiter, topic_semaphore) - - asset_rate_limiter = await search_all_resources_qpm_per_project.limiter( - service_account_project_id - ) - asset_semaphore = await search_all_resources_qpm_per_project.semaphore( - service_account_project_id - ) - return (asset_rate_limiter, asset_semaphore) + return (asset_rate_limiter, asset_semaphore) + except Exception as e: + logger.warning( + f"Failed to compute quota dynamically due to error. Will use default values. Error: {str(e)}" + ) + default_rate_limiter = ( + await search_all_resources_qpm_per_project.default_rate_limiter() + ) + default_semaphore = ( + await search_all_resources_qpm_per_project.default_semaphore() + ) + return (default_rate_limiter, default_semaphore) diff --git a/integrations/gcp/poetry.lock b/integrations/gcp/poetry.lock index e0b69746da..c2001e119f 100644 --- a/integrations/gcp/poetry.lock +++ b/integrations/gcp/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand. [[package]] name = "aiolimiter" @@ -613,6 +613,23 @@ protobuf = ">=3.20.2,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4 [package.extras] libcst = ["libcst (>=0.3.10)"] +[[package]] +name = "google-cloud-quotas" +version = "0.1.10" +description = "Google Cloud Quotas API client library" +optional = false +python-versions = ">=3.7" +files = [ + {file = "google_cloud_quotas-0.1.10-py3-none-any.whl", hash = "sha256:9d39bf10c35580967ef4da08daf46390a7f48c7bcd0bfde8acaee6f6c599a6c5"}, + {file = "google_cloud_quotas-0.1.10.tar.gz", hash = "sha256:f56ed8a7a373e720381afc8780ab7ac065b5f11ef70a1cfa93f838c21b5cddfa"}, +] + +[package.dependencies] +google-api-core = {version = ">=1.34.1,<2.0.dev0 || >=2.11.dev0,<3.0.0dev", extras = ["grpc"]} +google-auth = ">=2.14.1,<2.24.0 || >2.24.0,<2.25.0 || >2.25.0,<3.0.0dev" +proto-plus = ">=1.22.3,<2.0.0dev" +protobuf = ">=3.20.2,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<6.0.0dev" + [[package]] name = "google-cloud-resource-manager" version = "1.12.5" @@ -1686,13 +1703,13 @@ files = [ [[package]] name = "setuptools" -version = "74.0.0" +version = "74.1.1" description = "Easily download, build, install, upgrade, and uninstall Python packages" optional = false python-versions = ">=3.8" files = [ - {file = "setuptools-74.0.0-py3-none-any.whl", hash = "sha256:0274581a0037b638b9fc1c6883cc71c0210865aaa76073f7882376b641b84e8f"}, - {file = "setuptools-74.0.0.tar.gz", hash = "sha256:a85e96b8be2b906f3e3e789adec6a9323abf79758ecfa3065bd740d81158b11e"}, + {file = "setuptools-74.1.1-py3-none-any.whl", hash = "sha256:fc91b5f89e392ef5b77fe143b17e32f65d3024744fba66dc3afe07201684d766"}, + {file = "setuptools-74.1.1.tar.gz", hash = "sha256:2353af060c06388be1cecbf5953dcdb1f38362f87a2356c480b6b4d5fcfc8847"}, ] [package.extras] @@ -2169,4 +2186,4 @@ dev = ["black (>=19.3b0)", "pytest (>=4.6.2)"] [metadata] lock-version = "2.0" python-versions = "^3.11" -content-hash = "39f1351334f3de1f696bc72b6833db03e061571c27a360ab8955cf8c9896d8fd" +content-hash = "60e934a2d720e23290d3d20a57b01a4d9407a5df4a4b9084d31c7ebd64784e09" diff --git a/integrations/gcp/pyproject.toml b/integrations/gcp/pyproject.toml index aaf321aea4..a4f8958f18 100644 --- a/integrations/gcp/pyproject.toml +++ b/integrations/gcp/pyproject.toml @@ -10,6 +10,7 @@ port_ocean = {version = "^0.10.7", extras = ["cli"]} google-cloud-asset = "^3.25.1" google-cloud-pubsub = "^2.21.1" google-cloud-resource-manager = "^1.12.3" +google-cloud-quotas = "^0.1.10" aiolimiter = "^1.1.0" types-requests = "^2.32.0.20240712" @@ -115,4 +116,4 @@ exclude = ''' [tool.pytest.ini_options] asyncio_mode = "auto" asyncio_default_fixture_loop_scope = "function" -addopts = "-vv -n auto ./tests" \ No newline at end of file +addopts = "-vv -n auto ./tests"