Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(terraform): Add new checks to match run checks #6868

Merged
merged 10 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
metadata:
id: "CKV2_AZURE_55"
name: "Ensure Azure Spring Cloud app end-to-end TLS is enabled"
category: "NETWORKING"
definition:
or:
- cond_type: attribute
resource_types:
- azurerm_spring_cloud_service
attribute: sku_tier
operator: not_exists
- cond_type: attribute
resource_types:
- azurerm_spring_cloud_service
attribute: sku_tier
operator: equals
value: "Basic"
- and:
- cond_type: filter
attribute: resource_type
value:
- azurerm_spring_cloud_service
operator: within
- or:
- resource_types:
- azurerm_spring_cloud_service
connected_resource_types:
- azurerm_spring_cloud_app
operator: not_exists
cond_type: connection
- and:
- resource_types:
- azurerm_spring_cloud_service
connected_resource_types:
- azurerm_spring_cloud_app
operator: exists
cond_type: connection
- cond_type: attribute
resource_types:
- azurerm_spring_cloud_app
attribute: tls_enabled
operator: exists
- cond_type: attribute
resource_types:
- azurerm_spring_cloud_app
attribute: tls_enabled
operator: is_true
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
from __future__ import annotations

from typing import Any

from checkov.common.models.enums import CheckResult, CheckCategories
from checkov.terraform.checks.resource.base_resource_check import BaseResourceCheck
from checkov.common.util.type_forcers import force_list
from checkov.common.util.type_forcers import force_int


class AbsSecurityGroupUnrestrictedEgress(BaseResourceCheck):
def __init__(self, check_id: str, port: int) -> None:
name = f"Ensure no security groups allow egress from 0.0.0.0:0 to port {port}"
supported_resources = ('aws_security_group', 'aws_security_group_rule', 'aws_vpc_security_group_egress_rule')
categories = (CheckCategories.NETWORKING,)
super().__init__(name=name, id=check_id, categories=categories, supported_resources=supported_resources)
self.port = port

def scan_resource_conf(self, conf: dict[str, list[Any]]) -> CheckResult:
"""
Looks for configuration at security group egress rules :
https://www.terraform.io/docs/providers/aws/r/security_group.html
https://www.terraform.io/docs/providers/aws/r/security_group_rule.html

Return PASS if:
- The resource is an aws_security_group that contains no violating egress rules (including if there are no
egress rules at all), OR
- The resource is an aws_security_group_rule of type 'egress' that does not violate the check.

Return FAIL if:
- The resource is an aws_security_group that contains a violating egress rule, OR
- The resource is an aws_security_group_rule of type 'egress' that violates the check.

Return UNKNOWN if:
- the resource is an aws_security_group_rule of type 'egress', OR

:param conf: aws_security_group configuration
:return: <CheckResult>
"""

if 'egress' in conf: # This means it's an SG resource with egress block(s)
egress_conf = conf['egress']
for egress_rule in egress_conf:
for rule in force_list(egress_rule):
if isinstance(rule, dict):
if self.check_self(rule):
return CheckResult.PASSED
if self.contains_violation(rule):
self.evaluated_keys = [
f'egress/[{egress_conf.index(egress_rule)}]/from_port',
f'egress/[{egress_conf.index(egress_rule)}]/to_port',
f'egress/[{egress_conf.index(egress_rule)}]/cidr_blocks',
f'egress/[{egress_conf.index(egress_rule)}]/ipv6_cidr_blocks',
]
return CheckResult.FAILED

return CheckResult.PASSED

if 'type' in conf: # This means it's an SG_rule resource.
type = force_list(conf['type'])[0]
if type == 'egress':
if self.check_self(conf):
return CheckResult.PASSED
self.evaluated_keys = ['from_port', 'to_port', 'cidr_blocks', 'ipv6_cidr_blocks']
if self.contains_violation(conf):
return CheckResult.FAILED
return CheckResult.PASSED
return CheckResult.UNKNOWN
else:
self.evaluated_keys = ['from_port', 'to_port', 'cidr_ipv4', 'cidr_ipv6']
if 'from_port' in conf or 'to_port' in conf:
if self.contains_violation(conf):
return CheckResult.FAILED
return CheckResult.PASSED

