Skip to content

Commit

Permalink
Merge branch 'contrib/zeekforit_master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
zeekforit authored Jan 9, 2024
2 parents 225f8b5 + 7b8cfa8 commit edd8e91
Show file tree
Hide file tree
Showing 13 changed files with 1,704 additions and 1,635 deletions.
2,022 changes: 927 additions & 1,095 deletions Packs/AWS-EC2/Integrations/AWS-EC2/AWS-EC2.py

Large diffs are not rendered by default.

328 changes: 232 additions & 96 deletions Packs/AWS-EC2/Integrations/AWS-EC2/AWS-EC2.yml

Large diffs are not rendered by default.

166 changes: 114 additions & 52 deletions Packs/AWS-EC2/Integrations/AWS-EC2/AWS-EC2_test.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from AWSApiModule import *
from CommonServerPython import * # noqa: F401
import demistomock as demisto # noqa: F401
import importlib
import pytest
AWS_EC2 = importlib.import_module("AWS-EC2")


AWS_EC2 = importlib.import_module("AWS-EC2")

VALID_ARGS = {"groupId": "sg-0566450bb5ae17c7d",
"IpPermissionsfromPort": 23,
"IpPermissionsToPort": 23,
Expand All @@ -17,6 +19,10 @@
"IpPermissionsFull": """[{"IpProtocol": "-1", "IpRanges": [{"CidrIp": "0.0.0.0/0"}],
"Ipv6Ranges": [], "PrefixListIds": [], "UserIdGroupPairs": []}]"""}

IPPERMISSIONSFULL_ARGS = {"groupId": "sg-0566450bb5ae17c7d",
"IpPermissionsFull": """[{"IpProtocol": "-1", "IpRanges": [{"CidrIp": "0.0.0.0/0"}],
"Ipv6Ranges": [], "PrefixListIds": [], "UserIdGroupPairs": []}]"""}

