Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Integration] [AWS] | Added support to choose specific regions to query resources from #1099

Merged
merged 30 commits into from
Nov 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
c318294
support to query specific regions
mk-armah Oct 23, 2024
4318eaf
region policy for seemless control
mk-armah Oct 24, 2024
273d922
Merge branch 'main' into improvement-aws
mk-armah Oct 24, 2024
5943bd9
bump integration version
mk-armah Oct 24, 2024
443a314
correct spacing in changelog
mk-armah Oct 24, 2024
932ae04
query regions optionally in resources with special handling
mk-armah Oct 24, 2024
6928f77
updated version
mk-armah Oct 25, 2024
9f8bb71
Update integrations/aws/CHANGELOG.md
mk-armah Oct 26, 2024
627bb9e
real time event supports region policy
mk-armah Oct 26, 2024
c15fb33
lint
mk-armah Oct 26, 2024
3747e19
cast selector type
mk-armah Oct 26, 2024
b337b21
minimal update to webhook event logic
mk-armah Oct 26, 2024
e8fea8f
updated repolicy logic
mk-armah Oct 26, 2024
42b64d9
Merge branch 'main' into improvement-aws
mk-armah Oct 28, 2024
361f6ce
added tests
mk-armah Oct 31, 2024
22bb035
Merge branch 'improvement-aws' of https://github.com/port-labs/ocean …
mk-armah Oct 31, 2024
06f133a
production-ready release
mk-armah Oct 31, 2024
d065e06
Merge branch 'main' into improvement-aws
mk-armah Oct 31, 2024
3b90a09
bumped integration version
mk-armah Oct 31, 2024
b95b358
updated error handling in querying all resources
mk-armah Oct 31, 2024
550c23a
refactored phrase to improve constructive understanding
mk-armah Oct 31, 2024
f43c879
updated changelog
mk-armah Nov 1, 2024
3014165
move log to Bug Fixes
mk-armah Nov 1, 2024
174ab9d
skip resource not found exceptions
mk-armah Nov 4, 2024
7cd9575
Merge branch 'main' into improvement-aws
mk-armah Nov 4, 2024
4d9bae0
lint fix
mk-armah Nov 4, 2024
6bf9095
Merge branch 'main' into improvement-aws
mk-armah Nov 4, 2024
5fc1796
reformat resources.py
mk-armah Nov 4, 2024
9287938
Merge branch 'improvement-aws' of https://github.com/port-labs/ocean …
mk-armah Nov 4, 2024
92e4275
change log level for resource not found from warning to info
mk-armah Nov 4, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions integrations/aws/.port/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ configurations:
type: string
sensitive: true
description: AWS API Key for custom events, used to validate the event source for real-time event updates.
- name: maximumConcurrentAccounts
type: integer
require: false
description: The number of concurrent accounts to scan. By default, it is set to 50.
default: 50
deploymentMethodRequirements:
- type: default
configurations: ['awsAccessKeyId', 'awsSecretAccessKey']
Expand Down
15 changes: 15 additions & 0 deletions integrations/aws/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,27 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

## 0.2.53 (2024-10-31)


### Improvements

- Added the option to query resources from specific regions, configurable via the regionPolicy in the selector field of the mapping.
- Introduced `maximumConcurrentAccount` parameter to control the maximum number of accounts synced concurrently.

### Bug Fixes

- Skip missing resources in a region without interrupting sync across other regions.


## 0.2.52 (2024-10-30)


### Bug Fixes

- Updated `joined_timestamp` mapping in AWS Organizations to comply with RFC3339 timestamp format by replacing the space delimiter with 'T' in the `JoinedTimestamp` field.


## 0.2.51 (2024-10-23)


Expand Down
106 changes: 77 additions & 29 deletions integrations/aws/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,37 @@
from port_ocean.context.ocean import ocean
from loguru import logger
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE
from port_ocean.context.event import event
from utils.overrides import AWSPortAppConfig, AWSResourceConfig
from utils.misc import (
get_matching_kinds_and_blueprints_from_config,
CustomProperties,
ResourceKindsWithSpecialHandling,
is_access_denied_exception,
is_server_error,
semaphore,
get_semaphore,
)
from port_ocean.utils.async_iterators import (
stream_async_iterators_tasks,
semaphore_async_iterator,
)
import functools

semaphore = get_semaphore()


async def _handle_global_resource_resync(
kind: str,
credentials: AwsCredentials,
aws_resource_config: AWSResourceConfig,
) -> ASYNC_GENERATOR_RESYNC_TYPE:
denied_access_to_default_region = False
default_region = get_default_region_from_credentials(credentials)
default_session = await credentials.create_session(default_region)
try:
async for batch in resync_cloudcontrol(kind, default_session):
async for batch in resync_cloudcontrol(
kind, default_session, aws_resource_config
):
yield batch
except Exception as e:
if is_access_denied_exception(e):
Expand All @@ -63,7 +70,9 @@ async def _handle_global_resource_resync(
logger.info(f"Trying to resync {kind} in all regions until success")
async for session in credentials.create_session_for_each_region():
try:
async for batch in resync_cloudcontrol(kind, session):
async for batch in resync_cloudcontrol(
kind, session, aws_resource_config
):
yield batch
break
except Exception as e:
Expand All @@ -77,13 +86,19 @@ async def resync_resources_for_account(
"""Function to handle fetching resources for a single account."""
errors, regions = [], []

aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config)

if is_global_resource(kind):
async for batch in _handle_global_resource_resync(kind, credentials):
async for batch in _handle_global_resource_resync(
kind, credentials, aws_resource_config
):
yield batch
else:
async for session in credentials.create_session_for_each_region():
try:
async for batch in resync_cloudcontrol(kind, session):
async for batch in resync_cloudcontrol(
kind, session, aws_resource_config
):
yield batch
except Exception as exc:
regions.append(session.region_name)
Expand Down Expand Up @@ -123,6 +138,7 @@ async def resync_account(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
@ocean.on_resync(kind=ResourceKindsWithSpecialHandling.ELASTICACHE_CLUSTER)
async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()
aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config)

tasks = [
semaphore_async_iterator(
Expand All @@ -135,6 +151,7 @@ async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
"describe_cache_clusters",
"CacheClusters",
"Marker",
aws_resource_config,
),
)
async for session in get_sessions()
Expand All @@ -149,6 +166,7 @@ async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()

aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config)
tasks = [
semaphore_async_iterator(
semaphore,
Expand All @@ -160,6 +178,7 @@ async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
"describe_load_balancers",
"LoadBalancers",
"Marker",
aws_resource_config,
),
)
async for session in get_sessions()
Expand All @@ -175,6 +194,7 @@ async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()

aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config)
tasks = [
semaphore_async_iterator(
semaphore,
Expand All @@ -186,6 +206,7 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
"list_certificates",
"CertificateSummaryList",
"NextToken",
aws_resource_config,
),
)
async for session in get_sessions()
Expand All @@ -200,6 +221,8 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
@ocean.on_resync(kind=ResourceKindsWithSpecialHandling.AMI_IMAGE)
async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()

aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config)
tasks = [
semaphore_async_iterator(
semaphore,
Expand All @@ -211,6 +234,7 @@ async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
"describe_images",
"Images",
"NextToken",
aws_resource_config,
{"Owners": ["self"]},
),
)
Expand All @@ -225,6 +249,8 @@ async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
@ocean.on_resync(kind=ResourceKindsWithSpecialHandling.CLOUDFORMATION_STACK)
async def resync_cloudformation(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()

aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config)
tasks = [
semaphore_async_iterator(
semaphore,
Expand All @@ -236,6 +262,7 @@ async def resync_cloudformation(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
"describe_stacks",
"Stacks",
"NextToken",
aws_resource_config,
),
)
async for session in get_sessions()
Expand Down Expand Up @@ -293,10 +320,35 @@ async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Respons
with logger.contextualize(
account_id=account_id, resource_type=resource_type, identifier=identifier
):
matching_resource_configs = get_matching_kinds_and_blueprints_from_config(
resource_type
aws_port_app_config = typing.cast(AWSPortAppConfig, event.port_app_config)
if not isinstance(aws_port_app_config, AWSPortAppConfig):
logger.info("No resources configured in the port app config")
return fastapi.Response(status_code=status.HTTP_200_OK)

allowed_configs, disallowed_configs = (
get_matching_kinds_and_blueprints_from_config(
resource_type, region, aws_port_app_config.resources
)
)

if disallowed_configs:
logger.info(
f"Unregistering resource {identifier} of type {resource_type} in region {region} and account {account_id} for blueprint {disallowed_configs.values()} because it is not allowed"
)
await ocean.unregister(
[
Entity(blueprint=blueprint, identifier=identifier)
for blueprints in disallowed_configs.values()
for blueprint in blueprints
]
)

if not allowed_configs:
logger.info(
f"{resource_type} not found or disabled for region {region} in account {account_id}"
)
return fastapi.Response(status_code=status.HTTP_200_OK)

logger.debug(
"Querying full resource on AWS before registering change in port"
)
Expand All @@ -310,33 +362,29 @@ async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Respons
logger.error(
f"Cannot sync {resource_type} in region {region} in account {account_id} due to missing access permissions {e}"
)
return fastapi.Response(
status_code=status.HTTP_200_OK,
)
return fastapi.Response(status_code=status.HTTP_200_OK)
if is_server_error(e):
logger.error(
f"Cannot sync {resource_type} in region {region} in account {account_id} due to server error {e}"
)
return fastapi.Response(
status_code=status.HTTP_200_OK,
)
return fastapi.Response(status_code=status.HTTP_200_OK)

logger.error(
f"Failed to retrieve '{resource_type}' resource with ID '{identifier}' in region '{region}' for account '{account_id}'. "
f"Verify that the resource exists and that the necessary permissions are granted."
)

resource = None

for kind in matching_resource_configs:
blueprints = matching_resource_configs[kind]
for kind, blueprints in allowed_configs.items():
if not resource: # Resource probably deleted
for blueprint in blueprints:
logger.info(
"Resource not found in AWS, un-registering from port"
)
await ocean.unregister(
[
Entity(
blueprint=blueprint,
identifier=identifier,
)
]
)
logger.info("Resource not found in AWS, un-registering from port")
await ocean.unregister(
[
Entity(blueprint=blueprint, identifier=identifier)
for blueprint in blueprints
]
)
else: # Resource found in AWS, update port
logger.info("Resource found in AWS, registering change in port")
resource.update(
Expand All @@ -347,14 +395,14 @@ async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Respons
}
)
await ocean.register_raw(
kind,
[fix_unserializable_date_properties(resource)],
kind, [fix_unserializable_date_properties(resource)]
)

logger.info("Webhook processed successfully")
return fastapi.Response(
status_code=status.HTTP_200_OK, content=json.dumps({"ok": True})
)

except Exception as e:
logger.exception("Failed to process event from aws")
return fastapi.Response(
Expand Down
2 changes: 1 addition & 1 deletion integrations/aws/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "aws"
version = "0.2.52"
version = "0.2.53"
description = "This integration will map all your resources in all the available accounts to your Port entities"
authors = ["Shalev Avhar <[email protected]>", "Erik Zaadi <[email protected]>"]

Expand Down
Loading