return CheckResult.PASSED

def contains_violation(self, conf: dict[str, list[Any]]) -> bool:
from_port = force_int(force_list(conf.get('from_port', [{-1}]))[0])
to_port = force_int(force_list(conf.get('to_port', [{-1}]))[0])
protocol = force_list(conf.get('protocol', [None]))[0]
if from_port == 0 and to_port == 0:
to_port = 65535

prefix_list_ids = conf.get('prefix_list_ids')
if prefix_list_ids and prefix_list_ids != [[]]:
return False

if from_port is not None and to_port is not None and (from_port <= self.port <= to_port) or (
protocol == '-1' and from_port == 0 and to_port == 65535):
if conf.get('cidr_blocks'):
conf_cidr_blocks = conf.get('cidr_blocks', [[]])
else:
conf_cidr_blocks = conf.get('cidr_ipv4', [[]])
if conf_cidr_blocks and len(conf_cidr_blocks) > 0:
conf_cidr_blocks = conf_cidr_blocks[0]
cidr_blocks = force_list(conf_cidr_blocks)
if "0.0.0.0/0" in cidr_blocks:
return True
if conf.get('ipv6_cidr_blocks'):
ipv6_cidr_blocks = conf.get('ipv6_cidr_blocks', [])
else:
ipv6_cidr_blocks = conf.get('cidr_ipv6', [])
if ipv6_cidr_blocks and ipv6_cidr_blocks[0] is not None and \
any(ip in ['::/0', '0000:0000:0000:0000:0000:0000:0000:0000/0'] for ip in ipv6_cidr_blocks[0]):
return True
if not ipv6_cidr_blocks and not cidr_blocks \
and conf.get('security_groups') is None \
and conf.get('source_security_group_id') is None:
return True
return False

def check_self(self, conf: dict[str, list[Any]]) -> bool:
if conf.get('self'):
limit = force_list(conf['self'])[0]
if limit:
return True
return False
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from checkov.terraform.checks.resource.aws.AbsSecurityGroupUnrestrictedEgress import\
AbsSecurityGroupUnrestrictedEgress


class SecurityGroupUnrestrictedEgressAll(AbsSecurityGroupUnrestrictedEgress):
def __init__(self):
super().__init__(check_id="CKV_AWS_382", port=-1)


check = SecurityGroupUnrestrictedEgressAll()
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from typing import List

from checkov.common.models.enums import CheckCategories
from checkov.terraform.checks.resource.base_resource_value_check import BaseResourceValueCheck


