From c318294a249faeaea8e1fdf687cd03e7b16a852b Mon Sep 17 00:00:00 2001 From: mkarmah Date: Wed, 23 Oct 2024 10:33:06 +0000 Subject: [PATCH 01/23] support to query specific regions --- integrations/aws/aws/aws_credentials.py | 4 +- integrations/aws/main.py | 56 +++++++++++++++++++++---- integrations/aws/utils/aws.py | 20 ++++++--- integrations/aws/utils/overrides.py | 1 + 4 files changed, 66 insertions(+), 15 deletions(-) diff --git a/integrations/aws/aws/aws_credentials.py b/integrations/aws/aws/aws_credentials.py index fa30d9d0f5..f7cf744749 100644 --- a/integrations/aws/aws/aws_credentials.py +++ b/integrations/aws/aws/aws_credentials.py @@ -49,7 +49,9 @@ async def create_session(self, region: Optional[str] = None) -> aioboto3.Session ) async def create_session_for_each_region( - self, + self, regions_to_query: Optional[list[str]] = None ) -> AsyncIterator[aioboto3.Session]: for region in self.enabled_regions: + if regions_to_query and region not in regions_to_query: + continue yield await self.create_session(region) diff --git a/integrations/aws/main.py b/integrations/aws/main.py index 065dd902af..d34557a675 100644 --- a/integrations/aws/main.py +++ b/integrations/aws/main.py @@ -1,5 +1,6 @@ import json import typing +from typing import Optional from fastapi import Response, status import fastapi @@ -26,6 +27,9 @@ validate_request, ) from port_ocean.context.ocean import ocean +from port_ocean.context.event import event +from utils.overrides import AWSResourceConfig + from loguru import logger from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE from utils.misc import ( @@ -72,7 +76,7 @@ async def _handle_global_resource_resync( async def resync_resources_for_account( - credentials: AwsCredentials, kind: str + credentials: AwsCredentials, kind: str, regions_to_query: Optional[list[str]] = None ) -> ASYNC_GENERATOR_RESYNC_TYPE: """Function to handle fetching resources for a single account.""" errors, regions = [], [] @@ -81,11 +85,16 @@ async def 
resync_resources_for_account( async for batch in _handle_global_resource_resync(kind, credentials): yield batch else: - async for session in credentials.create_session_for_each_region(): + async for session in credentials.create_session_for_each_region( + regions_to_query=regions_to_query + ): try: async for batch in resync_cloudcontrol(kind, session): yield batch except Exception as exc: + logger.error( + f"Failed to fetch {kind} for {session.region_name} in {credentials.account_id}: {exc}" + ) regions.append(session.region_name) errors.append(exc) continue @@ -100,10 +109,19 @@ async def resync_all(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: return await update_available_access_credentials() + + regions_to_query = typing.cast( + AWSResourceConfig, event.resource_config + ).selector.regions + + logger.warning(f"REGIONS TO QUERY {regions_to_query}") + tasks = [ semaphore_async_iterator( semaphore, - functools.partial(resync_resources_for_account, credentials, kind), + functools.partial( + resync_resources_for_account, credentials, kind, regions_to_query + ), ) async for credentials in get_accounts() ] @@ -124,6 +142,10 @@ async def resync_account(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() + regions_to_query = typing.cast( + AWSResourceConfig, event.resource_config + ).selector.regions + tasks = [ semaphore_async_iterator( semaphore, @@ -137,7 +159,7 @@ async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: "Marker", ), ) - async for session in get_sessions() + async for session in get_sessions(regions_to_query=regions_to_query) ] if tasks: async for batch in stream_async_iterators_tasks(*tasks): @@ -149,6 +171,10 @@ async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() + regions_to_query = 
typing.cast( + AWSResourceConfig, event.resource_config + ).selector.regions + tasks = [ semaphore_async_iterator( semaphore, @@ -162,7 +188,7 @@ async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: "Marker", ), ) - async for session in get_sessions() + async for session in get_sessions(regions_to_query=regions_to_query) ] if tasks: @@ -175,6 +201,10 @@ async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() + regions_to_query = typing.cast( + AWSResourceConfig, event.resource_config + ).selector.regions + tasks = [ semaphore_async_iterator( semaphore, @@ -188,7 +218,7 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: "NextToken", ), ) - async for session in get_sessions() + async for session in get_sessions(regions_to_query=regions_to_query) ] if tasks: @@ -200,6 +230,11 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(kind=ResourceKindsWithSpecialHandling.AMI_IMAGE) async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() + + regions_to_query = typing.cast( + AWSResourceConfig, event.resource_config + ).selector.regions + tasks = [ semaphore_async_iterator( semaphore, @@ -214,7 +249,7 @@ async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: {"Owners": ["self"]}, ), ) - async for session in get_sessions() + async for session in get_sessions(regions_to_query=regions_to_query) ] if tasks: async for batch in stream_async_iterators_tasks(*tasks): @@ -225,6 +260,11 @@ async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(kind=ResourceKindsWithSpecialHandling.CLOUDFORMATION_STACK) async def resync_cloudformation(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() + + regions_to_query = typing.cast( + AWSResourceConfig, event.resource_config + 
).selector.regions + tasks = [ semaphore_async_iterator( semaphore, @@ -238,7 +278,7 @@ async def resync_cloudformation(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: "NextToken", ), ) - async for session in get_sessions() + async for session in get_sessions(regions_to_query=regions_to_query) ] if tasks: diff --git a/integrations/aws/utils/aws.py b/integrations/aws/utils/aws.py index 4de0a93a2b..014e541d34 100644 --- a/integrations/aws/utils/aws.py +++ b/integrations/aws/utils/aws.py @@ -1,4 +1,5 @@ from typing import Any, AsyncIterator, Optional, Union +import typing import aioboto3 from port_ocean.context.ocean import ocean @@ -58,15 +59,19 @@ async def session_factory( credentials: AwsCredentials, custom_region: Optional[str], use_default_region: Optional[bool], + regions_to_query: Optional[list[str]], ) -> AsyncIterator[aioboto3.Session]: - - if use_default_region: + if use_default_region and ( + not regions_to_query or default_region in regions_to_query + ): default_region = get_default_region_from_credentials(credentials) yield await credentials.create_session(default_region) - elif custom_region: + elif custom_region and (not regions_to_query or custom_region in regions_to_query): yield await credentials.create_session(custom_region) else: - async for session in credentials.create_session_for_each_region(): + async for session in credentials.create_session_for_each_region( + regions_to_query + ): yield session @@ -74,6 +79,7 @@ async def get_sessions( custom_account_id: Optional[str] = None, custom_region: Optional[str] = None, use_default_region: Optional[bool] = None, + regions_to_query: Optional[list[str]] = None, ) -> AsyncIterator[aioboto3.Session]: """ Gets boto3 sessions for the AWS regions. 
@@ -83,12 +89,14 @@ async def get_sessions( if custom_account_id: credentials = _session_manager.find_credentials_by_account_id(custom_account_id) async for session in session_factory( - credentials, custom_region, use_default_region + credentials, custom_region, use_default_region, regions_to_query ): yield session else: tasks = [ - session_factory(credentials, custom_region, use_default_region) + session_factory( + credentials, custom_region, use_default_region, regions_to_query + ) async for credentials in get_accounts() ] if tasks: diff --git a/integrations/aws/utils/overrides.py b/integrations/aws/utils/overrides.py index c6d5448dc2..3118e88366 100644 --- a/integrations/aws/utils/overrides.py +++ b/integrations/aws/utils/overrides.py @@ -8,6 +8,7 @@ class AWSDescribeResourcesSelector(Selector): use_get_resource_api: bool = Field(alias="useGetResourceAPI", default=False) + regions: list[str] = Field(alias="regions", default_factory=list) class AWSResourceConfig(ResourceConfig): From 4318eaf0b3ad506ae413584fe98d6f13ff1037f8 Mon Sep 17 00:00:00 2001 From: mkarmah Date: Thu, 24 Oct 2024 17:27:33 +0000 Subject: [PATCH 02/23] region policy for seamless control --- integrations/aws/.port/spec.yaml | 5 +++ integrations/aws/aws/aws_credentials.py | 4 +- integrations/aws/main.py | 53 ++++--------------------- integrations/aws/utils/aws.py | 20 +++------- integrations/aws/utils/misc.py | 4 +- integrations/aws/utils/overrides.py | 40 ++++++++++++++++++- integrations/aws/utils/resources.py | 11 ++++- 7 files changed, 70 insertions(+), 67 deletions(-) diff --git a/integrations/aws/.port/spec.yaml b/integrations/aws/.port/spec.yaml index 33b2b30eb8..74c2238e7b 100644 --- a/integrations/aws/.port/spec.yaml +++ b/integrations/aws/.port/spec.yaml @@ -37,6 +37,11 @@ configurations: type: string sensitive: true description: AWS API Key for custom events, used to validate the event source for real-time event updates. 
+ - name: maximumConcurrentAccounts + type: integer + require: false + description: The number of concurrent accounts to scan. By default, it is set to 50. + default: 50 deploymentMethodRequirements: - type: default configurations: ['awsAccessKeyId', 'awsSecretAccessKey'] diff --git a/integrations/aws/aws/aws_credentials.py b/integrations/aws/aws/aws_credentials.py index f7cf744749..fa30d9d0f5 100644 --- a/integrations/aws/aws/aws_credentials.py +++ b/integrations/aws/aws/aws_credentials.py @@ -49,9 +49,7 @@ async def create_session(self, region: Optional[str] = None) -> aioboto3.Session ) async def create_session_for_each_region( - self, regions_to_query: Optional[list[str]] = None + self, ) -> AsyncIterator[aioboto3.Session]: for region in self.enabled_regions: - if regions_to_query and region not in regions_to_query: - continue yield await self.create_session(region) diff --git a/integrations/aws/main.py b/integrations/aws/main.py index d34557a675..5f956103ab 100644 --- a/integrations/aws/main.py +++ b/integrations/aws/main.py @@ -1,6 +1,5 @@ import json import typing -from typing import Optional from fastapi import Response, status import fastapi @@ -27,9 +26,6 @@ validate_request, ) from port_ocean.context.ocean import ocean -from port_ocean.context.event import event -from utils.overrides import AWSResourceConfig - from loguru import logger from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE from utils.misc import ( @@ -76,7 +72,7 @@ async def _handle_global_resource_resync( async def resync_resources_for_account( - credentials: AwsCredentials, kind: str, regions_to_query: Optional[list[str]] = None + credentials: AwsCredentials, kind: str ) -> ASYNC_GENERATOR_RESYNC_TYPE: """Function to handle fetching resources for a single account.""" errors, regions = [], [] @@ -85,9 +81,7 @@ async def resync_resources_for_account( async for batch in _handle_global_resource_resync(kind, credentials): yield batch else: - async for session in 
credentials.create_session_for_each_region( - regions_to_query=regions_to_query - ): + async for session in credentials.create_session_for_each_region(): try: async for batch in resync_cloudcontrol(kind, session): yield batch @@ -109,19 +103,10 @@ async def resync_all(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: return await update_available_access_credentials() - - regions_to_query = typing.cast( - AWSResourceConfig, event.resource_config - ).selector.regions - - logger.warning(f"REGIONS TO QUERY {regions_to_query}") - tasks = [ semaphore_async_iterator( semaphore, - functools.partial( - resync_resources_for_account, credentials, kind, regions_to_query - ), + functools.partial(resync_resources_for_account, credentials, kind), ) async for credentials in get_accounts() ] @@ -142,10 +127,6 @@ async def resync_account(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() - regions_to_query = typing.cast( - AWSResourceConfig, event.resource_config - ).selector.regions - tasks = [ semaphore_async_iterator( semaphore, @@ -159,7 +140,7 @@ async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: "Marker", ), ) - async for session in get_sessions(regions_to_query=regions_to_query) + async for session in get_sessions() ] if tasks: async for batch in stream_async_iterators_tasks(*tasks): @@ -171,10 +152,6 @@ async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() - regions_to_query = typing.cast( - AWSResourceConfig, event.resource_config - ).selector.regions - tasks = [ semaphore_async_iterator( semaphore, @@ -188,7 +165,7 @@ async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: "Marker", ), ) - async for session in get_sessions(regions_to_query=regions_to_query) + async for session in 
get_sessions() ] if tasks: @@ -201,10 +178,6 @@ async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() - regions_to_query = typing.cast( - AWSResourceConfig, event.resource_config - ).selector.regions - tasks = [ semaphore_async_iterator( semaphore, @@ -218,7 +191,7 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: "NextToken", ), ) - async for session in get_sessions(regions_to_query=regions_to_query) + async for session in get_sessions() ] if tasks: @@ -230,11 +203,6 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(kind=ResourceKindsWithSpecialHandling.AMI_IMAGE) async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() - - regions_to_query = typing.cast( - AWSResourceConfig, event.resource_config - ).selector.regions - tasks = [ semaphore_async_iterator( semaphore, @@ -249,7 +217,7 @@ async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: {"Owners": ["self"]}, ), ) - async for session in get_sessions(regions_to_query=regions_to_query) + async for session in get_sessions() ] if tasks: async for batch in stream_async_iterators_tasks(*tasks): @@ -260,11 +228,6 @@ async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(kind=ResourceKindsWithSpecialHandling.CLOUDFORMATION_STACK) async def resync_cloudformation(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() - - regions_to_query = typing.cast( - AWSResourceConfig, event.resource_config - ).selector.regions - tasks = [ semaphore_async_iterator( semaphore, @@ -278,7 +241,7 @@ async def resync_cloudformation(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: "NextToken", ), ) - async for session in get_sessions(regions_to_query=regions_to_query) + async for session in get_sessions() ] if tasks: diff --git 
a/integrations/aws/utils/aws.py b/integrations/aws/utils/aws.py index 014e541d34..4de0a93a2b 100644 --- a/integrations/aws/utils/aws.py +++ b/integrations/aws/utils/aws.py @@ -1,5 +1,4 @@ from typing import Any, AsyncIterator, Optional, Union -import typing import aioboto3 from port_ocean.context.ocean import ocean @@ -59,19 +58,15 @@ async def session_factory( credentials: AwsCredentials, custom_region: Optional[str], use_default_region: Optional[bool], - regions_to_query: Optional[list[str]], ) -> AsyncIterator[aioboto3.Session]: - if use_default_region and ( - not regions_to_query or default_region in regions_to_query - ): + + if use_default_region: default_region = get_default_region_from_credentials(credentials) yield await credentials.create_session(default_region) - elif custom_region and (not regions_to_query or custom_region in regions_to_query): + elif custom_region: yield await credentials.create_session(custom_region) else: - async for session in credentials.create_session_for_each_region( - regions_to_query - ): + async for session in credentials.create_session_for_each_region(): yield session @@ -79,7 +74,6 @@ async def get_sessions( custom_account_id: Optional[str] = None, custom_region: Optional[str] = None, use_default_region: Optional[bool] = None, - regions_to_query: Optional[list[str]] = None, ) -> AsyncIterator[aioboto3.Session]: """ Gets boto3 sessions for the AWS regions. 
@@ -89,14 +83,12 @@ async def get_sessions( if custom_account_id: credentials = _session_manager.find_credentials_by_account_id(custom_account_id) async for session in session_factory( - credentials, custom_region, use_default_region, regions_to_query + credentials, custom_region, use_default_region ): yield session else: tasks = [ - session_factory( - credentials, custom_region, use_default_region, regions_to_query - ) + session_factory(credentials, custom_region, use_default_region) async for credentials in get_accounts() ] if tasks: diff --git a/integrations/aws/utils/misc.py b/integrations/aws/utils/misc.py index ca416373f5..1230918aac 100644 --- a/integrations/aws/utils/misc.py +++ b/integrations/aws/utils/misc.py @@ -1,10 +1,12 @@ import enum from port_ocean.context.event import event +from port_ocean.context.ocean import ocean import asyncio -MAX_CONCURRENT_TASKS = 50 +MAX_CONCURRENT_TASKS: int = int(ocean.integration_config["maximum_concurrent_accounts"]) + semaphore = asyncio.BoundedSemaphore(MAX_CONCURRENT_TASKS) diff --git a/integrations/aws/utils/overrides.py b/integrations/aws/utils/overrides.py index 3118e88366..37b1e580a7 100644 --- a/integrations/aws/utils/overrides.py +++ b/integrations/aws/utils/overrides.py @@ -3,12 +3,48 @@ PortAppConfig, Selector, ) -from pydantic import Field +from pydantic import Field, BaseModel +from typing import List + + +class RegionPolicy(BaseModel): + allow: List[str] = Field(default_factory=list) + deny: List[str] = Field(default_factory=list) class AWSDescribeResourcesSelector(Selector): use_get_resource_api: bool = Field(alias="useGetResourceAPI", default=False) - regions: list[str] = Field(alias="regions", default_factory=list) + region_policy: RegionPolicy = Field( + alias="regionPolicy", default_factory=RegionPolicy + ) + + def is_region_allowed(self, region: str) -> bool: + """ + Determines if a given region is allowed based on the query regions policy. 
+ This method checks the `region_policy` attribute to decide if the specified + region should be allowed or denied. The policy can contain "allow" and "deny" lists + which dictate the behavior. + + Scenarios: + - If `region_policy` is not set or empty, the method returns True, allowing all regions. + - If the region is listed in the "deny" list of `region_policy`, the method returns False. + - If the region is listed in the "allow" list of `region_policy`, the method returns True. + - If the region is not listed in either "allow" or "deny" lists, the method returns False. + + Args: + region (str): The region to be checked. + + Returns: + bool: True if the region is allowed, False otherwise. + """ + if not self.region_policy.allow and not self.region_policy.deny: + return True + if region in self.region_policy.deny: + return False + if region in self.region_policy.allow: + return True + + return False class AWSResourceConfig(ResourceConfig): diff --git a/integrations/aws/utils/resources.py b/integrations/aws/utils/resources.py index d8f408f314..ffe955581e 100644 --- a/integrations/aws/utils/resources.py +++ b/integrations/aws/utils/resources.py @@ -165,11 +165,18 @@ async def resync_custom_kind( async def resync_cloudcontrol( kind: str, session: aioboto3.Session ) -> ASYNC_GENERATOR_RESYNC_TYPE: - use_get_resource_api = typing.cast( + resource_config_selector = typing.cast( AWSResourceConfig, event.resource_config - ).selector.use_get_resource_api + ).selector + use_get_resource_api = resource_config_selector.use_get_resource_api + region = session.region_name account_id = await _session_manager.find_account_id_by_session(session) + if not resource_config_selector.is_region_allowed(region): + logger.info( + f"Skipping resyncing {kind} in region {region} in account {account_id} because it's not allowed" + ) + return logger.info(f"Resyncing {kind} in account {account_id} in region {region}") next_token = None while True: From 5943bd98dd4e27623d87a03f351e2bbc6be57129 
Mon Sep 17 00:00:00 2001 From: mkarmah Date: Thu, 24 Oct 2024 18:52:44 +0000 Subject: [PATCH 03/23] bump integration version --- integrations/aws/CHANGELOG.md | 8 ++++++++ integrations/aws/main.py | 4 +++- integrations/aws/pyproject.toml | 2 +- integrations/aws/utils/misc.py | 9 ++++++--- 4 files changed, 18 insertions(+), 5 deletions(-) diff --git a/integrations/aws/CHANGELOG.md b/integrations/aws/CHANGELOG.md index d8031db651..76154eb077 100644 --- a/integrations/aws/CHANGELOG.md +++ b/integrations/aws/CHANGELOG.md @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 +## 0.2.52 (2024-10-23) + + +### Improvements + +- Added the option to query resources from specific regions, configurable via the regionPolicy in the selector field of the mapping. +- Introduced `maximumConcurrentAccount` environment variable to control the maximum number of accounts synced concurrently. + ## 0.2.51 (2024-10-23) diff --git a/integrations/aws/main.py b/integrations/aws/main.py index 5f956103ab..5b9aadf198 100644 --- a/integrations/aws/main.py +++ b/integrations/aws/main.py @@ -34,7 +34,7 @@ ResourceKindsWithSpecialHandling, is_access_denied_exception, is_server_error, - semaphore, + get_semaphore, ) from port_ocean.utils.async_iterators import ( stream_async_iterators_tasks, @@ -42,6 +42,8 @@ ) import functools +semaphore = get_semaphore() + async def _handle_global_resource_resync( kind: str, diff --git a/integrations/aws/pyproject.toml b/integrations/aws/pyproject.toml index 6f11faa588..ce8f9aef1c 100644 --- a/integrations/aws/pyproject.toml +++ b/integrations/aws/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "aws" -version = "0.2.51" +version = "0.2.52" description = "This integration will map all your resources in all the available accounts to your Port entities" authors = ["Shalev Avhar ", "Erik Zaadi "] diff --git a/integrations/aws/utils/misc.py b/integrations/aws/utils/misc.py index 1230918aac..f52fc4fee4 100644 --- 
a/integrations/aws/utils/misc.py +++ b/integrations/aws/utils/misc.py @@ -5,9 +5,12 @@ import asyncio -MAX_CONCURRENT_TASKS: int = int(ocean.integration_config["maximum_concurrent_accounts"]) - -semaphore = asyncio.BoundedSemaphore(MAX_CONCURRENT_TASKS) +def get_semaphore() -> asyncio.BoundedSemaphore: + max_concurrent_accounts: int = int( + ocean.integration_config["maximum_concurrent_accounts"] + ) + semaphore = asyncio.BoundedSemaphore(max_concurrent_accounts) + return semaphore class CustomProperties(enum.StrEnum): From 443a314d5386de7d89fccf317dacab29d1867203 Mon Sep 17 00:00:00 2001 From: mkarmah Date: Thu, 24 Oct 2024 18:53:24 +0000 Subject: [PATCH 04/23] correct spacing in changelog --- integrations/aws/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/integrations/aws/CHANGELOG.md b/integrations/aws/CHANGELOG.md index 76154eb077..bce487fe7a 100644 --- a/integrations/aws/CHANGELOG.md +++ b/integrations/aws/CHANGELOG.md @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added the option to query resources from specific regions, configurable via the regionPolicy in the selector field of the mapping. - Introduced `maximumConcurrentAccount` environment variable to control the maximum number of accounts synced concurrently. 
+ ## 0.2.51 (2024-10-23) From 932ae04d0022d7299c3fc00489becbcb66c11083 Mon Sep 17 00:00:00 2001 From: mkarmah Date: Thu, 24 Oct 2024 21:26:55 +0000 Subject: [PATCH 05/23] query regions optionally in resources with special handling --- .../aws/tests/utils/test_overrides.py | 34 +++++++++++++++++++ integrations/aws/utils/resources.py | 11 ++++++ 2 files changed, 45 insertions(+) create mode 100644 integrations/aws/tests/utils/test_overrides.py diff --git a/integrations/aws/tests/utils/test_overrides.py b/integrations/aws/tests/utils/test_overrides.py new file mode 100644 index 0000000000..2b9a14dd59 --- /dev/null +++ b/integrations/aws/tests/utils/test_overrides.py @@ -0,0 +1,34 @@ +import unittest +from utils.overrides import AWSDescribeResourcesSelector, RegionPolicy + + +class TestAWSDescribeResourcesSelector(unittest.TestCase): + + def test_is_region_allowed_no_policy(self) -> None: + selector = AWSDescribeResourcesSelector(query="test") + self.assertTrue(selector.is_region_allowed("us-east-1")) + + def test_is_region_allowed_deny_policy(self) -> None: + region_policy = RegionPolicy(deny=["us-east-1"]) + selector = AWSDescribeResourcesSelector( + query="test", regionPolicy=region_policy + ) + self.assertFalse(selector.is_region_allowed("us-east-1")) + self.assertFalse(selector.is_region_allowed("us-west-2")) + + def test_is_region_allowed_allow_policy(self) -> None: + region_policy = RegionPolicy(allow=["us-west-2"]) + selector = AWSDescribeResourcesSelector( + query="test", regionPolicy=region_policy + ) + self.assertTrue(selector.is_region_allowed("us-west-2")) + self.assertFalse(selector.is_region_allowed("us-east-1")) + + def test_is_region_allowed_both_policies(self) -> None: + region_policy = RegionPolicy(allow=["us-west-2"], deny=["us-east-1"]) + selector = AWSDescribeResourcesSelector( + query="test", regionPolicy=region_policy + ) + self.assertFalse(selector.is_region_allowed("us-east-1")) + self.assertTrue(selector.is_region_allowed("us-west-2")) + 
self.assertFalse(selector.is_region_allowed("eu-central-1")) diff --git a/integrations/aws/utils/resources.py b/integrations/aws/utils/resources.py index ffe955581e..d305c2f308 100644 --- a/integrations/aws/utils/resources.py +++ b/integrations/aws/utils/resources.py @@ -125,6 +125,17 @@ async def resync_custom_kind( region = session.region_name account_id = await _session_manager.find_account_id_by_session(session) next_token = None + + resource_config_selector = typing.cast( + AWSResourceConfig, event.resource_config + ).selector + + if not resource_config_selector.is_region_allowed(region): + logger.info( + f"Skipping resyncing {kind} in region {region} in account {account_id} because it's not allowed" + ) + return + if not describe_method_params: describe_method_params = {} while True: From 6928f778da205f9958957b6203e16d555bc792cb Mon Sep 17 00:00:00 2001 From: mkarmah Date: Fri, 25 Oct 2024 12:49:11 +0000 Subject: [PATCH 06/23] updated version --- integrations/aws/pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/aws/pyproject.toml b/integrations/aws/pyproject.toml index ce8f9aef1c..bc937f0d70 100644 --- a/integrations/aws/pyproject.toml +++ b/integrations/aws/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "aws" -version = "0.2.52" +version = "0.2.52-rc1" description = "This integration will map all your resources in all the available accounts to your Port entities" authors = ["Shalev Avhar ", "Erik Zaadi "] From 9f8bb7172f541fd0e70ea437689ee5dc7cf7d46b Mon Sep 17 00:00:00 2001 From: Michael Kofi Armah Date: Sat, 26 Oct 2024 07:29:20 +0000 Subject: [PATCH 07/23] Update integrations/aws/CHANGELOG.md Co-authored-by: Tom Tankilevitch <59158507+Tankilevitch@users.noreply.github.com> --- integrations/aws/CHANGELOG.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/aws/CHANGELOG.md b/integrations/aws/CHANGELOG.md index bce487fe7a..0cb03b7d40 100644 --- a/integrations/aws/CHANGELOG.md +++ 
b/integrations/aws/CHANGELOG.md @@ -13,7 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Improvements - Added the option to query resources from specific regions, configurable via the regionPolicy in the selector field of the mapping. -- Introduced `maximumConcurrentAccount` environment variable to control the maximum number of accounts synced concurrently. +- Introduced `maximumConcurrentAccount` parameter to control the maximum number of accounts synced concurrently. ## 0.2.51 (2024-10-23) From 627bb9ec8e157189c29cc52a31f10720904e9c00 Mon Sep 17 00:00:00 2001 From: mkarmah Date: Sat, 26 Oct 2024 17:02:00 +0000 Subject: [PATCH 08/23] real time event supports region policy --- integrations/aws/main.py | 136 +++++++++++++++------------------ integrations/aws/utils/misc.py | 21 +++-- 2 files changed, 77 insertions(+), 80 deletions(-) diff --git a/integrations/aws/main.py b/integrations/aws/main.py index 5b9aadf198..da11cdf4b1 100644 --- a/integrations/aws/main.py +++ b/integrations/aws/main.py @@ -44,7 +44,6 @@ semaphore = get_semaphore() - async def _handle_global_resource_resync( kind: str, credentials: AwsCredentials, @@ -288,81 +287,72 @@ class ResourceUpdate(BaseModel): @ocean.router.post("/webhook") async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Response: await update_available_access_credentials() - try: - logger.info(f"Received AWS Webhook request body: {update}") - resource_type = update.resource_type - identifier = update.identifier - account_id = update.accountId - region = update.awsRegion - - with logger.contextualize( - account_id=account_id, resource_type=resource_type, identifier=identifier - ): - matching_resource_configs = get_matching_kinds_and_blueprints_from_config( - resource_type - ) + logger.info(f"Received AWS Webhook request body: {update}") + resource_type = update.resource_type + identifier = update.identifier + account_id = update.accountId + region = update.awsRegion 
+ + with logger.contextualize( + account_id=account_id, resource_type=resource_type, identifier=identifier + ): + allowed_configs, disallowed_configs = get_matching_kinds_and_blueprints_from_config( + resource_type, region + ) - logger.debug( - "Querying full resource on AWS before registering change in port" + if disallowed_configs: + logger.info( + f"Unregistering resource {identifier} of type {resource_type} in region {region} and account {account_id} for blueprint {disallowed_configs.values()} because it is not allowed" + ) + await ocean.unregister( + [Entity(blueprint=blueprint, identifier=identifier) for blueprints in disallowed_configs.values() for blueprint in blueprints] ) + + if not allowed_configs: + logger.info( + f"{resource_type} not found or disabled for region {region} in account {account_id}" + ) + return fastapi.Response(status_code=status.HTTP_200_OK) - try: - resource = await describe_single_resource( - resource_type, identifier, account_id, region - ) - except Exception as e: - if is_access_denied_exception(e): - logger.error( - f"Cannot sync {resource_type} in region {region} in account {account_id} due to missing access permissions {e}" - ) - return fastapi.Response( - status_code=status.HTTP_200_OK, - ) - if is_server_error(e): - logger.error( - f"Cannot sync {resource_type} in region {region} in account {account_id} due to server error {e}" - ) - return fastapi.Response( - status_code=status.HTTP_200_OK, - ) - resource = None - - for kind in matching_resource_configs: - blueprints = matching_resource_configs[kind] - if not resource: # Resource probably deleted - for blueprint in blueprints: - logger.info( - "Resource not found in AWS, un-registering from port" - ) - await ocean.unregister( - [ - Entity( - blueprint=blueprint, - identifier=identifier, - ) - ] - ) - else: # Resource found in AWS, update port - logger.info("Resource found in AWS, registering change in port") - resource.update( - { - CustomProperties.KIND: resource_type, - 
CustomProperties.ACCOUNT_ID: account_id, - CustomProperties.REGION: region, - } - ) - await ocean.register_raw( - kind, - [fix_unserializable_date_properties(resource)], - ) - - logger.info("Webhook processed successfully") - return fastapi.Response( - status_code=status.HTTP_200_OK, content=json.dumps({"ok": True}) + logger.debug("Querying full resource on AWS before registering change in port") + resource = None + try: + resource = await describe_single_resource( + resource_type, identifier, account_id, region ) - except Exception as e: - logger.exception("Failed to process event from aws") + except Exception as e: + if is_access_denied_exception(e): + logger.error( + f"Cannot sync {resource_type} in region {region} in account {account_id} due to missing access permissions {e}" + ) + elif is_server_error(e): + logger.error( + f"Cannot sync {resource_type} in region {region} in account {account_id} due to server error {e}" + ) + else: + logger.exception("Failed to describe resource") + return fastapi.Response(status_code=status.HTTP_200_OK) + + for kind, blueprints in allowed_configs.items(): + if not resource: # Resource probably deleted + logger.info("Resource not found in AWS, un-registering from port") + await ocean.unregister( + [Entity(blueprint=blueprint, identifier=identifier) for blueprint in blueprints] + ) + else: # Resource found in AWS, update port + logger.info("Resource found in AWS, registering change in port") + resource.update( + { + CustomProperties.KIND: resource_type, + CustomProperties.ACCOUNT_ID: account_id, + CustomProperties.REGION: region, + } + ) + await ocean.register_raw( + kind, [fix_unserializable_date_properties(resource)] + ) + + logger.info("Webhook processed successfully") return fastapi.Response( - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - content=json.dumps({"ok": False, "error": str(e)}), + status_code=status.HTTP_200_OK, content=json.dumps({"ok": True}) ) diff --git a/integrations/aws/utils/misc.py 
b/integrations/aws/utils/misc.py index f52fc4fee4..68a63123a4 100644 --- a/integrations/aws/utils/misc.py +++ b/integrations/aws/utils/misc.py @@ -51,16 +51,23 @@ def is_server_error(e: Exception) -> bool: def get_matching_kinds_and_blueprints_from_config( - kind: str, -) -> dict[str, list[str]]: - kinds: dict[str, list[str]] = {} + kind: str, region: str +) -> tuple[dict[str, list[str]], dict[str, list[str]]]: + allowed_kinds: dict[str, list[str]] = {} + disallowed_kinds: dict[str, list[str]] = {} resources = event.port_app_config.resources for resource in resources: blueprint = resource.port.entity.mappings.blueprint.strip('"') - if resource.kind in kinds: - kinds[resource.kind].append(blueprint) + if not resource.selector.is_region_allowed(region) and kind == resource.kind: + if kind in disallowed_kinds: + disallowed_kinds[kind].append(blueprint) + else: + disallowed_kinds[kind] = [blueprint] elif kind == resource.kind: - kinds[resource.kind] = [blueprint] + if kind in allowed_kinds: + allowed_kinds[kind].append(blueprint) + else: + allowed_kinds[kind] = [blueprint] - return kinds + return allowed_kinds, disallowed_kinds From c15fb333d15c984d82f21abb180205053657ab1f Mon Sep 17 00:00:00 2001 From: mkarmah Date: Sat, 26 Oct 2024 17:05:11 +0000 Subject: [PATCH 09/23] lint --- integrations/aws/main.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/integrations/aws/main.py b/integrations/aws/main.py index da11cdf4b1..5741098c55 100644 --- a/integrations/aws/main.py +++ b/integrations/aws/main.py @@ -44,6 +44,7 @@ semaphore = get_semaphore() + async def _handle_global_resource_resync( kind: str, credentials: AwsCredentials, @@ -296,8 +297,8 @@ async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Respons with logger.contextualize( account_id=account_id, resource_type=resource_type, identifier=identifier ): - allowed_configs, disallowed_configs = get_matching_kinds_and_blueprints_from_config( - resource_type, 
region + allowed_configs, disallowed_configs = ( + get_matching_kinds_and_blueprints_from_config(resource_type, region) ) if disallowed_configs: @@ -305,9 +306,13 @@ async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Respons f"Unregistering resource {identifier} of type {resource_type} in region {region} and account {account_id} for blueprint {disallowed_configs.values()} because it is not allowed" ) await ocean.unregister( - [Entity(blueprint=blueprint, identifier=identifier) for blueprints in disallowed_configs.values() for blueprint in blueprints] + [ + Entity(blueprint=blueprint, identifier=identifier) + for blueprints in disallowed_configs.values() + for blueprint in blueprints + ] ) - + if not allowed_configs: logger.info( f"{resource_type} not found or disabled for region {region} in account {account_id}" @@ -337,7 +342,10 @@ async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Respons if not resource: # Resource probably deleted logger.info("Resource not found in AWS, un-registering from port") await ocean.unregister( - [Entity(blueprint=blueprint, identifier=identifier) for blueprint in blueprints] + [ + Entity(blueprint=blueprint, identifier=identifier) + for blueprint in blueprints + ] ) else: # Resource found in AWS, update port logger.info("Resource found in AWS, registering change in port") From 3747e1901476126efcc1c3f25230abc695a58b05 Mon Sep 17 00:00:00 2001 From: mkarmah Date: Sat, 26 Oct 2024 17:58:50 +0000 Subject: [PATCH 10/23] cast selector type --- integrations/aws/utils/misc.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/integrations/aws/utils/misc.py b/integrations/aws/utils/misc.py index 68a63123a4..c4e06885f7 100644 --- a/integrations/aws/utils/misc.py +++ b/integrations/aws/utils/misc.py @@ -2,6 +2,8 @@ from port_ocean.context.event import event from port_ocean.context.ocean import ocean +from utils.overrides import AWSDescribeResourcesSelector +import typing import 
asyncio @@ -56,10 +58,10 @@ def get_matching_kinds_and_blueprints_from_config( allowed_kinds: dict[str, list[str]] = {} disallowed_kinds: dict[str, list[str]] = {} resources = event.port_app_config.resources - for resource in resources: blueprint = resource.port.entity.mappings.blueprint.strip('"') - if not resource.selector.is_region_allowed(region) and kind == resource.kind: + resource_selector = typing.cast(AWSDescribeResourcesSelector, resource.selector) + if not resource_selector.is_region_allowed(region) and kind == resource.kind: if kind in disallowed_kinds: disallowed_kinds[kind].append(blueprint) else: From b337b2101768d10915514416c0049178edefec9b Mon Sep 17 00:00:00 2001 From: mkarmah Date: Sat, 26 Oct 2024 18:19:26 +0000 Subject: [PATCH 11/23] minimal update to webhook event logic --- integrations/aws/main.py | 137 +++++++++++++++++++++------------------ 1 file changed, 75 insertions(+), 62 deletions(-) diff --git a/integrations/aws/main.py b/integrations/aws/main.py index 5741098c55..3106964df0 100644 --- a/integrations/aws/main.py +++ b/integrations/aws/main.py @@ -288,79 +288,92 @@ class ResourceUpdate(BaseModel): @ocean.router.post("/webhook") async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Response: await update_available_access_credentials() - logger.info(f"Received AWS Webhook request body: {update}") - resource_type = update.resource_type - identifier = update.identifier - account_id = update.accountId - region = update.awsRegion - - with logger.contextualize( - account_id=account_id, resource_type=resource_type, identifier=identifier - ): - allowed_configs, disallowed_configs = ( - get_matching_kinds_and_blueprints_from_config(resource_type, region) - ) - - if disallowed_configs: - logger.info( - f"Unregistering resource {identifier} of type {resource_type} in region {region} and account {account_id} for blueprint {disallowed_configs.values()} because it is not allowed" - ) - await ocean.unregister( - [ - 
Entity(blueprint=blueprint, identifier=identifier) - for blueprints in disallowed_configs.values() - for blueprint in blueprints - ] - ) - - if not allowed_configs: - logger.info( - f"{resource_type} not found or disabled for region {region} in account {account_id}" + try: + logger.info(f"Received AWS Webhook request body: {update}") + resource_type = update.resource_type + identifier = update.identifier + account_id = update.accountId + region = update.awsRegion + + with logger.contextualize( + account_id=account_id, resource_type=resource_type, identifier=identifier + ): + allowed_configs, disallowed_configs = ( + get_matching_kinds_and_blueprints_from_config(resource_type, region) ) - return fastapi.Response(status_code=status.HTTP_200_OK) - logger.debug("Querying full resource on AWS before registering change in port") - resource = None - try: - resource = await describe_single_resource( - resource_type, identifier, account_id, region - ) - except Exception as e: - if is_access_denied_exception(e): - logger.error( - f"Cannot sync {resource_type} in region {region} in account {account_id} due to missing access permissions {e}" - ) - elif is_server_error(e): - logger.error( - f"Cannot sync {resource_type} in region {region} in account {account_id} due to server error {e}" + if disallowed_configs: + logger.info( + f"Unregistering resource {identifier} of type {resource_type} in region {region} and account {account_id} for blueprint {disallowed_configs.values()} because it is not allowed" ) - else: - logger.exception("Failed to describe resource") - return fastapi.Response(status_code=status.HTTP_200_OK) - - for kind, blueprints in allowed_configs.items(): - if not resource: # Resource probably deleted - logger.info("Resource not found in AWS, un-registering from port") await ocean.unregister( [ Entity(blueprint=blueprint, identifier=identifier) + for blueprints in disallowed_configs.values() for blueprint in blueprints ] ) - else: # Resource found in AWS, update 
port - logger.info("Resource found in AWS, registering change in port") - resource.update( - { - CustomProperties.KIND: resource_type, - CustomProperties.ACCOUNT_ID: account_id, - CustomProperties.REGION: region, - } + + if not allowed_configs: + logger.info( + f"{resource_type} not found or disabled for region {region} in account {account_id}" ) - await ocean.register_raw( - kind, [fix_unserializable_date_properties(resource)] + return fastapi.Response(status_code=status.HTTP_200_OK) + + logger.debug( + "Querying full resource on AWS before registering change in port" + ) + + try: + resource = await describe_single_resource( + resource_type, identifier, account_id, region ) + except Exception as e: + if is_access_denied_exception(e): + logger.error( + f"Cannot sync {resource_type} in region {region} in account {account_id} due to missing access permissions {e}" + ) + return fastapi.Response(status_code=status.HTTP_200_OK) + if is_server_error(e): + logger.error( + f"Cannot sync {resource_type} in region {region} in account {account_id} due to server error {e}" + ) + return fastapi.Response(status_code=status.HTTP_200_OK) + else: + logger.exception("Failed to describe resource") + + resource = None + + for kind, blueprints in allowed_configs.items(): + if not resource: # Resource probably deleted + logger.info("Resource not found in AWS, un-registering from port") + await ocean.unregister( + [ + Entity(blueprint=blueprint, identifier=identifier) + for blueprint in blueprints + ] + ) + else: # Resource found in AWS, update port + logger.info("Resource found in AWS, registering change in port") + resource.update( + { + CustomProperties.KIND: resource_type, + CustomProperties.ACCOUNT_ID: account_id, + CustomProperties.REGION: region, + } + ) + await ocean.register_raw( + kind, [fix_unserializable_date_properties(resource)] + ) + + logger.info("Webhook processed successfully") + return fastapi.Response( + status_code=status.HTTP_200_OK, content=json.dumps({"ok": True}) 
+ ) - logger.info("Webhook processed successfully") + except Exception as e: + logger.exception("Failed to process event from aws") return fastapi.Response( - status_code=status.HTTP_200_OK, content=json.dumps({"ok": True}) + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + content=json.dumps({"ok": False, "error": str(e)}), ) From e8fea8f8565125b9caa57b9dcad55f7f1e68d12d Mon Sep 17 00:00:00 2001 From: mkarmah Date: Sat, 26 Oct 2024 19:05:53 +0000 Subject: [PATCH 12/23] updated repolicy logic --- .../aws/tests/utils/test_overrides.py | 27 ++++++++++++++++++- integrations/aws/utils/overrides.py | 9 +++++-- 2 files changed, 33 insertions(+), 3 deletions(-) diff --git a/integrations/aws/tests/utils/test_overrides.py b/integrations/aws/tests/utils/test_overrides.py index 2b9a14dd59..ce3f7e0aac 100644 --- a/integrations/aws/tests/utils/test_overrides.py +++ b/integrations/aws/tests/utils/test_overrides.py @@ -14,7 +14,7 @@ def test_is_region_allowed_deny_policy(self) -> None: query="test", regionPolicy=region_policy ) self.assertFalse(selector.is_region_allowed("us-east-1")) - self.assertFalse(selector.is_region_allowed("us-west-2")) + self.assertTrue(selector.is_region_allowed("us-west-2")) def test_is_region_allowed_allow_policy(self) -> None: region_policy = RegionPolicy(allow=["us-west-2"]) @@ -32,3 +32,28 @@ def test_is_region_allowed_both_policies(self) -> None: self.assertFalse(selector.is_region_allowed("us-east-1")) self.assertTrue(selector.is_region_allowed("us-west-2")) self.assertFalse(selector.is_region_allowed("eu-central-1")) + + def test_is_region_allowed_conflicting_policies(self) -> None: + region_policy = RegionPolicy(allow=["us-east-1"], deny=["us-east-1"]) + selector = AWSDescribeResourcesSelector( + query="test", regionPolicy=region_policy + ) + self.assertFalse(selector.is_region_allowed("us-east-1")) + + def test_is_region_allowed_deny_only(self) -> None: + region_policy = RegionPolicy(deny=["us-east-1", "us-west-2"]) + selector = 
AWSDescribeResourcesSelector( + query="test", regionPolicy=region_policy + ) + self.assertFalse(selector.is_region_allowed("us-east-1")) + self.assertFalse(selector.is_region_allowed("us-west-2")) + self.assertTrue(selector.is_region_allowed("eu-central-1")) + + def test_is_region_allowed_allow_only(self) -> None: + region_policy = RegionPolicy(allow=["us-east-1", "us-west-2"]) + selector = AWSDescribeResourcesSelector( + query="test", regionPolicy=region_policy + ) + self.assertTrue(selector.is_region_allowed("us-east-1")) + self.assertTrue(selector.is_region_allowed("us-west-2")) + self.assertFalse(selector.is_region_allowed("eu-central-1")) diff --git a/integrations/aws/utils/overrides.py b/integrations/aws/utils/overrides.py index 37b1e580a7..7562b32e33 100644 --- a/integrations/aws/utils/overrides.py +++ b/integrations/aws/utils/overrides.py @@ -30,7 +30,9 @@ def is_region_allowed(self, region: str) -> bool: - If the region is listed in the "deny" list of `region_policy`, the method returns False. - If the region is listed in the "allow" list of `region_policy`, the method returns True. - If the region is not listed in either "allow" or "deny" lists, the method returns False. - + - If the region is listed in both "allow" and "deny" lists, the method returns False. + - If the policy denies regions but does not explicitly allow any, and the specific region is not in the deny list, then the region is considered allowed. + - If the policy allows regions but does not explicitly deny any, and the specific region is not in the allow list, then the region is considered denied. Args: region (str): The region to be checked. 
@@ -43,7 +45,10 @@ def is_region_allowed(self, region: str) -> bool: return False if region in self.region_policy.allow: return True - + if self.region_policy.deny and not self.region_policy.allow: + return True + if self.region_policy.allow and not self.region_policy.deny: + return False return False From 361f6ce2c31aeb32ba26aaf8fb7d424442469426 Mon Sep 17 00:00:00 2001 From: mkarmah Date: Thu, 31 Oct 2024 09:50:28 +0000 Subject: [PATCH 13/23] added tests --- integrations/aws/main.py | 21 ++++++-- integrations/aws/tests/utils/test_misc.py | 66 ++++++++++++++++++++++- integrations/aws/utils/misc.py | 12 ++--- 3 files changed, 88 insertions(+), 11 deletions(-) diff --git a/integrations/aws/main.py b/integrations/aws/main.py index 3106964df0..3ddc94e68a 100644 --- a/integrations/aws/main.py +++ b/integrations/aws/main.py @@ -1,5 +1,6 @@ import json import typing +from typing import List from fastapi import Response, status import fastapi @@ -28,6 +29,8 @@ from port_ocean.context.ocean import ocean from loguru import logger from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE +from port_ocean.context.event import event +from utils.overrides import AWSResourceConfig from utils.misc import ( get_matching_kinds_and_blueprints_from_config, CustomProperties, @@ -298,8 +301,17 @@ async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Respons with logger.contextualize( account_id=account_id, resource_type=resource_type, identifier=identifier ): + aws_resource_config = typing.cast( + List[AWSResourceConfig], event.port_app_config.resources + ) + if not isinstance(aws_resource_config, AWSResourceConfig): + logger.info("No resources configured in the port app config") + return fastapi.Response(status_code=status.HTTP_200_OK) + allowed_configs, disallowed_configs = ( - get_matching_kinds_and_blueprints_from_config(resource_type, region) + get_matching_kinds_and_blueprints_from_config( + resource_type, region, aws_resource_config + ) ) if 
disallowed_configs: @@ -339,8 +351,11 @@ async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Respons f"Cannot sync {resource_type} in region {region} in account {account_id} due to server error {e}" ) return fastapi.Response(status_code=status.HTTP_200_OK) - else: - logger.exception("Failed to describe resource") + + logger.error( + f"Failed to retrieve '{resource_type}' resource with ID '{identifier}' in region '{region}' for account '{account_id}'. " + f"Verify that the resource exists and that the necessary permissions are granted." + ) resource = None diff --git a/integrations/aws/tests/utils/test_misc.py b/integrations/aws/tests/utils/test_misc.py index 45104a90ad..507d3e37ae 100644 --- a/integrations/aws/tests/utils/test_misc.py +++ b/integrations/aws/tests/utils/test_misc.py @@ -1,5 +1,15 @@ -from utils.misc import is_access_denied_exception +from utils.misc import ( + is_access_denied_exception, + get_matching_kinds_and_blueprints_from_config, +) from typing import Optional, Dict, Any +import unittest +from utils.overrides import AWSResourceConfig, AWSDescribeResourcesSelector +from port_ocean.core.handlers.port_app_config.models import ( + PortResourceConfig, + EntityMapping, + MappingsConfig, +) class MockException(Exception): @@ -25,3 +35,57 @@ def test_access_denied_exception_with_other_error() -> None: def test_access_denied_exception_no_response_attribute() -> None: e = Exception("Test exception") assert not is_access_denied_exception(e) + + +class TestGetMatchingKindsAndBlueprintsFromConfig(unittest.TestCase): + + def test_get_matching_kinds_and_blueprints(self) -> None: + # Set up actual object instances + selector = AWSDescribeResourcesSelector(query="true") + entity = EntityMapping( + identifier="lambda_function", + blueprint="LambdaBlueprint", + ) + mapping = MappingsConfig(mappings=entity) + port_resource_config = PortResourceConfig(entity=mapping) + + resource_config = AWSResourceConfig( + kind="AWS::Lambda::Function", 
selector=selector, port=port_resource_config + ) + + kind = "AWS::Lambda::Function" + region = "us-west-1" + resource_config_list = [resource_config] + + allowed_kinds, disallowed_kinds = get_matching_kinds_and_blueprints_from_config( + kind, region, resource_config_list + ) + + self.assertEqual(allowed_kinds, {kind: ["LambdaBlueprint"]}) + self.assertEqual(disallowed_kinds, {}) + + def test_no_matching_kind(self) -> None: + selector = AWSDescribeResourcesSelector(query="true") + entity = EntityMapping( + identifier="AnotherIdentifier", + blueprint="DifferentBlueprint", + ) + mapping = MappingsConfig(mappings=entity) + port_resource_config = PortResourceConfig(entity=mapping) + + resource_config = AWSResourceConfig( + kind="AWS::SomeOther::Resource", + selector=selector, + port=port_resource_config, + ) + + kind = "AWS::Lambda::Function" + region = "us-west-1" + resource_config_list = [resource_config] + + allowed_kinds, disallowed_kinds = get_matching_kinds_and_blueprints_from_config( + kind, region, resource_config_list + ) + + self.assertEqual(allowed_kinds, {}) + self.assertEqual(disallowed_kinds, {}) diff --git a/integrations/aws/utils/misc.py b/integrations/aws/utils/misc.py index c4e06885f7..0873c087e9 100644 --- a/integrations/aws/utils/misc.py +++ b/integrations/aws/utils/misc.py @@ -1,9 +1,8 @@ import enum -from port_ocean.context.event import event from port_ocean.context.ocean import ocean -from utils.overrides import AWSDescribeResourcesSelector -import typing +from utils.overrides import AWSResourceConfig +from typing import List import asyncio @@ -53,14 +52,13 @@ def is_server_error(e: Exception) -> bool: def get_matching_kinds_and_blueprints_from_config( - kind: str, region: str + kind: str, region: str, resource_config: List[AWSResourceConfig] ) -> tuple[dict[str, list[str]], dict[str, list[str]]]: allowed_kinds: dict[str, list[str]] = {} disallowed_kinds: dict[str, list[str]] = {} - resources = event.port_app_config.resources - for resource in 
resources: + for resource in resource_config: blueprint = resource.port.entity.mappings.blueprint.strip('"') - resource_selector = typing.cast(AWSDescribeResourcesSelector, resource.selector) + resource_selector = resource.selector if not resource_selector.is_region_allowed(region) and kind == resource.kind: if kind in disallowed_kinds: disallowed_kinds[kind].append(blueprint) From 06f133ae347e4decdda9f8c048b77c8a540c4809 Mon Sep 17 00:00:00 2001 From: mkarmah Date: Thu, 31 Oct 2024 17:22:23 +0000 Subject: [PATCH 14/23] production-ready release --- integrations/aws/main.py | 11 ++++------- integrations/aws/pyproject.toml | 2 +- integrations/aws/utils/misc.py | 4 ++-- 3 files changed, 7 insertions(+), 10 deletions(-) diff --git a/integrations/aws/main.py b/integrations/aws/main.py index 3ddc94e68a..a78ffe35ba 100644 --- a/integrations/aws/main.py +++ b/integrations/aws/main.py @@ -1,6 +1,5 @@ import json import typing -from typing import List from fastapi import Response, status import fastapi @@ -30,7 +29,7 @@ from loguru import logger from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE from port_ocean.context.event import event -from utils.overrides import AWSResourceConfig +from utils.overrides import AWSPortAppConfig from utils.misc import ( get_matching_kinds_and_blueprints_from_config, CustomProperties, @@ -301,16 +300,14 @@ async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Respons with logger.contextualize( account_id=account_id, resource_type=resource_type, identifier=identifier ): - aws_resource_config = typing.cast( - List[AWSResourceConfig], event.port_app_config.resources - ) - if not isinstance(aws_resource_config, AWSResourceConfig): + aws_port_app_config = typing.cast(AWSPortAppConfig, event.port_app_config) + if not isinstance(aws_port_app_config, AWSPortAppConfig): logger.info("No resources configured in the port app config") return fastapi.Response(status_code=status.HTTP_200_OK) allowed_configs, 
disallowed_configs = ( get_matching_kinds_and_blueprints_from_config( - resource_type, region, aws_resource_config + resource_type, region, aws_port_app_config.resources ) ) diff --git a/integrations/aws/pyproject.toml b/integrations/aws/pyproject.toml index bc937f0d70..ce8f9aef1c 100644 --- a/integrations/aws/pyproject.toml +++ b/integrations/aws/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "aws" -version = "0.2.52-rc1" +version = "0.2.52" description = "This integration will map all your resources in all the available accounts to your Port entities" authors = ["Shalev Avhar ", "Erik Zaadi "] diff --git a/integrations/aws/utils/misc.py b/integrations/aws/utils/misc.py index 0873c087e9..82105e9ce0 100644 --- a/integrations/aws/utils/misc.py +++ b/integrations/aws/utils/misc.py @@ -52,11 +52,11 @@ def is_server_error(e: Exception) -> bool: def get_matching_kinds_and_blueprints_from_config( - kind: str, region: str, resource_config: List[AWSResourceConfig] + kind: str, region: str, resource_configs: List[AWSResourceConfig] ) -> tuple[dict[str, list[str]], dict[str, list[str]]]: allowed_kinds: dict[str, list[str]] = {} disallowed_kinds: dict[str, list[str]] = {} - for resource in resource_config: + for resource in resource_configs: blueprint = resource.port.entity.mappings.blueprint.strip('"') resource_selector = resource.selector if not resource_selector.is_region_allowed(region) and kind == resource.kind: From 3b90a0946fdd7c69aeaafc4c8d5fc0ef196c3112 Mon Sep 17 00:00:00 2001 From: mkarmah Date: Thu, 31 Oct 2024 17:29:38 +0000 Subject: [PATCH 15/23] bumped integration version --- integrations/aws/CHANGELOG.md | 2 +- integrations/aws/pyproject.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/integrations/aws/CHANGELOG.md b/integrations/aws/CHANGELOG.md index 052d92c4f7..fd680e1759 100644 --- a/integrations/aws/CHANGELOG.md +++ b/integrations/aws/CHANGELOG.md @@ -7,7 +7,7 @@ and this project adheres to [Semantic 
Versioning](https://semver.org/spec/v2.0.0 -## 0.2.52 (2024-10-31) +## 0.2.53 (2024-10-31) ### Improvements diff --git a/integrations/aws/pyproject.toml b/integrations/aws/pyproject.toml index ce8f9aef1c..372d60064a 100644 --- a/integrations/aws/pyproject.toml +++ b/integrations/aws/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "aws" -version = "0.2.52" +version = "0.2.53" description = "This integration will map all your resources in all the available accounts to your Port entities" authors = ["Shalev Avhar ", "Erik Zaadi "] From b95b35829ecec0dbeaa5d3c5d5a5eeb67c86b885 Mon Sep 17 00:00:00 2001 From: mkarmah Date: Thu, 31 Oct 2024 23:38:10 +0000 Subject: [PATCH 16/23] updated error handling in querying all resources --- integrations/aws/main.py | 3 --- integrations/aws/tests/utils/test_misc.py | 21 +++++++++++++++++++++ integrations/aws/utils/misc.py | 14 ++++++++++++++ integrations/aws/utils/resources.py | 11 +++++++++-- 4 files changed, 44 insertions(+), 5 deletions(-) diff --git a/integrations/aws/main.py b/integrations/aws/main.py index a78ffe35ba..3114abd7d3 100644 --- a/integrations/aws/main.py +++ b/integrations/aws/main.py @@ -90,9 +90,6 @@ async def resync_resources_for_account( async for batch in resync_cloudcontrol(kind, session): yield batch except Exception as exc: - logger.error( - f"Failed to fetch {kind} for {session.region_name} in {credentials.account_id}: {exc}" - ) regions.append(session.region_name) errors.append(exc) continue diff --git a/integrations/aws/tests/utils/test_misc.py b/integrations/aws/tests/utils/test_misc.py index 507d3e37ae..0c07712f31 100644 --- a/integrations/aws/tests/utils/test_misc.py +++ b/integrations/aws/tests/utils/test_misc.py @@ -1,5 +1,6 @@ from utils.misc import ( is_access_denied_exception, + is_resource_not_found_exception, get_matching_kinds_and_blueprints_from_config, ) from typing import Optional, Dict, Any @@ -37,6 +38,26 @@ def test_access_denied_exception_no_response_attribute() -> None: assert not 
is_access_denied_exception(e) +def test_resource_not_found_exception_with_response() -> None: + e = MockException(response={"Error": {"Code": "ResourceNotFoundException"}}) + assert is_resource_not_found_exception(e) + + +def test_resource_not_found_exception_without_response() -> None: + e = MockException(response=None) + assert not is_resource_not_found_exception(e) + + +def test_resource_not_found_exception_with_other_error() -> None: + e = MockException(response={"Error": {"Code": "SomeOtherError"}}) + assert not is_resource_not_found_exception(e) + + +def test_resource_not_found_exception_no_response_attribute() -> None: + e = Exception("Test exception") + assert not is_resource_not_found_exception(e) + + class TestGetMatchingKindsAndBlueprintsFromConfig(unittest.TestCase): def test_get_matching_kinds_and_blueprints(self) -> None: diff --git a/integrations/aws/utils/misc.py b/integrations/aws/utils/misc.py index 82105e9ce0..c101602930 100644 --- a/integrations/aws/utils/misc.py +++ b/integrations/aws/utils/misc.py @@ -51,6 +51,20 @@ def is_server_error(e: Exception) -> bool: return False +def is_resource_not_found_exception(e: Exception) -> bool: + resource_not_found_error_codes = [ + "ResourceNotFoundException", + "ResourceNotFound", + "ResourceNotFoundFault", + ] + + if hasattr(e, "response") and e.response is not None: + error_code = e.response.get("Error", {}).get("Code") + return error_code in resource_not_found_error_codes + + return False + + def get_matching_kinds_and_blueprints_from_config( kind: str, region: str, resource_configs: List[AWSResourceConfig] ) -> tuple[dict[str, list[str]], dict[str, list[str]]]: diff --git a/integrations/aws/utils/resources.py b/integrations/aws/utils/resources.py index d305c2f308..34ec9f2125 100644 --- a/integrations/aws/utils/resources.py +++ b/integrations/aws/utils/resources.py @@ -10,6 +10,7 @@ CustomProperties, ResourceKindsWithSpecialHandling, is_access_denied_exception, + is_resource_not_found_exception, ) from 
utils.aws import get_sessions @@ -250,6 +251,12 @@ async def resync_cloudcontrol( logger.warning( f"Skipping resyncing {kind} in region {region} in account {account_id} due to missing access permissions" ) + elif is_resource_not_found_exception(e): + logger.warning( + f"Skipping resyncing {kind} resource with id {instance.get('Identifier')} in region {region} in account {account_id} because it doesn't exist" + ) else: - logger.warning(f"Error resyncing {kind} in region {region}, {e}") - raise e + logger.error( + f"An error occured while resyncing {kind} in region {region}, {e}" + ) + raise e From 550c23a03377fd491d9bc5eea2fdeff63c935a52 Mon Sep 17 00:00:00 2001 From: mkarmah Date: Thu, 31 Oct 2024 23:41:02 +0000 Subject: [PATCH 17/23] refactored phrase to improve constructive understanding --- integrations/aws/utils/resources.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/integrations/aws/utils/resources.py b/integrations/aws/utils/resources.py index 34ec9f2125..62368cbc79 100644 --- a/integrations/aws/utils/resources.py +++ b/integrations/aws/utils/resources.py @@ -256,7 +256,5 @@ async def resync_cloudcontrol( f"Skipping resyncing {kind} resource with id {instance.get('Identifier')} in region {region} in account {account_id} because it doesn't exist" ) else: - logger.error( - f"An error occured while resyncing {kind} in region {region}, {e}" - ) + logger.error(f"Error resyncing {kind} in region {region}, {e}") raise e From f43c87988798ede4709a9c503a9784339c18d066 Mon Sep 17 00:00:00 2001 From: mkarmah Date: Fri, 1 Nov 2024 12:28:06 +0000 Subject: [PATCH 18/23] updated changelog --- integrations/aws/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/integrations/aws/CHANGELOG.md b/integrations/aws/CHANGELOG.md index fd680e1759..23fca62a3d 100644 --- a/integrations/aws/CHANGELOG.md +++ b/integrations/aws/CHANGELOG.md @@ -14,6 +14,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added the 
option to query resources from specific regions, configurable via the regionPolicy in the selector field of the mapping. - Introduced `maximumConcurrentAccount` parameter to control the maximum number of accounts synced concurrently. +- Skip missing resources in a region without interrupting sync across other regions. ## 0.2.52 (2024-10-30) From 3014165802dca59176652bd8c319a52c0dda37da Mon Sep 17 00:00:00 2001 From: mkarmah Date: Fri, 1 Nov 2024 12:38:05 +0000 Subject: [PATCH 19/23] move log to Bug Fixes --- integrations/aws/CHANGELOG.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/integrations/aws/CHANGELOG.md b/integrations/aws/CHANGELOG.md index 23fca62a3d..5dd73418ff 100644 --- a/integrations/aws/CHANGELOG.md +++ b/integrations/aws/CHANGELOG.md @@ -14,6 +14,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Added the option to query resources from specific regions, configurable via the regionPolicy in the selector field of the mapping. - Introduced `maximumConcurrentAccount` parameter to control the maximum number of accounts synced concurrently. + +### Bug Fixes + - Skip missing resources in a region without interrupting sync across other regions. 
From 174ab9d90726f156be4003cf7d68fa6c759f0ff0 Mon Sep 17 00:00:00 2001 From: mkarmah Date: Mon, 4 Nov 2024 13:46:02 +0000 Subject: [PATCH 20/23] skip resource not found exceptions --- integrations/aws/main.py | 33 ++++++++++++++++++++++++----- integrations/aws/utils/resources.py | 29 +++++++++++++------------ 2 files changed, 43 insertions(+), 19 deletions(-) diff --git a/integrations/aws/main.py b/integrations/aws/main.py index 3114abd7d3..fc7cac400e 100644 --- a/integrations/aws/main.py +++ b/integrations/aws/main.py @@ -29,7 +29,7 @@ from loguru import logger from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE from port_ocean.context.event import event -from utils.overrides import AWSPortAppConfig +from utils.overrides import AWSPortAppConfig, AWSResourceConfig from utils.misc import ( get_matching_kinds_and_blueprints_from_config, CustomProperties, @@ -50,12 +50,15 @@ async def _handle_global_resource_resync( kind: str, credentials: AwsCredentials, + aws_resource_config: AWSResourceConfig, ) -> ASYNC_GENERATOR_RESYNC_TYPE: denied_access_to_default_region = False default_region = get_default_region_from_credentials(credentials) default_session = await credentials.create_session(default_region) try: - async for batch in resync_cloudcontrol(kind, default_session): + async for batch in resync_cloudcontrol( + kind, default_session, aws_resource_config + ): yield batch except Exception as e: if is_access_denied_exception(e): @@ -67,7 +70,9 @@ async def _handle_global_resource_resync( logger.info(f"Trying to resync {kind} in all regions until success") async for session in credentials.create_session_for_each_region(): try: - async for batch in resync_cloudcontrol(kind, session): + async for batch in resync_cloudcontrol( + kind, session, aws_resource_config + ): yield batch break except Exception as e: @@ -81,13 +86,19 @@ async def resync_resources_for_account( """Function to handle fetching resources for a single account.""" errors, regions = [], [] + 
aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config) + if is_global_resource(kind): - async for batch in _handle_global_resource_resync(kind, credentials): + async for batch in _handle_global_resource_resync( + kind, credentials, aws_resource_config + ): yield batch else: async for session in credentials.create_session_for_each_region(): try: - async for batch in resync_cloudcontrol(kind, session): + async for batch in resync_cloudcontrol( + kind, session, aws_resource_config + ): yield batch except Exception as exc: regions.append(session.region_name) @@ -127,6 +138,7 @@ async def resync_account(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(kind=ResourceKindsWithSpecialHandling.ELASTICACHE_CLUSTER) async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() + aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config) tasks = [ semaphore_async_iterator( @@ -139,6 +151,7 @@ async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: "describe_cache_clusters", "CacheClusters", "Marker", + aws_resource_config, ), ) async for session in get_sessions() @@ -153,6 +166,7 @@ async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() + aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config) tasks = [ semaphore_async_iterator( semaphore, @@ -164,6 +178,7 @@ async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: "describe_load_balancers", "LoadBalancers", "Marker", + aws_resource_config, ), ) async for session in get_sessions() @@ -179,6 +194,7 @@ async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() + aws_resource_config = 
typing.cast(AWSResourceConfig, event.resource_config) tasks = [ semaphore_async_iterator( semaphore, @@ -190,6 +206,7 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: "list_certificates", "CertificateSummaryList", "NextToken", + aws_resource_config, ), ) async for session in get_sessions() @@ -204,6 +221,8 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(kind=ResourceKindsWithSpecialHandling.AMI_IMAGE) async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() + + aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config) tasks = [ semaphore_async_iterator( semaphore, @@ -215,6 +234,7 @@ async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: "describe_images", "Images", "NextToken", + aws_resource_config, {"Owners": ["self"]}, ), ) @@ -229,6 +249,8 @@ async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: @ocean.on_resync(kind=ResourceKindsWithSpecialHandling.CLOUDFORMATION_STACK) async def resync_cloudformation(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: await update_available_access_credentials() + + aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config) tasks = [ semaphore_async_iterator( semaphore, @@ -240,6 +262,7 @@ async def resync_cloudformation(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE: "describe_stacks", "Stacks", "NextToken", + aws_resource_config, ), ) async for session in get_sessions() diff --git a/integrations/aws/utils/resources.py b/integrations/aws/utils/resources.py index 62368cbc79..27816ae0af 100644 --- a/integrations/aws/utils/resources.py +++ b/integrations/aws/utils/resources.py @@ -1,11 +1,9 @@ import asyncio import json from typing import Any, Literal -import typing import aioboto3 from loguru import logger -from port_ocean.context.event import event from utils.misc import ( CustomProperties, ResourceKindsWithSpecialHandling, @@ -110,6 +108,7 @@ async def resync_custom_kind( 
describe_method: str, list_param: str, marker_param: Literal["NextToken", "Marker"], + resource_config: AWSResourceConfig, describe_method_params: dict[str, Any] | None = None, ) -> ASYNC_GENERATOR_RESYNC_TYPE: """ @@ -127,9 +126,7 @@ async def resync_custom_kind( account_id = await _session_manager.find_account_id_by_session(session) next_token = None - resource_config_selector = typing.cast( - AWSResourceConfig, event.resource_config - ).selector + resource_config_selector = resource_config.selector if not resource_config_selector.is_region_allowed(region): logger.info( @@ -175,11 +172,9 @@ async def resync_custom_kind( async def resync_cloudcontrol( - kind: str, session: aioboto3.Session + kind: str, session: aioboto3.Session, resource_config: AWSResourceConfig ) -> ASYNC_GENERATOR_RESYNC_TYPE: - resource_config_selector = typing.cast( - AWSResourceConfig, event.resource_config - ).selector + resource_config_selector = resource_config.selector use_get_resource_api = resource_config_selector.use_get_resource_api region = session.region_name @@ -216,7 +211,8 @@ async def resync_cloudcontrol( region=region, ) for instance in resources - ) + ), + return_exceptions=True, ) else: resources = [ @@ -228,6 +224,15 @@ async def resync_cloudcontrol( ] for instance in resources: + if isinstance(instance, Exception): + if is_resource_not_found_exception(instance): + logger.warning( + f"Skipping resyncing {kind} resource in region {region} in account {account_id}; {instance.get('Error').get('Message')}" + ) + continue + + raise instance + serialized = instance.copy() serialized.update( { @@ -251,10 +256,6 @@ async def resync_cloudcontrol( logger.warning( f"Skipping resyncing {kind} in region {region} in account {account_id} due to missing access permissions" ) - elif is_resource_not_found_exception(e): - logger.warning( - f"Skipping resyncing {kind} resource with id {instance.get('Identifier')} in region {region} in account {account_id} because it doesn't exist" - ) else: 
logger.error(f"Error resyncing {kind} in region {region}, {e}") raise e From 4d9bae0d3f34232859c45af6c1a150c58056c7ae Mon Sep 17 00:00:00 2001 From: mkarmah Date: Mon, 4 Nov 2024 15:05:01 +0000 Subject: [PATCH 21/23] lint fix --- integrations/aws/utils/resources.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/integrations/aws/utils/resources.py b/integrations/aws/utils/resources.py index 27816ae0af..80cd41b2cb 100644 --- a/integrations/aws/utils/resources.py +++ b/integrations/aws/utils/resources.py @@ -1,6 +1,7 @@ import asyncio import json from typing import Any, Literal +import typing import aioboto3 from loguru import logger @@ -16,6 +17,7 @@ from utils.aws import _session_manager from utils.overrides import AWSResourceConfig from botocore.config import Config as Boto3Config +from botocore.exceptions import ClientError def is_global_resource(kind: str) -> bool: @@ -226,9 +228,10 @@ async def resync_cloudcontrol( for instance in resources: if isinstance(instance, Exception): if is_resource_not_found_exception(instance): + error = typing.cast(ClientError, instance) logger.warning( - f"Skipping resyncing {kind} resource in region {region} in account {account_id}; {instance.get('Error').get('Message')}" - ) + f"Skipping resyncing {kind} resource in region {region} in account {account_id}; {error.response['Error']['Message']}" + ) continue raise instance From 5fc17964c2083d543e2893e776a9a6a81f2e0ae0 Mon Sep 17 00:00:00 2001 From: mkarmah Date: Mon, 4 Nov 2024 15:08:46 +0000 Subject: [PATCH 22/23] reformat resources.py --- integrations/aws/utils/resources.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/integrations/aws/utils/resources.py b/integrations/aws/utils/resources.py index 80cd41b2cb..d1e92cd89d 100644 --- a/integrations/aws/utils/resources.py +++ b/integrations/aws/utils/resources.py @@ -230,8 +230,8 @@ async def resync_cloudcontrol( if is_resource_not_found_exception(instance): error = 
typing.cast(ClientError, instance) logger.warning( - f"Skipping resyncing {kind} resource in region {region} in account {account_id}; {error.response['Error']['Message']}" - ) + f"Skipping resyncing {kind} resource in region {region} in account {account_id}; {error.response['Error']['Message']}" + ) continue raise instance From 92e4275af3e1b271960df16ccff87f89d88d923e Mon Sep 17 00:00:00 2001 From: mkarmah Date: Mon, 4 Nov 2024 19:18:31 +0000 Subject: [PATCH 23/23] change log level for resource not found from warning to info --- integrations/aws/utils/resources.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integrations/aws/utils/resources.py b/integrations/aws/utils/resources.py index d1e92cd89d..8a28810aa4 100644 --- a/integrations/aws/utils/resources.py +++ b/integrations/aws/utils/resources.py @@ -229,7 +229,7 @@ async def resync_cloudcontrol( if isinstance(instance, Exception): if is_resource_not_found_exception(instance): error = typing.cast(ClientError, instance) - logger.warning( + logger.info( f"Skipping resyncing {kind} resource in region {region} in account {account_id}; {error.response['Error']['Message']}" ) continue