Skip to content

Commit

Permalink
added cfn usage code
Browse files Browse the repository at this point in the history
Justin committed Feb 1, 2024
1 parent 7318195 commit 96bff01
Showing 5 changed files with 39 additions and 56 deletions.
Original file line number Diff line number Diff line change
@@ -19,27 +19,21 @@

if TYPE_CHECKING:
from mypy_boto3_iam import IAMClient
from mypy_boto3_iam.type_defs import CreateRoleResponseTypeDef, DeleteRoleRequestRequestTypeDef, DetachRolePolicyRequestPolicyDetachRoleTypeDef
from mypy_boto3_iam.type_defs import CreateRoleResponseTypeDef
from mypy_boto3_organizations import OrganizationsClient
from mypy_boto3_route53 import Route53Client
from mypy_boto3_route53.type_defs import ListHostedZonesResponseTypeDef
from mypy_boto3_s3 import S3Client
from mypy_boto3_shield import ShieldClient
from mypy_boto3_shield.type_defs import (
AssociateDRTLogBucketRequestRequestTypeDef,
AssociateProactiveEngagementDetailsRequestRequestTypeDef,
CreateProtectionGroupRequestRequestTypeDef,
CreateProtectionResponseTypeDef,
DeleteProtectionGroupRequestRequestTypeDef,
DeleteProtectionRequestRequestTypeDef,
DescribeEmergencyContactSettingsResponseTypeDef,
DescribeProtectionResponseTypeDef,
DescribeSubscriptionResponseTypeDef,
DisableApplicationLayerAutomaticResponseRequestRequestTypeDef,
DisassociateDRTLogBucketRequestRequestTypeDef,
EmergencyContactTypeDef,
ProtectionTypeDef,
UpdateProtectionGroupRequestRequestTypeDef,
)


@@ -51,6 +45,7 @@

UNEXPECTED = "Unexpected!"
RESOURCES_BY_ACCOUNT: dict = {}
SHIELD_DRT_POLICY = "arn:aws:iam::aws:policy/service-role/AWSShieldDRTAccessPolicy"