class AzureContainerInstancePublicIPAddressType(BaseResourceValueCheck):
def __init__(self) -> None:
name = "Ensure that Azure Container group is deployed into virtual network"
id = "CKV_AZURE_245"
supported_resources = ('azurerm_container_group',)
categories = (CheckCategories.NETWORKING,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def get_inspected_key(self) -> str:
return 'ip_address_type'

def get_expected_values(self) -> List[str]:
return ['Private', 'None']


check = AzureContainerInstancePublicIPAddressType()
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from typing import List, Any

from checkov.terraform.checks.resource.base_resource_negative_value_check import BaseResourceNegativeValueCheck
from checkov.common.models.enums import CheckCategories


class KubernetesClusterHTTPApplicationRouting(BaseResourceNegativeValueCheck):
def __init__(self) -> None:
name = "Ensure Azure AKS cluster HTTP application routing is disabled"
id = "CKV_AZURE_246"
supported_resources = ('azurerm_kubernetes_cluster',)
categories = (CheckCategories.NETWORKING,)
super().__init__(name=name, id=id, categories=categories, supported_resources=supported_resources)

def get_inspected_key(self) -> str:
return "http_application_routing_enabled"

def get_forbidden_values(self) -> List[Any]:
return [True]


check = KubernetesClusterHTTPApplicationRouting()
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
resource "aws_security_group" "pass" {
name = "example"
vpc_id = "aws_vpc.example.id"

ingress {
cidr_blocks = ["0.0.0.0/0"]
from_port = 80
to_port = 80
protocol = "tcp"
}
ingress {
cidr_blocks = ["0.0.0.0/0"]
from_port = 443
to_port = 443
protocol = "tcp"
}
egress {
cidr_blocks = ["0.0.0.0/0"]
from_port = 20
to_port = 200
protocol = "-1"
}
}

resource "aws_security_group" "fail2" {
name = "example"
vpc_id = "aws_vpc.example.id"

ingress {
cidr_blocks = ["0.0.0.0/0"]
from_port = 80
to_port = 80
protocol = "tcp"
}
ingress {
cidr_blocks = ["0.0.0.0/0"]
from_port = 443
to_port = 443
protocol = "tcp"
}
egress {
cidr_blocks = ["0.0.0.0/0"]
from_port = 0
to_port = 0
protocol = "-1"
}
}

resource "aws_security_group_rule" "pass" {
cidr_blocks = ["0.0.0.0/0"]
from_port = 80
to_port = 80
protocol = "tcp"
security_group_id = "sg-12345"
type = "egress"
}

resource "aws_vpc_security_group_egress_rule" "pass" {
security_group_id = aws_security_group.example.id

cidr_ipv4 = "0.0.0.0/0"
from_port = 80
ip_protocol = "tcp"
to_port = 80
}

# fail
resource "aws_security_group" "fail" {
name = "allow-all-ingress"
description = "unfettered access"
vpc_id = "test_vpc"

egress {
from_port = -1
to_port = -1
protocol = "tcp"
cidr_blocks = ["0.0.0.0/0"]
description = "Test unfettered access"
}
}


resource "aws_security_group_rule" "fail" {
cidr_blocks = ["0.0.0.0/0"]
from_port = -1
to_port = -1
protocol = "tcp"
security_group_id = "sg-12345"
description = "Test unfettered access"
type = "egress"
}

resource "aws_security_group_rule" "fail2" {
cidr_blocks = ["0.0.0.0/0"]
from_port = 0
to_port = 0
protocol = "-1"
security_group_id = "sg-123456"
description = "Test unfettered access"
type = "egress"
}

resource "aws_vpc_security_group_egress_rule" "fail" {
security_group_id = aws_security_group.example.id

cidr_ipv4 = "0.0.0.0/0"
from_port = -1
ip_protocol = "tcp"
to_port = -1
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import unittest
from pathlib import Path

from checkov.runner_filter import RunnerFilter
from checkov.terraform.checks.resource.aws.SecurityGroupUnrestrictedEgressAny import check
from checkov.terraform.runner import Runner


class TestSecurityGroupUnrestrictedEgressAny(unittest.TestCase):
def test(self):
# given
test_files_dir = Path(__file__).parent / "example_SecurityGroupUnrestrictedEgressAny"

# when
report = Runner().run(root_folder=str(test_files_dir), runner_filter=RunnerFilter(checks=[check.id]))

# then
summary = report.get_summary()

passing_resources = {
"aws_security_group.pass",
"aws_security_group_rule.pass",
"aws_vpc_security_group_egress_rule.pass"
}

failing_resources = {
"aws_security_group.fail2",
"aws_security_group.fail",
"aws_security_group_rule.fail",
"aws_vpc_security_group_egress_rule.fail",
"aws_security_group_rule.fail2"
}

passed_check_resources = {c.resource for c in report.passed_checks}
failed_check_resources = {c.resource for c in report.failed_checks}

self.assertEqual(summary["passed"], len(passing_resources))
self.assertEqual(summary["failed"], len(failing_resources))
self.assertEqual(summary["skipped"], 0)
self.assertEqual(summary["parsing_errors"], 0)

self.assertEqual(passing_resources, passed_check_resources)
self.assertEqual(failing_resources, failed_check_resources)


if __name__ == "__main__":
unittest.main()
Loading
Loading