Skip to content

Commit

Permalink
Merge branch 'main' into feat/async-gitlab
Browse files Browse the repository at this point in the history
  • Loading branch information
victor-devv authored Nov 8, 2024
2 parents 7c960a4 + 25140ed commit 7cb7217
Show file tree
Hide file tree
Showing 112 changed files with 2,479 additions and 13,372 deletions.
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,23 @@ this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm

<!-- towncrier release notes start -->

## 0.12.9 (2024-11-07)


### Bug Fixes

- Await logger writing exception on exit (Integration logs not being ingested)
- Await logger thread on exit (Integration logs not being ingested)
- Serialize exception (Integration logs not being ingested)


## 0.12.8 (2024-11-04)


### Improvements

- Bump fastapi to version 0.115.3 - fix Starlette Denial of service (DoS) via multipart/form-data (0.12.8)

## 0.12.7 (2024-10-23)


Expand Down
2 changes: 2 additions & 0 deletions docs/framework-guides/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
# Generated files
.docusaurus
.cache-loader
changelog
static/LICENSE.md

# Misc
.DS_Store
Expand Down
789 changes: 387 additions & 402 deletions docs/framework-guides/package-lock.json

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions integrations/argocd/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,14 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

## 0.1.98 (2024-11-06)


### Improvements

- Bumped ocean version to ^0.12.8


## 0.1.97 (2024-10-23)


Expand Down
1,165 changes: 403 additions & 762 deletions integrations/argocd/poetry.lock

Large diffs are not rendered by default.

5 changes: 3 additions & 2 deletions integrations/argocd/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
[tool.poetry]
name = "argocd"
version = "0.1.97"
version = "0.1.98"
description = "Argo CD integration powered by Ocean"
authors = ["Isaac Coffie <[email protected]>"]

[tool.poetry.dependencies]
python = "^3.11"
port_ocean = {version = "0.12.7", extras = ["cli"]}
port_ocean = {version = "0.12.8", extras = ["cli"]}

[tool.poetry.group.dev.dependencies]
# Uncomment this if you want to debug the ocean core together with your integration
Expand Down Expand Up @@ -85,6 +85,7 @@ disallow_untyped_defs = true
[tool.ruff]
# Never enforce `E501` (line length violations).
ignore = ["E501"]
target-version = "py311"

[tool.pydantic-mypy]
init_forbid_extra = true
Expand Down
2 changes: 1 addition & 1 deletion integrations/aws/.port/resources/port-app-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ resources:
email: .Email
status: .Status
joined_method: .JoinedMethod
joined_timestamp: .JoinedTimestamp
joined_timestamp: .JoinedTimestamp | sub(" "; "T")

- kind: AWS::S3::Bucket
selector:
Expand Down
5 changes: 5 additions & 0 deletions integrations/aws/.port/spec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@ configurations:
type: string
sensitive: true
description: AWS API Key for custom events, used to validate the event source for real-time event updates.
- name: maximumConcurrentAccounts
type: integer
require: false
description: The number of concurrent accounts to scan. By default, it is set to 50.
default: 50
deploymentMethodRequirements:
- type: default
configurations: ['awsAccessKeyId', 'awsSecretAccessKey']
Expand Down
29 changes: 29 additions & 0 deletions integrations/aws/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,35 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

<!-- towncrier release notes start -->

## 0.2.54 (2024-11-06)


### Improvements

- Bumped ocean version to ^0.12.8


## 0.2.53 (2024-10-31)


### Improvements

- Added the option to query resources from specific regions, configurable via the regionPolicy in the selector field of the mapping.
- Introduced `maximumConcurrentAccount` parameter to control the maximum number of accounts synced concurrently.

### Bug Fixes

- Skip missing resources in a region without interrupting sync across other regions.


## 0.2.52 (2024-10-30)


### Bug Fixes

- Updated `joined_timestamp` mapping in AWS Organizations to comply with RFC3339 timestamp format by replacing the space delimiter with 'T' in the `JoinedTimestamp` field.