try:
MANAGEMENT_ACCOUNT_SESSION = boto3.Session()
@@ -86,10 +81,10 @@ def build_resources_by_account(account_session: boto3.Session, params: dict, acc
"""
buckets: list = get_buckets_to_protect(account_session, params["SHIELD_DRT_LOG_BUCKETS"].split(","))
check_if_key_in_object("buckets", RESOURCES_BY_ACCOUNT[account_id], "list")
RESOURCES_BY_ACCOUNT[account_id]["buckets"]: list = buckets
RESOURCES_BY_ACCOUNT[account_id]["buckets"] = buckets
check_if_key_in_object("resources_to_protect", RESOURCES_BY_ACCOUNT[account_id], "list")
hosted_zones: list = get_route_53_hosted_zones(account_session)
RESOURCES_BY_ACCOUNT[account_id]["resources_to_protect"]: list = hosted_zones
RESOURCES_BY_ACCOUNT[account_id]["resources_to_protect"] = hosted_zones
resources_to_protect: list = get_resources_to_protect_in_account(account_id, params["RESOURCES_TO_PROTECT"].split(","))
RESOURCES_BY_ACCOUNT[account_id]["resources_to_protect"].extend(resources_to_protect)

@@ -321,14 +316,12 @@ def detach_drt_role_policy(account_session: boto3.Session, role_name: str) -> No
try:
LOGGER.info("detaching DRT role policy")
iam_client: IAMClient = account_session.client("iam")
detach_policy_response: DetachRolePolicyRequestPolicyDetachRoleTypeDef = iam_client.detach_role_policy(
RoleName=role_name, PolicyArn="arn:aws:iam::aws:policy/service-role/AWSShieldDRTAccessPolicy"
)
detach_policy_response = iam_client.detach_role_policy(RoleName=role_name, PolicyArn=SHIELD_DRT_POLICY)
api_call_details = {"API_Call": "iam:DetachRolePolicy", "API_Response": detach_policy_response}
LOGGER.info(api_call_details)
except iam_client.exceptions.NoSuchEntityException as nse:
LOGGER.info(f"NoSuchEntityException {nse}")
LOGGER.info(f"Continuing...")
LOGGER.info("Continuing...")


def delete_drt_role(account_session: boto3.Session, role_name: str) -> None:
@@ -342,7 +335,7 @@ def delete_drt_role(account_session: boto3.Session, role_name: str) -> None:
LOGGER.info("deleting DRT role")
iam_client: IAMClient = account_session.client("iam")
detach_drt_role_policy(account_session, role_name)
delete_role_response: DeleteRoleRequestRequestTypeDef = iam_client.delete_role(RoleName=role_name)
delete_role_response = iam_client.delete_role(RoleName=role_name)
api_call_details = {"API_Call": "iam:DeleteRole", "API_Response": delete_role_response}
LOGGER.info(api_call_details)
except iam_client.exceptions.NoSuchEntityException as nse:
@@ -379,9 +372,9 @@ def create_drt_role(account: str, role_name: str, account_session: boto3.Session

iam_client: IAMClient = account_session.client("iam")
role_exists = check_if_role_exists(iam_client, role_name)
role_arn = ""
role_arn: str = ""
if role_exists == "":
create_role_response: CreateRoleResponseTypeDef = iam_client.create_role(
create_role_response = iam_client.create_role(
RoleName=role_name,
AssumeRolePolicyDocument="""{
"Version": "2012-10-17",
@@ -396,15 +389,11 @@ def create_drt_role(account: str, role_name: str, account_session: boto3.Session
]
}""",
)
attach_policy_response = iam_client.attach_role_policy(
PolicyArn="arn:aws:iam::aws:policy/service-role/AWSShieldDRTAccessPolicy", RoleName=role_name
)
role_arn: str = create_role_response["Role"]["Arn"]
attach_policy_response = iam_client.attach_role_policy(PolicyArn=SHIELD_DRT_POLICY, RoleName=role_name)
role_arn = create_role_response["Role"]["Arn"]
else:
role_arn = role_exists
attach_policy_response = iam_client.attach_role_policy(
PolicyArn="arn:aws:iam::aws:policy/service-role/AWSShieldDRTAccessPolicy", RoleName=role_name
)
attach_policy_response = iam_client.attach_role_policy(PolicyArn=SHIELD_DRT_POLICY, RoleName=role_name)

api_call_details = {"API_Call": "iam:AttachRolePolicy", "API_Response": attach_policy_response}
LOGGER.info(api_call_details)
@@ -453,7 +442,7 @@ def delete_protection(shield_client: ShieldClient, resource_arn: str) -> None:
protection_id: str = get_protection_id(shield_client, resource_arn)
if protection_id != "":
LOGGER.info(f"Deleting protection for {resource_arn} and protectionId {protection_id}")
delete_protection_response: DeleteProtectionRequestRequestTypeDef = shield_client.delete_protection(ProtectionId=protection_id)
delete_protection_response = shield_client.delete_protection(ProtectionId=protection_id)
api_call_details = {"API_Call": "shield:DeleteProtection", "API_Response": delete_protection_response}
LOGGER.info(api_call_details)
else:
@@ -468,7 +457,7 @@ def associate_drt_log_bucket(shield_client: ShieldClient, log_bucket: str) -> No
shield_client: shield client
log_bucket: bucket to grant access via bucket policy
"""
associate_drt_log_response: AssociateDRTLogBucketRequestRequestTypeDef = shield_client.associate_drt_log_bucket(LogBucket=log_bucket)
associate_drt_log_response = shield_client.associate_drt_log_bucket(LogBucket=log_bucket)
api_call_details = {"API_Call": "shield:AssociateDRTLogBucket", "API_Response": associate_drt_log_response}
LOGGER.info(api_call_details)

@@ -480,7 +469,7 @@ def disassociate_drt_log_bucket(shield_client: ShieldClient, log_bucket: str) ->
shield_client: shield client
log_bucket: bucket to update the policy
"""
disassociate_drt_log_response: DisassociateDRTLogBucketRequestRequestTypeDef = shield_client.disassociate_drt_log_bucket(LogBucket=log_bucket)
disassociate_drt_log_response = shield_client.disassociate_drt_log_bucket(LogBucket=log_bucket)
api_call_details = {"API_Call": "shield:DisassociateDRTLogBucket", "API_Response": disassociate_drt_log_response}
LOGGER.info(api_call_details)

@@ -535,7 +524,7 @@ def check_proactive_engagement_enabled(shield_client: ShieldClient, params: dict
time.sleep(5)
check_proactive_engagement_enabled(shield_client, params, retry + 1)
else:
raise ValueError("Proactive engagement status not found")
return False


def check_if_protection_group_exists(shield_client: ShieldClient, protection_group_id: str) -> bool:
@@ -581,9 +570,7 @@ def delete_protection_group(shield_client: ShieldClient, params: dict, account_i
pg_id: str = params[f"PROTECTION_GROUP_{i}_ID"]
if account_id == params[f"PROTECTION_GROUP_{i}_ACCOUNT_ID"]:
if pg_id != "":
delete_protection_group_response: DeleteProtectionGroupRequestRequestTypeDef = shield_client.delete_protection_group(
ProtectionGroupId=pg_id
)
delete_protection_group_response = shield_client.delete_protection_group(ProtectionGroupId=pg_id)
api_call_details = {"API_Call": "shield:DeleteProtectionGroup", "API_Response": delete_protection_group_response}
LOGGER.info(api_call_details)
else:
@@ -603,7 +590,7 @@ def update_protection_group(
"APPLICATION_LOAD_BALANCER",
"GLOBAL_ACCELERATOR",
],
pg_members: list,
pg_members: list[str],
) -> None:
"""Updates an existing protection group
@@ -616,17 +603,15 @@ def update_protection_group(
pg_members: protection group members
"""
if pg_pattern == "BY_RESOURCE_TYPE":
protection_group_response: UpdateProtectionGroupRequestRequestTypeDef = shield_client.update_protection_group(
protection_group_response = shield_client.update_protection_group(
ProtectionGroupId=pg_id, Aggregation=pg_aggregation, Pattern=pg_pattern, ResourceType=pg_resource_type
)
elif pg_pattern == "ARBITRARY":
protection_group_response: UpdateProtectionGroupRequestRequestTypeDef = shield_client.update_protection_group(
protection_group_response = shield_client.update_protection_group(
ProtectionGroupId=pg_id, Aggregation=pg_aggregation, Pattern=pg_pattern, Members=pg_members.split(",")
)
else:
protection_group_response: UpdateProtectionGroupRequestRequestTypeDef = shield_client.update_protection_group(
ProtectionGroupId=pg_id, Aggregation=pg_aggregation, Pattern=pg_pattern
)
protection_group_response = shield_client.update_protection_group(ProtectionGroupId=pg_id, Aggregation=pg_aggregation, Pattern=pg_pattern)
api_call_details = {"API_Call": "shield:UpdateProtectionGroup", "API_Response": protection_group_response}
LOGGER.info(api_call_details)

@@ -660,15 +645,15 @@ def create_protection_group(shield_client: ShieldClient, params: dict, account_i
break
LOGGER.info(f"Creating Protection_Group_{i} in {account_id}")
if pg_pattern == "BY_RESOURCE_TYPE":
protection_group_response: CreateProtectionGroupRequestRequestTypeDef = shield_client.create_protection_group(
protection_group_response = shield_client.create_protection_group(
ProtectionGroupId=pg_id, Aggregation=pg_aggregation, Pattern=pg_pattern, ResourceType=pg_resource_type
)
elif pg_pattern == "ARBITRARY":
protection_group_response: CreateProtectionGroupRequestRequestTypeDef = shield_client.create_protection_group(
protection_group_response = shield_client.create_protection_group(
ProtectionGroupId=pg_id, Aggregation=pg_aggregation, Pattern=pg_pattern, Members=pg_members.split(",")
)
else:
protection_group_response: CreateProtectionGroupRequestRequestTypeDef = shield_client.create_protection_group(
protection_group_response = shield_client.create_protection_group(
ProtectionGroupId=pg_id, Aggregation=pg_aggregation, Pattern=pg_pattern
)
api_call_details = {"API_Call": "shield:CreateProtectionGroup", "API_Response": protection_group_response}
@@ -687,7 +672,7 @@ def check_emergency_contacts(shield_client: ShieldClient) -> bool:
emergency_contacts_response: DescribeEmergencyContactSettingsResponseTypeDef = shield_client.describe_emergency_contact_settings()
api_call_details = {"API_Call": "shield:DescribeEmergencyContactSettings", "API_Response": emergency_contacts_response}
LOGGER.info(api_call_details)
if len(emergency_contacts_response) > 0:
if "EmergencyContactList" in emergency_contacts_response and len(emergency_contacts_response["EmergencyContactList"]) > 0:
return True
else:
return False
@@ -716,23 +701,21 @@ def enable_proactive_engagement(shield_client: ShieldClient, params: dict) -> No
LOGGER.info(f"SHIELD_ENABLE_PROACTIVE_ENGAGEMENT is set to {params['SHIELD_ENABLE_PROACTIVE_ENGAGEMENT']}")


def associate_proactive_engagement_details(shield_client: ShieldClient, params: dict):
def associate_proactive_engagement_details(shield_client: ShieldClient, params: dict) -> None:
"""Allow the DRT to use the contact information to reach out to the contacts
Args:
shield_client: shield client
params: environment variables
"""
associate_proactive_engagement_response: AssociateProactiveEngagementDetailsRequestRequestTypeDef = (
shield_client.associate_proactive_engagement_details(
EmergencyContactList=[
{
"EmailAddress": params["SHIELD_PROACTIVE_ENGAGEMENT_EMAIL"],
"PhoneNumber": params["SHIELD_PROACTIVE_ENGAGEMENT_PHONE_NUMBER"],
"ContactNotes": params["SHIELD_PROACTIVE_ENGAGEMENT_NOTES"],
},
]
)
associate_proactive_engagement_response = shield_client.associate_proactive_engagement_details(
EmergencyContactList=[
{
"EmailAddress": params["SHIELD_PROACTIVE_ENGAGEMENT_EMAIL"],
"PhoneNumber": params["SHIELD_PROACTIVE_ENGAGEMENT_PHONE_NUMBER"],
"ContactNotes": params["SHIELD_PROACTIVE_ENGAGEMENT_NOTES"],
},
]
)
api_call_details = {"API_Call": "shield:AssociateProactiveEngagementDetails", "API_Response": associate_proactive_engagement_response}
LOGGER.info(api_call_details)
Original file line number Diff line number Diff line change
@@ -4,8 +4,8 @@
########################################################################
AWSTemplateFormatVersion: 2010-09-09
Description:
This template creates an IAM role to configure the delegated administrator account - - 'shield_org' solution in the repo,
https://github.com/aws-samples/aws-security-reference-architecture-examples (REPLACE_ME)
This template creates an IAM role to configure the delegated administrator account - 'shield_org' solution in the repo,
https://github.com/aws-samples/aws-security-reference-architecture-examples sra-1u3sd7f8u

Metadata:
SRA:
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
AWSTemplateFormatVersion: 2010-09-09
Description:
This template creates a custom resource Lambda to delegate administration and configure shield within an AWS Organization - 'shield_org' solution in
the repo, https://github.com/aws-samples/aws-security-reference-architecture-examples (REPLACE_ME)
the repo, https://github.com/aws-samples/aws-security-reference-architecture-examples sra-1u3sd7f8u

Metadata:
SRA:
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
AWSTemplateFormatVersion: 2010-09-09
Description:
This template creates an event rule to send organization events to the home region. - 'shield_org' solution in the repo,
https://github.com/aws-samples/aws-security-reference-architecture-examples (REPLACE_ME)
https://github.com/aws-samples/aws-security-reference-architecture-examples sra-1u3sd7f8u
Metadata:
SRA:
Version: 1.0
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
AWSTemplateFormatVersion: 2010-09-09
Description:
This template creates a custom resource Lambda to delegate administration and configure shield within an AWS Organization - 'shield_org' solution in
the repo, https://github.com/aws-samples/aws-security-reference-architecture-examples (REPLACE_ME)
the repo, https://github.com/aws-samples/aws-security-reference-architecture-examples sra-1u3sd7f8u

Metadata:
SRA:

0 comments on commit 96bff01

Please sign in to comment.