INVALID_ARGS = {"groupId": "sg-0566450bb5ae17c7d",
"region": "reg",
"roleArn": "role",
Expand All @@ -41,23 +47,7 @@ def get_ipam_discovered_public_addresses(self, **kwargs):
pass


def create_client():
aws_client_args = {
'aws_default_region': 'us-east-1',
'aws_role_arn': None,
'aws_role_session_name': None,
'aws_role_session_duration': None,
'aws_role_policy': None,
'aws_access_key_id': 'test_access_key',
'aws_secret_access_key': 'test_secret_key',
'aws_session_token': 'test_sts_token',
'verify_certificate': False,
'timeout': 60,
'retries': 3
}

client = AWSClient(**aws_client_args)
return client
AWS_EC2.build_client = lambda _: Boto3Client


def validate_kwargs(*args, **kwargs):
Expand All @@ -72,11 +62,7 @@ def validate_kwargs(*args, **kwargs):
return {'ResponseMetadata': {'HTTPStatusCode': 404}, 'Return': "some_return_value"}


@pytest.mark.parametrize('args, expected_results', [
(IPPERMISSIONSFULL_ARGS, "The Security Group ingress rule was created"),
(INVALID_ARGS, None)
])
def test_aws_ec2_authorize_security_group_ingress_rule(mocker, args, expected_results):
def test_aws_ec2_authorize_security_group_ingress_rule(mocker):
"""
Given
- authorize-security-group-ingress-command arguments and aws client
Expand All @@ -89,16 +75,16 @@ def test_aws_ec2_authorize_security_group_ingress_rule(mocker, args, expected_re
- Case 1: Should ensure that the right message was resulted return true.
- Case 2: Should ensure that no message was resulted return true.
"""
aws_client = create_client()
mocker.patch.object(AWSClient, "aws_session", return_value=Boto3Client())
mocker.patch.object(Boto3Client, 'authorize_security_group_ingress', side_effect=validate_kwargs)
mocker.patch.object(demisto, 'results')
AWS_EC2.authorize_security_group_ingress_command(args, aws_client)
if not expected_results:
assert not demisto.results.call_args
else:
results = demisto.results.call_args[0][0]
assert results == expected_results
mocker.patch.object(AWS_EC2, 'return_results')

# Case 1
with pytest.raises(DemistoException, match='Unexpected response from AWS - EC2:'):
AWS_EC2.authorize_security_group_ingress_command(INVALID_ARGS)

# Case 2
result = AWS_EC2.authorize_security_group_ingress_command(IPPERMISSIONSFULL_ARGS)
assert result.readable_output == "The Security Group ingress rule was created"


def test_create_policy_kwargs_dict():
Expand All @@ -116,11 +102,7 @@ def test_create_policy_kwargs_dict():
assert AWS_EC2.create_policy_kwargs_dict({}) == {}


@pytest.mark.parametrize('args, expected_results', [
(VALID_ARGS, "The Security Group egress rule was created"),
(INVALID_ARGS, None)
])
def test_aws_ec2_authorize_security_group_egress_rule(mocker, args, expected_results):
def test_aws_ec2_authorize_security_group_egress_rule(mocker):
"""
Given
- authorize-security-group-egress-command arguments and aws client
Expand All @@ -133,16 +115,16 @@ def test_aws_ec2_authorize_security_group_egress_rule(mocker, args, expected_res
- Case 1: Should ensure that the right message was resulted return true.
- Case 2: Should ensure that no message was resulted return true.
"""
aws_client = create_client()
mocker.patch.object(AWSClient, "aws_session", return_value=Boto3Client())
mocker.patch.object(Boto3Client, 'authorize_security_group_egress', side_effect=validate_kwargs)
mocker.patch.object(demisto, 'results')
AWS_EC2.authorize_security_group_egress_command(args, aws_client)
if not expected_results:
assert not demisto.results.call_args
else:
results = demisto.results.call_args[0][0]
assert results == expected_results
mocker.patch.object(AWS_EC2, 'return_results')

# Case 1
with pytest.raises(DemistoException, match='Unexpected response from AWS - EC2:'):
AWS_EC2.authorize_security_group_egress_command(INVALID_ARGS)

# Case 2
result = AWS_EC2.authorize_security_group_egress_command(VALID_ARGS)
assert result.readable_output == "The Security Group egress rule was created"


@pytest.mark.parametrize('filter, expected_results', [
Expand All @@ -164,6 +146,88 @@ def test_parse_filter_field(filter, expected_results):
assert res == expected_results


def mock_command_func(_):
return CommandResults(
outputs=[{}],
readable_output='readable_output',
outputs_prefix='prefix',
)


def test_run_on_all_accounts(mocker):
"""
Given:
- The accounts_to_access and access_role_name params are provided.
When:
- Calling a command function that is decorated with test_account_runner.
Then:
- Ensure account_runner runs the command function for each of the accounts provided.
"""
AWS_EC2.ROLE_NAME = 'name'
AWS_EC2.PARAMS = {'accounts_to_access': '1,2'}
mocker.patch.object(demisto, 'getArg', return_value=None)

# list as output
result_func = AWS_EC2.run_on_all_accounts(mock_command_func)
results: list[CommandResults] = result_func({})

assert results[0].readable_output == '#### Result for account `1`:\nreadable_output'
assert results[0].outputs == [{'AccountId': '1'}]
assert results[1].readable_output == '#### Result for account `2`:\nreadable_output'
assert results[1].outputs == [{'AccountId': '2'}]

# dict as output
result_func = AWS_EC2.run_on_all_accounts(lambda _: CommandResults(
outputs={},
readable_output='readable_output',
outputs_prefix='prefix',
))
results: list[CommandResults] = result_func({})

assert results[0].readable_output == '#### Result for account `1`:\nreadable_output'
assert results[0].outputs == {'AccountId': '1'}
assert results[1].readable_output == '#### Result for account `2`:\nreadable_output'
assert results[1].outputs == {'AccountId': '2'}


@pytest.mark.parametrize('role_name, roleArn', [
(None, None), ('name', 'role'),
])
def test_run_on_all_accounts_no_new_func(mocker, role_name, roleArn):
"""
Given:
- 1. The access_role_name param is not provided.
- 2. The roleArn arg is provided.
When:
- Calling a command function that is decorated with test_account_runner.
Then:
- Ensure account_runner returns the command function unchanged.
"""
# case 1
AWS_EC2.ROLE_NAME = role_name
AWS_EC2.IS_ARN_PROVIDED = True

result_func = AWS_EC2.run_on_all_accounts(mock_command_func)
result: CommandResults = result_func({})

assert result.readable_output == 'readable_output'
assert result.outputs == [{}]

# case 2
AWS_EC2.ROLE_NAME = None
AWS_EC2.IS_ARN_PROVIDED = False

result_func = AWS_EC2.run_on_all_accounts(mock_command_func)
result: CommandResults = result_func({})

assert result.readable_output == 'readable_output'
assert result.outputs == [{}]


@pytest.mark.parametrize('return_boto, expected_results', [
({'IpamResourceDiscoveries': []}, 'No Ipam Resource Discoveries were found.'),
({"IpamResourceDiscoveries":
Expand Down Expand Up @@ -201,7 +265,7 @@ def test_describe_ipam_resource_discoveries_command(mocker, return_boto, expecte
- Case 2: Should ensure that information on resource was returned.
"""
mocker.patch.object(Boto3Client, 'describe_ipam_resource_discoveries', return_value=return_boto)
results = AWS_EC2.describe_ipam_resource_discoveries_command({}, Boto3Client)
results = AWS_EC2.describe_ipam_resource_discoveries_command({})
assert results.readable_output == expected_results


Expand Down Expand Up @@ -243,7 +307,7 @@ def test_describe_ipam_resource_discovery_associations_command(mocker, return_bo
- Case 2: Should ensure that information on resource was returned.
"""
mocker.patch.object(Boto3Client, 'describe_ipam_resource_discovery_associations', return_value=return_boto)
results = AWS_EC2.describe_ipam_resource_discovery_associations_command({}, Boto3Client)
results = AWS_EC2.describe_ipam_resource_discovery_associations_command({})
assert results.readable_output == expected_results


Expand Down Expand Up @@ -297,11 +361,9 @@ def test_get_ipam_discovered_public_addresses_command(mocker, return_boto, expec
- Case 1: Should ensure that generic "nothing found" message returned.
- Case 2: Should ensure that information on resource was returned.
"""
aws_client = create_client()
mocker.patch.object(AWSClient, "aws_session", return_value=Boto3Client())
mocker.patch.object(Boto3Client, 'get_ipam_discovered_public_addresses', return_value=return_boto)
args = {"IpamResourceDiscoveryId": "ipam-res-disco-11111111111111111",
"AddressRegion": "us-east-1",
"Filters": "Name=address,Values=1.1.1.1"}
results = AWS_EC2.get_ipam_discovered_public_addresses_command(args, aws_client)
results = AWS_EC2.get_ipam_discovered_public_addresses_command(args)
assert results.readable_output == expected_results
Loading

0 comments on commit edd8e91

Please sign in to comment.