## 0.2.51 (2024-10-23)


Expand Down
106 changes: 77 additions & 29 deletions integrations/aws/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,30 +28,37 @@
from port_ocean.context.ocean import ocean
from loguru import logger
from port_ocean.core.ocean_types import ASYNC_GENERATOR_RESYNC_TYPE
from port_ocean.context.event import event
from utils.overrides import AWSPortAppConfig, AWSResourceConfig
from utils.misc import (
get_matching_kinds_and_blueprints_from_config,
CustomProperties,
ResourceKindsWithSpecialHandling,
is_access_denied_exception,
is_server_error,
semaphore,
get_semaphore,
)
from port_ocean.utils.async_iterators import (
stream_async_iterators_tasks,
semaphore_async_iterator,
)
import functools

semaphore = get_semaphore()


async def _handle_global_resource_resync(
kind: str,
credentials: AwsCredentials,
aws_resource_config: AWSResourceConfig,
) -> ASYNC_GENERATOR_RESYNC_TYPE:
denied_access_to_default_region = False
default_region = get_default_region_from_credentials(credentials)
default_session = await credentials.create_session(default_region)
try:
async for batch in resync_cloudcontrol(kind, default_session):
async for batch in resync_cloudcontrol(
kind, default_session, aws_resource_config
):
yield batch
except Exception as e:
if is_access_denied_exception(e):
Expand All @@ -63,7 +70,9 @@ async def _handle_global_resource_resync(
logger.info(f"Trying to resync {kind} in all regions until success")
async for session in credentials.create_session_for_each_region():
try:
async for batch in resync_cloudcontrol(kind, session):
async for batch in resync_cloudcontrol(
kind, session, aws_resource_config
):
yield batch
break
except Exception as e:
Expand All @@ -77,13 +86,19 @@ async def resync_resources_for_account(
"""Function to handle fetching resources for a single account."""
errors, regions = [], []

aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config)

if is_global_resource(kind):
async for batch in _handle_global_resource_resync(kind, credentials):
async for batch in _handle_global_resource_resync(
kind, credentials, aws_resource_config
):
yield batch
else:
async for session in credentials.create_session_for_each_region():
try:
async for batch in resync_cloudcontrol(kind, session):
async for batch in resync_cloudcontrol(
kind, session, aws_resource_config
):
yield batch
except Exception as exc:
regions.append(session.region_name)
Expand Down Expand Up @@ -123,6 +138,7 @@ async def resync_account(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
@ocean.on_resync(kind=ResourceKindsWithSpecialHandling.ELASTICACHE_CLUSTER)
async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()
aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config)

tasks = [
semaphore_async_iterator(
Expand All @@ -135,6 +151,7 @@ async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
"describe_cache_clusters",
"CacheClusters",
"Marker",
aws_resource_config,
),
)
async for session in get_sessions()
Expand All @@ -149,6 +166,7 @@ async def resync_elasticache(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()

aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config)
tasks = [
semaphore_async_iterator(
semaphore,
Expand All @@ -160,6 +178,7 @@ async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
"describe_load_balancers",
"LoadBalancers",
"Marker",
aws_resource_config,
),
)
async for session in get_sessions()
Expand All @@ -175,6 +194,7 @@ async def resync_elv2_load_balancer(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()

aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config)
tasks = [
semaphore_async_iterator(
semaphore,
Expand All @@ -186,6 +206,7 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
"list_certificates",
"CertificateSummaryList",
"NextToken",
aws_resource_config,
),
)
async for session in get_sessions()
Expand All @@ -200,6 +221,8 @@ async def resync_acm(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
@ocean.on_resync(kind=ResourceKindsWithSpecialHandling.AMI_IMAGE)
async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()

aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config)
tasks = [
semaphore_async_iterator(
semaphore,
Expand All @@ -211,6 +234,7 @@ async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
"describe_images",
"Images",
"NextToken",
aws_resource_config,
{"Owners": ["self"]},
),
)
Expand All @@ -225,6 +249,8 @@ async def resync_ami(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
@ocean.on_resync(kind=ResourceKindsWithSpecialHandling.CLOUDFORMATION_STACK)
async def resync_cloudformation(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
await update_available_access_credentials()

aws_resource_config = typing.cast(AWSResourceConfig, event.resource_config)
tasks = [
semaphore_async_iterator(
semaphore,
Expand All @@ -236,6 +262,7 @@ async def resync_cloudformation(kind: str) -> ASYNC_GENERATOR_RESYNC_TYPE:
"describe_stacks",
"Stacks",
"NextToken",
aws_resource_config,
),
)
async for session in get_sessions()
Expand Down Expand Up @@ -293,10 +320,35 @@ async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Respons
with logger.contextualize(
account_id=account_id, resource_type=resource_type, identifier=identifier
):
matching_resource_configs = get_matching_kinds_and_blueprints_from_config(
resource_type
aws_port_app_config = typing.cast(AWSPortAppConfig, event.port_app_config)
if not isinstance(aws_port_app_config, AWSPortAppConfig):
logger.info("No resources configured in the port app config")
return fastapi.Response(status_code=status.HTTP_200_OK)

allowed_configs, disallowed_configs = (
get_matching_kinds_and_blueprints_from_config(
resource_type, region, aws_port_app_config.resources
)
)

if disallowed_configs:
logger.info(
f"Unregistering resource {identifier} of type {resource_type} in region {region} and account {account_id} for blueprint {disallowed_configs.values()} because it is not allowed"
)
await ocean.unregister(
[
Entity(blueprint=blueprint, identifier=identifier)
for blueprints in disallowed_configs.values()
for blueprint in blueprints
]
)

if not allowed_configs:
logger.info(
f"{resource_type} not found or disabled for region {region} in account {account_id}"
)
return fastapi.Response(status_code=status.HTTP_200_OK)

logger.debug(
"Querying full resource on AWS before registering change in port"
)
Expand All @@ -310,33 +362,29 @@ async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Respons
logger.error(
f"Cannot sync {resource_type} in region {region} in account {account_id} due to missing access permissions {e}"
)
return fastapi.Response(
status_code=status.HTTP_200_OK,
)
return fastapi.Response(status_code=status.HTTP_200_OK)
if is_server_error(e):
logger.error(
f"Cannot sync {resource_type} in region {region} in account {account_id} due to server error {e}"
)
return fastapi.Response(
status_code=status.HTTP_200_OK,
)
return fastapi.Response(status_code=status.HTTP_200_OK)

logger.error(
f"Failed to retrieve '{resource_type}' resource with ID '{identifier}' in region '{region}' for account '{account_id}'. "
f"Verify that the resource exists and that the necessary permissions are granted."
)

resource = None

for kind in matching_resource_configs:
blueprints = matching_resource_configs[kind]
for kind, blueprints in allowed_configs.items():
if not resource: # Resource probably deleted
for blueprint in blueprints:
logger.info(
"Resource not found in AWS, un-registering from port"
)
await ocean.unregister(
[
Entity(
blueprint=blueprint,
identifier=identifier,
)
]
)
logger.info("Resource not found in AWS, un-registering from port")
await ocean.unregister(
[
Entity(blueprint=blueprint, identifier=identifier)
for blueprint in blueprints
]
)
else: # Resource found in AWS, update port
logger.info("Resource found in AWS, registering change in port")
resource.update(
Expand All @@ -347,14 +395,14 @@ async def webhook(update: ResourceUpdate, response: Response) -> fastapi.Respons
}
)
await ocean.register_raw(
kind,
[fix_unserializable_date_properties(resource)],
kind, [fix_unserializable_date_properties(resource)]
)

logger.info("Webhook processed successfully")
return fastapi.Response(
status_code=status.HTTP_200_OK, content=json.dumps({"ok": True})
)

except Exception as e:
logger.exception("Failed to process event from aws")
return fastapi.Response(
Expand Down
Loading

0 comments on commit 7cb7217

Please sign in to comment.