Skip to content

Commit

Permalink
move to monkeypatch for test, add deps and more clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
thedoubl3j committed Sep 5, 2024
1 parent af9404e commit 5266f82
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 55 deletions.
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies = [ # runtime deps # https://packaging.python.org/en/latest/guide
"python-dsv-sdk >= 1.0.4", # credentials.thycotic_dsv
"python-tss-sdk >= 1.2.1", # credentials.thycotic_tss
"requests", # credentials.aim, credentials.centrify_vault, credentials.conjur, credentials.hashivault
"datetime", # credentials.aws_assume_role
]
classifiers = [ # Allowlist: https://pypi.org/classifiers/
"Development Status :: 1 - Planning",
Expand Down Expand Up @@ -83,6 +84,7 @@ centrify_vault_kv = "awx_plugins.credentials.centrify_vault:centrify_plugin"
thycotic_dsv = "awx_plugins.credentials.dsv:dsv_plugin"
thycotic_tss = "awx_plugins.credentials.tss:tss_plugin"
aws_secretsmanager_credential = "awx_plugins.credentials.aws_secretsmanager:aws_secretmanager_plugin"
aws_assume_role = "awx_plugins.credentials.aws_assume_role:aws_assume_role_plugin"

[project.entry-points."awx_plugins.inventory"] # new entry points group name
azure-rm = "awx_plugins.inventory.plugins:azure_rm"
Expand Down
31 changes: 21 additions & 10 deletions src/awx_plugins/credentials/aws_assumerole.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
'label': 'AWS ARN Role Name',
'type': 'string',
'secret': True,
'help_text': _('The ARN Role Name to be assumed in AWS')},
'help_text': _('The ARN Role Name to be assumed in AWS'),
},
],
'metadata': [{'id': 'identifier',
'label': 'Identifier',
Expand All @@ -51,20 +52,23 @@

def aws_assumerole_getcreds(access_key, secret_key, role_arn, external_id):
if (access_key is None or len(access_key) == 0) and (
secret_key is None or len(secret_key) == 0):
secret_key is None or len(secret_key) == 0
):
# Connect using credentials in the EE
connection = boto3.client(service_name='sts')
else:
# Connect to AWS using provided credentials
connection = boto3.client(
service_name='sts',
aws_access_key_id=access_key,
aws_secret_access_key=secret_key)
aws_secret_access_key=secret_key,
)
try:
response = connection.assume_role(
RoleArn=role_arn,
RoleSessionName='AAP_AWS_Role_Session1',
ExternalId=external_id)
ExternalId=external_id,
)
except ClientError as ce:
raise ValueError(f'Got a bad client response from AWS: {ce.msg}.')

Expand All @@ -74,7 +78,8 @@ def aws_assumerole_getcreds(access_key, secret_key, role_arn, external_id):


def aws_assumerole_backend(**kwargs):
"""This backend function actually contacts AWS to assume a given role for the specified user"""
"""This backend function actually contacts AWS to assume a given role for
the specified user."""
access_key = kwargs.get('access_key')
secret_key = kwargs.get('secret_key')
role_arn = kwargs.get('role_arn')
Expand All @@ -87,19 +92,24 @@ def aws_assumerole_backend(**kwargs):
# multiple roles.
#
credential_key_hash = hashlib.sha256(
(str(access_key or '') + role_arn).encode('utf-8'))
(str(access_key or '') + role_arn).encode('utf-8'),
)
credential_key = credential_key_hash.hexdigest()

credentials = _aws_cred_cache.get(credential_key, None)

# If there are no credentials for this user/ARN *or* the credentials
# we have in the cache have expired, then we need to contact AWS again.
#
if (credentials is None) or (credentials['Expiration'] < datetime.datetime.now(
credentials['Expiration'].tzinfo)):
if (credentials is None) or (
credentials['Expiration'] < datetime.datetime.now(
credentials['Expiration'].tzinfo,
)
):

credentials = aws_assumerole_getcreds(
access_key, secret_key, role_arn, external_id)
access_key, secret_key, role_arn, external_id,
)

_aws_cred_cache[credential_key] = credentials

