From a126fd82b3fb9f360145f22505a5cbf192c60045 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rub=C3=A9n=20De=20la=20Torre=20Vico?= Date: Mon, 19 Aug 2024 17:34:39 +0200 Subject: [PATCH] fix(ec2): Manage `UnicodeDecodeError` when reading user data (#4785) --- .../ec2_instance_secrets_user_data.py | 24 ++++++--- .../ec2_launch_template_no_secrets.py | 22 ++++++-- .../ec2_instance_secrets_user_data_test.py | 30 +++++++++++ .../ec2_launch_template_no_secrets_test.py | 52 +++++++++++++++++++ 4 files changed, 117 insertions(+), 11 deletions(-) diff --git a/prowler/providers/aws/services/ec2/ec2_instance_secrets_user_data/ec2_instance_secrets_user_data.py b/prowler/providers/aws/services/ec2/ec2_instance_secrets_user_data/ec2_instance_secrets_user_data.py index 5911c97f8f2..d8e77802074 100644 --- a/prowler/providers/aws/services/ec2/ec2_instance_secrets_user_data/ec2_instance_secrets_user_data.py +++ b/prowler/providers/aws/services/ec2/ec2_instance_secrets_user_data/ec2_instance_secrets_user_data.py @@ -8,6 +8,7 @@ from prowler.config.config import encoding_format_utf_8 from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.lib.logger import logger from prowler.providers.aws.services.ec2.ec2_client import ec2_client @@ -24,12 +25,23 @@ def execute(self): if instance.user_data: temp_user_data_file = tempfile.NamedTemporaryFile(delete=False) user_data = b64decode(instance.user_data) - if user_data[0:2] == b"\x1f\x8b": # GZIP magic number - user_data = zlib.decompress( - user_data, zlib.MAX_WBITS | 32 - ).decode(encoding_format_utf_8) - else: - user_data = user_data.decode(encoding_format_utf_8) + try: + if user_data[0:2] == b"\x1f\x8b": # GZIP magic number + user_data = zlib.decompress( + user_data, zlib.MAX_WBITS | 32 + ).decode(encoding_format_utf_8) + else: + user_data = user_data.decode(encoding_format_utf_8) + except UnicodeDecodeError as error: + logger.warning( + f"{instance.region} -- Unable to decode user data in EC2 instance {instance.id}: {error}" + ) + continue + except Exception as error: + logger.error( + f"{instance.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + continue temp_user_data_file.write( bytes(user_data, encoding="raw_unicode_escape") diff --git a/prowler/providers/aws/services/ec2/ec2_launch_template_no_secrets/ec2_launch_template_no_secrets.py b/prowler/providers/aws/services/ec2/ec2_launch_template_no_secrets/ec2_launch_template_no_secrets.py index 1910b35c7f0..42958d2808c 100644 --- a/prowler/providers/aws/services/ec2/ec2_launch_template_no_secrets/ec2_launch_template_no_secrets.py +++ b/prowler/providers/aws/services/ec2/ec2_launch_template_no_secrets/ec2_launch_template_no_secrets.py @@ -8,6 +8,7 @@ from prowler.config.config import encoding_format_utf_8 from prowler.lib.check.models import Check, Check_Report_AWS +from prowler.lib.logger import logger from prowler.providers.aws.services.ec2.ec2_client import ec2_client @@ -29,12 +30,23 @@ def execute(self): temp_user_data_file = tempfile.NamedTemporaryFile(delete=False) user_data = b64decode(version.template_data["UserData"]) - if user_data[0:2] == b"\x1f\x8b": # GZIP magic number - user_data = zlib.decompress(user_data, zlib.MAX_WBITS | 32).decode( - encoding_format_utf_8 + try: + if user_data[0:2] == b"\x1f\x8b": # GZIP magic number + user_data = zlib.decompress( + user_data, zlib.MAX_WBITS | 32 + ).decode(encoding_format_utf_8) + else: + user_data = user_data.decode(encoding_format_utf_8) + except UnicodeDecodeError as error: + logger.warning( + f"{template.region} -- Unable to decode User Data in EC2 Launch Template {template.name} version {version.version_number}: {error}" ) - else: - user_data = user_data.decode(encoding_format_utf_8) + continue + except Exception as error: + logger.error( + f"{template.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}" + ) + continue temp_user_data_file.write( bytes(user_data, encoding="raw_unicode_escape") diff --git a/tests/providers/aws/services/ec2/ec2_instance_secrets_user_data/ec2_instance_secrets_user_data_test.py b/tests/providers/aws/services/ec2/ec2_instance_secrets_user_data/ec2_instance_secrets_user_data_test.py index 11d12accde7..373be31cba3 100644 --- a/tests/providers/aws/services/ec2/ec2_instance_secrets_user_data/ec2_instance_secrets_user_data_test.py +++ b/tests/providers/aws/services/ec2/ec2_instance_secrets_user_data/ec2_instance_secrets_user_data_test.py @@ -265,3 +265,33 @@ def test_one_ec2_file_with_secrets_gzip(self): ) assert result[0].resource_tags is None assert result[0].region == AWS_REGION_US_EAST_1 + + @mock_aws + def test_one_secrets_with_unicode_error(self): + invalid_utf8_bytes = b"\xc0\xaf" + ec2 = resource("ec2", region_name=AWS_REGION_US_EAST_1) + ec2.create_instances( + ImageId=EXAMPLE_AMI_ID, MinCount=1, MaxCount=1, UserData=invalid_utf8_bytes + ) + + from prowler.providers.aws.services.ec2.ec2_service import EC2 + + aws_provider = set_mocked_aws_provider( + [AWS_REGION_EU_WEST_1, AWS_REGION_US_EAST_1] + ) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.ec2.ec2_instance_secrets_user_data.ec2_instance_secrets_user_data.ec2_client", + new=EC2(aws_provider), + ): + from prowler.providers.aws.services.ec2.ec2_instance_secrets_user_data.ec2_instance_secrets_user_data import ( + ec2_instance_secrets_user_data, + ) + + check = ec2_instance_secrets_user_data() + result = check.execute() + + assert len(result) == 0 diff --git a/tests/providers/aws/services/ec2/ec2_launch_template_no_secrets/ec2_launch_template_no_secrets_test.py b/tests/providers/aws/services/ec2/ec2_launch_template_no_secrets/ec2_launch_template_no_secrets_test.py index fcfb1146f4d..7e889fbc1ce 100644 --- a/tests/providers/aws/services/ec2/ec2_launch_template_no_secrets/ec2_launch_template_no_secrets_test.py +++ b/tests/providers/aws/services/ec2/ec2_launch_template_no_secrets/ec2_launch_template_no_secrets_test.py @@ -444,3 +444,55 @@ def test_two_launch_templates_one_template_with_secrets(self): assert result[0].region == AWS_REGION_US_EAST_1 assert result[1].status == "PASS" + + @mock_aws + def test_one_launch_template_with_unicode_error(self): + launch_template_name = "tester" + invalid_utf8_bytes = b"\xc0\xaf" + + ec2_client = client("ec2", region_name=AWS_REGION_US_EAST_1) + ec2_client.create_launch_template( + LaunchTemplateName=launch_template_name, + VersionDescription="Launch Template with secrets", + LaunchTemplateData={ + "InstanceType": "t1.micro", + "UserData": b64encode(invalid_utf8_bytes).decode(encoding_format_utf_8), + }, + ) + + launch_template_id = ec2_client.describe_launch_templates()["LaunchTemplates"][ + 0 + ]["LaunchTemplateId"] + + from prowler.providers.aws.services.ec2.ec2_service import EC2 + + aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1]) + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.ec2.ec2_launch_template_no_secrets.ec2_launch_template_no_secrets.ec2_client", + new=EC2(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.ec2.ec2_launch_template_no_secrets.ec2_launch_template_no_secrets import ( + ec2_launch_template_no_secrets, + ) + + check = ec2_launch_template_no_secrets() + result = check.execute() + + assert len(result) == 1 + assert result[0].status == "PASS" + assert ( + result[0].status_extended + == f"No secrets found in User Data of any version for EC2 Launch Template {launch_template_name}." + ) + assert result[0].resource_id == launch_template_id + assert result[0].region == AWS_REGION_US_EAST_1 + assert ( + result[0].resource_arn + == f"arn:aws:ec2:us-east-1:123456789012:launch-template/{launch_template_id}" + ) + assert result[0].resource_tags == []