From 5cc9554c232db1d1f9b6e36133d9af797746aeca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Rub=C3=A9n=20De=20la=20Torre=20Vico?= Date: Mon, 19 Aug 2024 19:03:30 +0200 Subject: [PATCH] chore(awslambda): Enhance function public access check called from other resource (#4679) Co-authored-by: Sergio Garcia <38561120+sergargar@users.noreply.github.com> --- ...tion_not_publicly_accessible.metadata.json | 2 +- ...lambda_function_not_publicly_accessible.py | 36 ++- .../services/awslambda/awslambda_service.py | 1 - ...a_function_not_publicly_accessible_test.py | 230 ++++++++++++++++++ 4 files changed, 254 insertions(+), 15 deletions(-) diff --git a/prowler/providers/aws/services/awslambda/awslambda_function_not_publicly_accessible/awslambda_function_not_publicly_accessible.metadata.json b/prowler/providers/aws/services/awslambda/awslambda_function_not_publicly_accessible/awslambda_function_not_publicly_accessible.metadata.json index cdc5176105c..8bff12f44d5 100644 --- a/prowler/providers/aws/services/awslambda/awslambda_function_not_publicly_accessible/awslambda_function_not_publicly_accessible.metadata.json +++ b/prowler/providers/aws/services/awslambda/awslambda_function_not_publicly_accessible/awslambda_function_not_publicly_accessible.metadata.json @@ -28,5 +28,5 @@ ], "DependsOn": [], "RelatedTo": [], - "Notes": "" + "Notes": "It gives a false positive if the function is exposed publicly by an other public resource like an ALB or API Gateway in an AWS Account when an AWS account ID is set as the principal of the policy." } diff --git a/prowler/providers/aws/services/awslambda/awslambda_function_not_publicly_accessible/awslambda_function_not_publicly_accessible.py b/prowler/providers/aws/services/awslambda/awslambda_function_not_publicly_accessible/awslambda_function_not_publicly_accessible.py index a39b8576443..20c6526b929 100644 --- a/prowler/providers/aws/services/awslambda/awslambda_function_not_publicly_accessible/awslambda_function_not_publicly_accessible.py +++ b/prowler/providers/aws/services/awslambda/awslambda_function_not_publicly_accessible/awslambda_function_not_publicly_accessible.py @@ -19,20 +19,30 @@ def execute(self): if function.policy: for statement in function.policy["Statement"]: # Only check allow statements - if statement["Effect"] == "Allow": - if ( - "*" in statement["Principal"] - or ( - "AWS" in statement["Principal"] - and "*" in statement["Principal"]["AWS"] + if statement["Effect"] == "Allow" and ( + "*" in statement["Principal"] + or ( + isinstance(statement["Principal"], dict) + and ( + "*" in statement["Principal"].get("AWS", "") + or "*" + in statement["Principal"].get("CanonicalUser", "") + or ( # Check if function can be invoked by other AWS services + ( + ".amazonaws.com" + in statement["Principal"].get("Service", "") + ) + and ( + "*" in statement.get("Action", "") + or "InvokeFunction" + in statement.get("Action", "") + ) + ) ) - or ( - "CanonicalUser" in statement["Principal"] - and "*" in statement["Principal"]["CanonicalUser"] - ) - ): - public_access = True - break + ) + ): + public_access = True + break if public_access: report.status = "FAIL" diff --git a/prowler/providers/aws/services/awslambda/awslambda_service.py b/prowler/providers/aws/services/awslambda/awslambda_service.py index 82f335227eb..287bbb74a59 100644 --- a/prowler/providers/aws/services/awslambda/awslambda_service.py +++ b/prowler/providers/aws/services/awslambda/awslambda_service.py @@ -14,7 +14,6 @@ from prowler.providers.aws.lib.service.service import AWSService -################## Lambda class Lambda(AWSService): def __init__(self, provider): # Call AWSService's __init__ diff --git a/tests/providers/aws/services/awslambda/awslambda_function_not_publicly_accessible/awslambda_function_not_publicly_accessible_test.py b/tests/providers/aws/services/awslambda/awslambda_function_not_publicly_accessible/awslambda_function_not_publicly_accessible_test.py index 1b9a1460327..86c240c8c9f 100644 --- a/tests/providers/aws/services/awslambda/awslambda_function_not_publicly_accessible/awslambda_function_not_publicly_accessible_test.py +++ b/tests/providers/aws/services/awslambda/awslambda_function_not_publicly_accessible/awslambda_function_not_publicly_accessible_test.py @@ -244,3 +244,233 @@ def test_function_public_with_canonical(self): == f"Lambda function {function_name} has a policy resource-based policy with public access." ) assert result[0].resource_tags == [] + + @mock_aws + def test_function_public_with_alb(self): + # Create the mock VPC + ec2_client = client("ec2", region_name=AWS_REGION_EU_WEST_1) + vpc = ec2_client.create_vpc(CidrBlock="10.0.0.0/16") + vpc_id = vpc["Vpc"]["VpcId"] + + # Create subnets + subnet_a = ec2_client.create_subnet( + VpcId=vpc_id, + CidrBlock="10.0.1.0/24", + AvailabilityZone=f"{AWS_REGION_EU_WEST_1}a", + ) + subnet_b = ec2_client.create_subnet( + VpcId=vpc_id, + CidrBlock="10.0.2.0/24", + AvailabilityZone=f"{AWS_REGION_EU_WEST_1}b", + ) + + # Create an Internet Gateway + igw = ec2_client.create_internet_gateway() + igw_id = igw["InternetGateway"]["InternetGatewayId"] + ec2_client.attach_internet_gateway(InternetGatewayId=igw_id, VpcId=vpc_id) + + # Create a Route Table and associate it with subnets + route_table = ec2_client.create_route_table(VpcId=vpc_id) + route_table_id = route_table["RouteTable"]["RouteTableId"] + ec2_client.create_route( + RouteTableId=route_table_id, + DestinationCidrBlock="0.0.0.0/0", + GatewayId=igw_id, + ) + ec2_client.associate_route_table( + RouteTableId=route_table_id, SubnetId=subnet_a["Subnet"]["SubnetId"] + ) + ec2_client.associate_route_table( + RouteTableId=route_table_id, SubnetId=subnet_b["Subnet"]["SubnetId"] + ) + + # Create the mock IAM role + iam_client = client("iam", region_name=AWS_REGION_EU_WEST_1) + role_name = "test-role" + assume_role_policy_document = { + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": {"Service": "lambda.amazonaws.com"}, + "Action": "sts:AssumeRole", + } + ], + } + role_arn = iam_client.create_role( + RoleName=role_name, + AssumeRolePolicyDocument=dumps(assume_role_policy_document), + )["Role"]["Arn"] + + function_name = "test-public-lambda" + + # Create the lambda function using boto3 client + lambda_client = client("lambda", region_name=AWS_REGION_EU_WEST_1) + function_arn = lambda_client.create_function( + FunctionName=function_name, + Runtime="python3.8", + Role=role_arn, + Handler="index.handler", + Code={"ZipFile": b"fileb://file-path/to/your-deployment-package.zip"}, + Description="Test Lambda function", + Timeout=3, + MemorySize=128, + Publish=True, + Tags={"tag1": "value1", "tag2": "value2"}, + )["FunctionArn"] + + # Attach the policy to the lambda function with a wildcard principal + lambda_client.add_permission( + FunctionName=function_name, + StatementId="public-access", + Action="lambda:InvokeFunction", + Principal="*", + ) + + # Create a security group for ALB + sg = ec2_client.create_security_group( + GroupName="alb-sg", + Description="Security group for ALB", + VpcId=vpc_id, + ) + sg_id = sg["GroupId"] + ec2_client.authorize_security_group_ingress( + GroupId=sg_id, + IpPermissions=[ + { + "IpProtocol": "tcp", + "FromPort": 80, + "ToPort": 80, + "IpRanges": [{"CidrIp": "0.0.0.0/0"}], + } + ], + ) + + # Create the ALB + elbv2_client = client("elbv2", region_name=AWS_REGION_EU_WEST_1) + lb = elbv2_client.create_load_balancer( + Name="test-alb", + Subnets=[subnet_a["Subnet"]["SubnetId"], subnet_b["Subnet"]["SubnetId"]], + SecurityGroups=[sg_id], + Scheme="internet-facing", + Type="application", + IpAddressType="ipv4", + ) + lb_arn = lb["LoadBalancers"][0]["LoadBalancerArn"] + + # Create the Target Group for Lambda + target_group = elbv2_client.create_target_group( + Name="test-public-lambda-tg", + TargetType="lambda", + ) + target_group_arn = target_group["TargetGroups"][0]["TargetGroupArn"] + + # Add permission for ALB to invoke the Lambda function + lambda_client.add_permission( + FunctionName=function_name, + StatementId="alb-access", + Action="lambda:InvokeFunction", + Principal="elasticloadbalancing.amazonaws.com", + SourceArn=target_group_arn, + ) + + # Attach Lambda to Target Group + elbv2_client.register_targets( + TargetGroupArn=target_group_arn, + Targets=[{"Id": function_arn}], + ) + + # Create ALB Listener + elbv2_client.create_listener( + LoadBalancerArn=lb_arn, + Protocol="HTTP", + Port=80, + DefaultActions=[{"Type": "forward", "TargetGroupArn": target_group_arn}], + ) + + aws_provider = set_mocked_aws_provider([AWS_REGION_EU_WEST_1]) + + from prowler.providers.aws.services.awslambda.awslambda_service import Lambda + + with mock.patch( + "prowler.providers.common.provider.Provider.get_global_provider", + return_value=aws_provider, + ), mock.patch( + "prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible.awslambda_client", + new=Lambda(aws_provider), + ): + # Test Check + from prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible import ( + awslambda_function_not_publicly_accessible, + ) + + check = awslambda_function_not_publicly_accessible() + result = check.execute() + + assert len(result) == 1 + assert result[0].region == AWS_REGION_EU_WEST_1 + assert result[0].resource_id == function_name + assert result[0].resource_arn == function_arn + assert result[0].status == "FAIL" + assert ( + result[0].status_extended + == "Lambda function test-public-lambda has a policy resource-based policy with public access." + ) + assert result[0].resource_tags == [{"tag1": "value1", "tag2": "value2"}] + + # def test_function_could_be_invoked_by_specific_aws_account(self): + # lambda_client = mock.MagicMock + # function_name = "test-lambda" + # function_runtime = "nodejs4.3" + # function_arn = f"arn:aws:lambda:{AWS_REGION_EU_WEST_1}:{AWS_ACCOUNT_NUMBER}:function/{function_name}" + # lambda_policy = { # If there is an ALB or API Gateway in specified AWS Account, the lambda function could be invoked and exposed by them + # "Version": "2012-10-17", + # "Statement": [ + # { + # "Sid": "public-access", + # "Principal": {"AWS": AWS_ACCOUNT_NUMBER}, + # "Effect": "Allow", + # "Action": [ + # "lambda:InvokeFunction", + # ], + # "Resource": [function_arn], + # } + # ], + # } + + # lambda_client.functions = { + # "function_name": Function( + # name=function_name, + # security_groups=[], + # arn=function_arn, + # region=AWS_REGION_EU_WEST_1, + # runtime=function_runtime, + # policy=lambda_policy, + # ) + # } + + # with mock.patch( + # "prowler.providers.common.provider.Provider.get_global_provider", + # return_value=set_mocked_aws_provider(), + # ), mock.patch( + # "prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible.awslambda_client", + # new=lambda_client, + # ): + # # Test Check + # from prowler.providers.aws.services.awslambda.awslambda_function_not_publicly_accessible.awslambda_function_not_publicly_accessible import ( + # awslambda_function_not_publicly_accessible, + # ) + + # check = awslambda_function_not_publicly_accessible() + # result = check.execute() + + # assert len(result) == 1 + # assert result[0].region == AWS_REGION_EU_WEST_1 + # assert result[0].resource_id == function_name + # assert result[0].resource_arn == function_arn + # assert result[0].status == "FAIL" + # assert ( + # result[0].status_extended + # == f"Lambda function {function_name} has a policy resource-based policy with public access." + # ) + # assert result[0].resource_tags == []