Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Integration] [AWS] | Added support to choose specific regions to query resources from #1099

Merged
merged 30 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
c318294
support to query specific regions
mk-armah Oct 23, 2024
4318eaf
region policy for seemless control
mk-armah Oct 24, 2024
273d922
Merge branch 'main' into improvement-aws
mk-armah Oct 24, 2024
5943bd9
bump integration version
mk-armah Oct 24, 2024
443a314
correct spacing in changelog
mk-armah Oct 24, 2024
932ae04
query regions optionally in resources with special handling
mk-armah Oct 24, 2024
6928f77
updated version
mk-armah Oct 25, 2024
9f8bb71
Update integrations/aws/CHANGELOG.md
mk-armah Oct 26, 2024
627bb9e
real time event supports region policy
mk-armah Oct 26, 2024
c15fb33
lint
mk-armah Oct 26, 2024
3747e19
cast selector type
mk-armah Oct 26, 2024
b337b21
minimal update to webhook event logic
mk-armah Oct 26, 2024
e8fea8f
updated repolicy logic
mk-armah Oct 26, 2024
42b64d9
Merge branch 'main' into improvement-aws
mk-armah Oct 28, 2024
361f6ce
added tests
mk-armah Oct 31, 2024
22bb035
Merge branch 'improvement-aws' of https://github.com/port-labs/ocean …
mk-armah Oct 31, 2024
06f133a
production-ready release
mk-armah Oct 31, 2024
d065e06
Merge branch 'main' into improvement-aws
mk-armah Oct 31, 2024
3b90a09
bumped integration version
mk-armah Oct 31, 2024
b95b358
updated error handling in querying all resources
mk-armah Oct 31, 2024
550c23a
refactored phrase to improve constructive understanding
mk-armah Oct 31, 2024
f43c879
updated changelog
mk-armah Nov 1, 2024
3014165
move log to Bug Fixes
mk-armah Nov 1, 2024
174ab9d
skip resource not found exceptions
mk-armah Nov 4, 2024
7cd9575
Merge branch 'main' into improvement-aws
mk-armah Nov 4, 2024
4d9bae0
lint fix
mk-armah Nov 4, 2024
6bf9095
Merge branch 'main' into improvement-aws
mk-armah Nov 4, 2024
5fc1796
reformat resources.py
mk-armah Nov 4, 2024
9287938
Merge branch 'improvement-aws' of https://github.com/port-labs/ocean …
mk-armah Nov 4, 2024
92e4275
change log level for resource not found from warning to info
mk-armah Nov 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions integrations/aws/.port/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ configurations:
type: string
sensitive: true
description: AWS API Key for custom events, used to validate the event source for real-time event updates.
- name: maximumConcurrentAccounts
  type: integer
  # The spec schema key is `required`; the misspelled `require` would not mark this entry as optional.
  required: false
  description: The number of concurrent accounts to scan. By default, it is set to 50.
  default: 50
deploymentMethodRequirements:
- type: default
configurations: ['awsAccessKeyId', 'awsSecretAccessKey']
Expand Down
9 changes: 9 additions & 0 deletions integrations/aws/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,15 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

## 0.2.52 (2024-10-23)


### Improvements

- Added the option to query resources from specific regions, configurable via the regionPolicy in the selector field of the mapping.
- Introduced the `maximumConcurrentAccounts` parameter to control the maximum number of accounts synced concurrently.


## 0.2.51 (2024-10-23)


Expand Down
66 changes: 41 additions & 25 deletions integrations/aws/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,16 @@
ResourceKindsWithSpecialHandling,
is_access_denied_exception,
is_server_error,
semaphore,
get_semaphore,
)
from port_ocean.utils.async_iterators import (
stream_async_iterators_tasks,
semaphore_async_iterator,
)
import functools

semaphore = get_semaphore()


