Skip to content
This repository has been archived by the owner on Oct 4, 2024. It is now read-only.

Various fixes for MWAA verify_env.py script #206

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions MWAA/verify_env/verify_env.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ def validation_profile(profile_name):
'''
verify profile name doesn't have path to files or unexpected input
'''
if re.match(r"^[a-zA-Z0-9]*$", profile_name):
if re.match(r"^[a-zA-Z0-9._-]*$", profile_name):
return profile_name
raise argparse.ArgumentTypeError("%s is an invalid profile name value" % profile_name)

Expand Down Expand Up @@ -428,7 +428,7 @@ def check_iam_permissions(input_env, iam_client):
"kms:Encrypt"
],
ResourceArns=[
"arn:aws:kms:*:111122223333:key/*"
"arn:aws:kms:*:" + account_id + ":key/*"
],
ContextEntries=[
{
Expand All @@ -446,7 +446,7 @@ def check_iam_permissions(input_env, iam_client):
"kms:GenerateDataKey*"
],
ResourceArns=[
"arn:aws:kms:*:111122223333:key/*"
"arn:aws:kms:*:" + account_id + ":key/*"
],
ContextEntries=[
{
Expand Down Expand Up @@ -580,13 +580,13 @@ def check_egress_acls(acls, dst_port):
'''
for acl in acls:
# check ipv4 acl rule only
if acl.get('CidrBlock'):
if acl.get('CidrBlock') and acl.get('Protocol') != '1':
# Check Port
if ((acl.get('Protocol') == '-1') or
(dst_port in range(acl['PortRange']['From'], acl['PortRange']['To'] + 1))):
# Check Action
return acl['RuleAction'] == 'allow'
return ""
return False


def check_ingress_acls(acls, src_port_from, src_port_to):
Expand All @@ -595,15 +595,15 @@ def check_ingress_acls(acls, src_port_from, src_port_to):
'''
for acl in acls:
# check ipv4 acl rule only
if acl.get('CidrBlock'):
if acl.get('CidrBlock') and acl.get('Protocol') != '1':
# Check Port
test_range = range(src_port_from, src_port_to)
test_range = range(src_port_from, src_port_to + 1)
set_test_range = set(test_range)
if ((acl.get('Protocol') == '-1') or
set_test_range.issubset(range(acl['PortRange']['From'], acl['PortRange']['To'] + 1))):
# Check Action
return acl['RuleAction'] == 'allow'
return ""
return False


def check_nacl(input_subnets, input_subnet_ids, ec2_client):
Expand Down Expand Up @@ -879,7 +879,7 @@ def check_connectivity_to_dep_services(input_env, input_subnets, ec2_client, ssm
interface_ip, "and", service['service'], "on port", service['port'])
print("Please follow this link to view the results of the test:")
print("https://console.aws.amazon.com/systems-manager/automation/execution/" + ssm_execution_id +
"?REGION=" + REGION + "\n")
"?region=" + REGION + "\n")
break
except ClientError as client_error:
print('Attempt', i, 'Encountered error', client_error.response['Error']['Message'], ' retrying...')
Expand Down