Expand All @@ -114,4 +124,5 @@ def aws_assumerole_backend(**kwargs):
aws_assumerole_plugin = CredentialPlugin(
'AWS Assume Role Plugin',
inputs=assume_role_inputs,
backend=aws_assumerole_backend)
backend=aws_assumerole_backend,
)
91 changes: 46 additions & 45 deletions tests/credential_plugins_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,71 +132,72 @@ def test_hashivault_handle_auth_not_enough_args():
hashivault.handle_auth()


def test_aws_assumerole_with_accesssecret():
'''
Test that the aws_assumerole_backend function call returns a token given the access_key and secret_key.
'''
def test_aws_assumerole_with_accesssecret(monkeypatch):
"""Test that the aws_assumerole_backend function call returns a token given
the access_key and secret_key."""
kwargs = {
'access_key': 'my_access_key',
'secret_key': 'my_secret_key',
'role_arn': 'the_arn',
'identifier': 'access_token',
}
with mock.patch.object(aws_assumerole, 'aws_assumerole_getcreds') as method_mock:
method_mock.return_value = {

def mock_getcreds(access_key, secret_key, role_arn, session_token):
return {
'access_key': 'the_access_key',
'secret_key': 'the_secret_key',
'access_token': 'the_access_token',
'Expiration': datetime.datetime.today() + datetime.timedelta(days=1),
}
token = aws_assumerole.aws_assumerole_backend(**kwargs)
method_mock.assert_called_with(
kwargs.get('access_key'),
kwargs.get('secret_key'),
kwargs.get('role_arn'),
None)
assert token == 'the_access_token'
kwargs['identifier'] = 'secret_key'
method_mock.reset_mock()
token = aws_assumerole.aws_assumerole_backend(**kwargs)
method_mock.assert_not_called()
assert token == 'the_secret_key'
kwargs['identifier'] = 'access_key'
method_mock.reset_mock()
token = aws_assumerole.aws_assumerole_backend(**kwargs)
method_mock.assert_not_called()
assert token == 'the_access_key'


def test_aws_assumerole_with_arnonly():
'''
Test backend function with only the role ARN provided.
'''

monkeypatch.setattr(
aws_assumerole,
'aws_assumerole_getcreds',
mock_getcreds)

token = aws_assumerole.aws_assumerole_backend(**kwargs)
assert token == 'the_access_token'

kwargs['identifier'] = 'secret_key'
token = aws_assumerole.aws_assumerole_backend(**kwargs)
assert token == 'the_secret_key'

kwargs['identifier'] = 'access_key'
token = aws_assumerole.aws_assumerole_backend(**kwargs)
assert token == 'the_access_key'


def test_aws_assumerole_with_arnonly(monkeypatch):
"""Test backend function with only the role ARN provided."""
kwargs = {
'role_arn': 'the_arn',
'identifier': 'access_token',
}
with mock.patch.object(aws_assumerole, 'aws_assumerole_getcreds') as method_mock:
method_mock.return_value = {

# Define a mock function that will replace aws_assumerole_getcreds
def mock_getcreds(*args, **kwargs):
return {
'access_key': 'the_access_key',
'secret_key': 'the_secret_key',
'access_token': 'the_access_token',
'Expiration': datetime.datetime.today() + datetime.timedelta(days=1),
}
token = aws_assumerole.aws_assumerole_backend(**kwargs)
method_mock.assert_called_with(
None, None, kwargs.get('role_arn'), None)
assert token == 'the_access_token'
kwargs['identifier'] = 'secret_key'
method_mock.reset_mock()
token = aws_assumerole.aws_assumerole_backend(**kwargs)
method_mock.assert_not_called()
assert token == 'the_secret_key'
kwargs['identifier'] = 'access_key'
method_mock.reset_mock()
token = aws_assumerole.aws_assumerole_backend(**kwargs)
method_mock.assert_not_called()
assert token == 'the_access_key'

monkeypatch.setattr(
aws_assumerole,
'aws_assumerole_getcreds',
mock_getcreds)

token = aws_assumerole.aws_assumerole_backend(**kwargs)
assert token == 'the_access_token'

kwargs['identifier'] = 'secret_key'
token = aws_assumerole.aws_assumerole_backend(**kwargs)
assert token == 'the_secret_key'

kwargs['identifier'] = 'access_key'
token = aws_assumerole.aws_assumerole_backend(**kwargs)
assert token == 'the_access_key'


class TestDelineaImports:
Expand Down

0 comments on commit 5266f82

Please sign in to comment.