async def _handle_global_resource_resync(
kind: str,
Expand Down Expand Up @@ -86,6 +88,9 @@ async def resync_resources_for_account(
async for batch in resync_cloudcontrol(kind, session):
yield batch
except Exception as exc:
logger.error(
f"Failed to fetch {kind} for {session.region_name} in {credentials.account_id}: {exc}"
)
regions.append(session.region_name)
errors.append(exc)
continue
Expand Down Expand Up @@ -293,10 +298,28 @@ async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Respons
with logger.contextualize(
account_id=account_id, resource_type=resource_type, identifier=identifier
):
matching_resource_configs = get_matching_kinds_and_blueprints_from_config(
resource_type
allowed_configs, disallowed_configs = (
get_matching_kinds_and_blueprints_from_config(resource_type, region)
)

if disallowed_configs:
logger.info(
f"Unregistering resource {identifier} of type {resource_type} in region {region} and account {account_id} for blueprint {disallowed_configs.values()} because it is not allowed"
)
await ocean.unregister(
[
Entity(blueprint=blueprint, identifier=identifier)
for blueprints in disallowed_configs.values()
for blueprint in blueprints
]
)

if not allowed_configs:
logger.info(
f"{resource_type} not found or disabled for region {region} in account {account_id}"
)
return fastapi.Response(status_code=status.HTTP_200_OK)

logger.debug(
"Querying full resource on AWS before registering change in port"
)
Expand All @@ -310,33 +333,26 @@ async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Respons
logger.error(
f"Cannot sync {resource_type} in region {region} in account {account_id} due to missing access permissions {e}"
)
return fastapi.Response(
status_code=status.HTTP_200_OK,
)
return fastapi.Response(status_code=status.HTTP_200_OK)
if is_server_error(e):
logger.error(
f"Cannot sync {resource_type} in region {region} in account {account_id} due to server error {e}"
)
return fastapi.Response(
status_code=status.HTTP_200_OK,
)
return fastapi.Response(status_code=status.HTTP_200_OK)
else:
logger.exception("Failed to describe resource")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

which resource? add log containing more context about the resource, region etc.


resource = None

for kind in matching_resource_configs:
blueprints = matching_resource_configs[kind]
for kind, blueprints in allowed_configs.items():
if not resource: # Resource probably deleted
for blueprint in blueprints:
logger.info(
"Resource not found in AWS, un-registering from port"
)
await ocean.unregister(
[
Entity(
blueprint=blueprint,
identifier=identifier,
)
]
)
logger.info("Resource not found in AWS, un-registering from port")
await ocean.unregister(
[
Entity(blueprint=blueprint, identifier=identifier)
for blueprint in blueprints
]
)
else: # Resource found in AWS, update port
logger.info("Resource found in AWS, registering change in port")
resource.update(
Expand All @@ -347,14 +363,14 @@ async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Respons
}
)
await ocean.register_raw(
kind,
[fix_unserializable_date_properties(resource)],
kind, [fix_unserializable_date_properties(resource)]
)

logger.info("Webhook processed successfully")
return fastapi.Response(
status_code=status.HTTP_200_OK, content=json.dumps({"ok": True})
)

except Exception as e:
logger.exception("Failed to process event from aws")
return fastapi.Response(
Expand Down
2 changes: 1 addition & 1 deletion integrations/aws/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "aws"
version = "0.2.51"
version = "0.2.52-rc1"
description = "This integration will map all your resources in all the available accounts to your Port entities"
authors = ["Shalev Avhar <[email protected]>", "Erik Zaadi <[email protected]>"]

Expand Down
59 changes: 59 additions & 0 deletions integrations/aws/tests/utils/test_overrides.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import unittest
from utils.overrides import AWSDescribeResourcesSelector, RegionPolicy


class TestAWSDescribeResourcesSelector(unittest.TestCase):
    """Exercise ``AWSDescribeResourcesSelector.is_region_allowed`` under different region policies."""

    @staticmethod
    def _selector_with(policy: RegionPolicy) -> AWSDescribeResourcesSelector:
        """Build a selector whose ``regionPolicy`` is the given policy."""
        return AWSDescribeResourcesSelector(query="test", regionPolicy=policy)

    def test_is_region_allowed_no_policy(self) -> None:
        """A selector without a region policy accepts the region."""
        unrestricted = AWSDescribeResourcesSelector(query="test")
        self.assertTrue(unrestricted.is_region_allowed("us-east-1"))

    def test_is_region_allowed_deny_policy(self) -> None:
        """A denied region is rejected; a region outside the deny list is accepted."""
        subject = self._selector_with(RegionPolicy(deny=["us-east-1"]))
        self.assertFalse(subject.is_region_allowed("us-east-1"))
        self.assertTrue(subject.is_region_allowed("us-west-2"))

    def test_is_region_allowed_allow_policy(self) -> None:
        """An allowed region is accepted; a region outside the allow list is rejected."""
        subject = self._selector_with(RegionPolicy(allow=["us-west-2"]))
        self.assertTrue(subject.is_region_allowed("us-west-2"))
        self.assertFalse(subject.is_region_allowed("us-east-1"))

    def test_is_region_allowed_both_policies(self) -> None:
        """With both lists set, only regions in the allow list are accepted."""
        subject = self._selector_with(
            RegionPolicy(allow=["us-west-2"], deny=["us-east-1"])
        )
        self.assertFalse(subject.is_region_allowed("us-east-1"))
        self.assertTrue(subject.is_region_allowed("us-west-2"))
        self.assertFalse(subject.is_region_allowed("eu-central-1"))

    def test_is_region_allowed_conflicting_policies(self) -> None:
        """A region present in both lists is rejected."""
        subject = self._selector_with(
            RegionPolicy(allow=["us-east-1"], deny=["us-east-1"])
        )
        self.assertFalse(subject.is_region_allowed("us-east-1"))

    def test_is_region_allowed_deny_only(self) -> None:
        """With only a deny list, every region outside it is accepted."""
        subject = self._selector_with(RegionPolicy(deny=["us-east-1", "us-west-2"]))
        self.assertFalse(subject.is_region_allowed("us-east-1"))
        self.assertFalse(subject.is_region_allowed("us-west-2"))
        self.assertTrue(subject.is_region_allowed("eu-central-1"))

    def test_is_region_allowed_allow_only(self) -> None:
        """With only an allow list, every region outside it is rejected."""
        subject = self._selector_with(RegionPolicy(allow=["us-east-1", "us-west-2"]))
        self.assertTrue(subject.is_region_allowed("us-east-1"))
        self.assertTrue(subject.is_region_allowed("us-west-2"))
        self.assertFalse(subject.is_region_allowed("eu-central-1"))
34 changes: 24 additions & 10 deletions integrations/aws/utils/misc.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,18 @@
import enum

from port_ocean.context.event import event
from port_ocean.context.ocean import ocean
from utils.overrides import AWSDescribeResourcesSelector
import typing
import asyncio


MAX_CONCURRENT_TASKS = 50
semaphore = asyncio.BoundedSemaphore(MAX_CONCURRENT_TASKS)
def get_semaphore() -> asyncio.BoundedSemaphore:
    """Create a bounded semaphore sized from the integration configuration.

    The size is read from the ``maximum_concurrent_accounts`` entry of
    ``ocean.integration_config`` and coerced to ``int``.

    Returns:
        A new ``asyncio.BoundedSemaphore`` initialised with that value.
    """
    configured_limit = ocean.integration_config["maximum_concurrent_accounts"]
    return asyncio.BoundedSemaphore(int(configured_limit))


class CustomProperties(enum.StrEnum):
Expand Down Expand Up @@ -46,16 +53,23 @@ def is_server_error(e: Exception) -> bool:


def get_matching_kinds_and_blueprints_from_config(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we test it? with the new expected behavior?

kind: str,
) -> dict[str, list[str]]:
kinds: dict[str, list[str]] = {}
kind: str, region: str
) -> tuple[dict[str, list[str]], dict[str, list[str]]]:
allowed_kinds: dict[str, list[str]] = {}
disallowed_kinds: dict[str, list[str]] = {}
resources = event.port_app_config.resources

for resource in resources:
blueprint = resource.port.entity.mappings.blueprint.strip('"')
if resource.kind in kinds:
kinds[resource.kind].append(blueprint)
resource_selector = typing.cast(AWSDescribeResourcesSelector, resource.selector)
if not resource_selector.is_region_allowed(region) and kind == resource.kind:
if kind in disallowed_kinds:
disallowed_kinds[kind].append(blueprint)
else:
disallowed_kinds[kind] = [blueprint]
elif kind == resource.kind:
kinds[resource.kind] = [blueprint]
if kind in allowed_kinds:
allowed_kinds[kind].append(blueprint)
else:
allowed_kinds[kind] = [blueprint]

return kinds
return allowed_kinds, disallowed_kinds
44 changes: 43 additions & 1 deletion integrations/aws/utils/overrides.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,53 @@
PortAppConfig,
Selector,
)
from pydantic import Field
from pydantic import Field, BaseModel
from typing import List


class RegionPolicy(BaseModel):
    """Pair of region-name lists (``allow`` / ``deny``) used to filter regions."""

    # Region names that are explicitly permitted; defaults to an empty list.
    allow: List[str] = Field(default_factory=list)
    # Region names that are explicitly excluded; defaults to an empty list.
    deny: List[str] = Field(default_factory=list)


class AWSDescribeResourcesSelector(Selector):
    """Selector extended with a get-resource-API flag and a region policy."""

    use_get_resource_api: bool = Field(alias="useGetResourceAPI", default=False)
    region_policy: RegionPolicy = Field(
        alias="regionPolicy", default_factory=RegionPolicy
    )

    def is_region_allowed(self, region: str) -> bool:
        """Return whether ``region`` passes this selector's region policy.

        Rules, in order of precedence:
        - A region in the ``deny`` list is rejected, even if it is also in ``allow``.
        - When ``allow`` is non-empty, only regions listed in it are accepted.
        - When ``allow`` is empty, every region that is not denied is accepted;
          this includes the case where both lists are empty.

        Args:
            region (str): The region to be checked.

        Returns:
            bool: True if the region is allowed, False otherwise.
        """
        policy = self.region_policy
        # Deny always wins over allow.
        if region in policy.deny:
            return False
        # A non-empty allow list acts as a whitelist.
        if policy.allow:
            return region in policy.allow
        # No allow list: anything that was not denied is accepted.
        return True


class AWSResourceConfig(ResourceConfig):
Expand Down
22 changes: 20 additions & 2 deletions integrations/aws/utils/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,17 @@ async def resync_custom_kind(
region = session.region_name
account_id = await _session_manager.find_account_id_by_session(session)
next_token = None

resource_config_selector = typing.cast(
AWSResourceConfig, event.resource_config
).selector

if not resource_config_selector.is_region_allowed(region):
logger.info(
f"Skipping resyncing {kind} in region {region} in account {account_id} because it's not allowed"
)
return

if not describe_method_params:
describe_method_params = {}
while True:
Expand Down Expand Up @@ -165,11 +176,18 @@ async def resync_custom_kind(
async def resync_cloudcontrol(
kind: str, session: aioboto3.Session
) -> ASYNC_GENERATOR_RESYNC_TYPE:
use_get_resource_api = typing.cast(
resource_config_selector = typing.cast(
AWSResourceConfig, event.resource_config
).selector.use_get_resource_api
).selector
use_get_resource_api = resource_config_selector.use_get_resource_api

region = session.region_name
account_id = await _session_manager.find_account_id_by_session(session)
if not resource_config_selector.is_region_allowed(region):
logger.info(
f"Skipping resyncing {kind} in region {region} in account {account_id} because it's not allowed"
)
return
logger.info(f"Resyncing {kind} in account {account_id} in region {region}")
next_token = None
while True:
Expand Down