Skip to content

Commit

Permalink
feat(s3): add s3_bucket_lifecycle_enabled check (#4801)
Browse files Browse the repository at this point in the history
Co-authored-by: Sergio Garcia <[email protected]>
  • Loading branch information
HugoPBrito and MrCloudSec authored Aug 22, 2024
1 parent 0b23824 commit 57f1fa5
Show file tree
Hide file tree
Showing 6 changed files with 319 additions and 1 deletion.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
{
"Provider": "aws",
"CheckID": "s3_bucket_lifecycle_enabled",
"CheckTitle": "Check if S3 buckets have a Lifecycle configuration enabled",
"CheckType": [
"AWS Foundational Security Best Practices"
],
"ServiceName": "s3",
"SubServiceName": "",
"ResourceIdTemplate": "arn:partition:s3:::bucket_name",
"Severity": "low",
"ResourceType": "AwsS3Bucket",
"Description": "Check if S3 buckets have Lifecycle configuration enabled.",
"Risk": "The risks of not having lifecycle management enabled for S3 buckets include higher storage costs, unmanaged data retention, and potential non-compliance with data policies.",
"RelatedUrl": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/object-lifecycle-mgmt.html",
"Remediation": {
"Code": {
"CLI": "",
"NativeIaC": "",
"Other": "https://docs.aws.amazon.com/securityhub/latest/userguide/s3-controls.html#s3-13",
"Terraform": "https://www.trendmicro.com/cloudoneconformity/knowledge-base/aws/S3/lifecycle-configuration.html"
},
"Recommendation": {
"Text": "Enable lifecycle policies on your S3 buckets to automatically manage the transition and expiration of data.",
"Url": "https://docs.aws.amazon.com/AmazonS3/latest/userguide/how-to-set-lifecycle-configuration-intro.html"
}
},
"Categories": [],
"DependsOn": [],
"RelatedTo": [],
"Notes": ""
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from prowler.lib.check.models import Check, Check_Report_AWS
from prowler.providers.aws.services.s3.s3_client import s3_client


class s3_bucket_lifecycle_enabled(Check):
def execute(self):
findings = []
for arn, bucket in s3_client.buckets.items():
report = Check_Report_AWS(self.metadata())
report.region = bucket.region
report.resource_id = bucket.name
report.resource_arn = arn
report.resource_tags = bucket.tags
report.status = "FAIL"
report.status_extended = f"S3 Bucket {bucket.name} does not have a lifecycle configuration enabled."

if bucket.lifecycle:
for configuration in bucket.lifecycle:
if configuration.status == "Enabled":
report.status = "PASS"
report.status_extended = f"S3 Bucket {bucket.name} has a lifecycle configuration enabled."
break

findings.append(report)

return findings
29 changes: 29 additions & 0 deletions prowler/providers/aws/services/s3/s3_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def __init__(self, provider):
)
self.__threading_call__(self._get_bucket_tagging, self.buckets.values())
self.__threading_call__(self._get_bucket_replication, self.buckets.values())
self.__threading_call__(self._get_bucket_lifecycle, self.buckets.values())

def _list_buckets(self, provider):
logger.info("S3 - Listing buckets...")
Expand Down Expand Up @@ -379,6 +380,28 @@ def _get_bucket_tagging(self, bucket):
f"{error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def _get_bucket_lifecycle(self, bucket):
logger.info("S3 - Get buckets lifecycle...")
try:
regional_client = self.regional_clients[bucket.region]
lifecycle_configuration = (
regional_client.get_bucket_lifecycle_configuration(Bucket=bucket.name)
)
for rule in lifecycle_configuration["Rules"]:
bucket.lifecycle.append(
LifeCycleRule(
id=rule["ID"],
status=rule["Status"],
)
)
except ClientError as error:
if error.response["Error"]["Code"] == "NoSuchLifecycleConfiguration":
bucket.lifecycle = []
elif error.response["Error"]["Code"] == "NoSuchBucket":
logger.warning(
f"{regional_client.region} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)

def _get_bucket_replication(self, bucket):
logger.info("S3 - Get buckets replication...")
try:
Expand Down Expand Up @@ -530,6 +553,11 @@ class AccessPoint(BaseModel):
region: str


class LifeCycleRule(BaseModel):
id: str
status: str


class ReplicationRule(BaseModel):
id: str
status: str
Expand All @@ -550,4 +578,5 @@ class Bucket(BaseModel):
object_lock: bool = False
mfa_delete: bool = False
tags: Optional[list] = []
lifecycle: Optional[list[LifeCycleRule]] = []
replication_rules: Optional[list[ReplicationRule]] = []
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
from unittest import mock
from unittest.mock import patch

from moto import mock_aws

from prowler.providers.aws.services.s3.s3_service import S3, Bucket, LifeCycleRule
from tests.providers.aws.utils import (
AWS_ACCOUNT_NUMBER,
AWS_REGION_US_EAST_1,
set_mocked_aws_provider,
)


class Test_s3_bucket_lifecycle_enabled:
@mock_aws
def test_no_buckets(self):
from prowler.providers.aws.services.s3.s3_service import S3

aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
):
with mock.patch(
"prowler.providers.aws.services.s3.s3_bucket_no_mfa_delete.s3_bucket_no_mfa_delete.s3_client",
new=S3(aws_provider),
):
# Test Check
from prowler.providers.aws.services.s3.s3_bucket_no_mfa_delete.s3_bucket_no_mfa_delete import (
s3_bucket_no_mfa_delete,
)

check = s3_bucket_no_mfa_delete()
result = check.execute()

assert len(result) == 0

def test_no_lifecycle_configuration(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.s3.s3_bucket_lifecycle_enabled.s3_bucket_lifecycle_enabled.s3_client",
new=S3(aws_provider),
):
# Test Check
from prowler.providers.aws.services.s3.s3_bucket_lifecycle_enabled.s3_bucket_lifecycle_enabled import (
s3_bucket_lifecycle_enabled,
)

bucket_name = "bucket-test"
bucket_arn = f"arn:aws:s3::{AWS_ACCOUNT_NUMBER}:{bucket_name}"
s3_client = mock.MagicMock()
s3_client.buckets = {
bucket_arn: Bucket(
name=bucket_name,
region=AWS_REGION_US_EAST_1,
)
}

with patch(
"prowler.providers.aws.services.s3.s3_bucket_lifecycle_enabled.s3_bucket_lifecycle_enabled.s3_client",
s3_client,
):
check = s3_bucket_lifecycle_enabled()
result = check.execute()

# ALL REGIONS
assert len(result) == 1

# AWS_REGION_US_EAST_1
assert result[0].status == "FAIL"
assert (
result[0].status_extended
== f"S3 Bucket {bucket_name} does not have a lifecycle configuration enabled."
)
assert result[0].resource_id == bucket_name
assert result[0].resource_arn == bucket_arn
assert result[0].region == AWS_REGION_US_EAST_1

def test_one_valid_lifecycle_configuration(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.s3.s3_bucket_lifecycle_enabled.s3_bucket_lifecycle_enabled.s3_client",
new=S3(aws_provider),
):
# Test Check
from prowler.providers.aws.services.s3.s3_bucket_lifecycle_enabled.s3_bucket_lifecycle_enabled import (
s3_bucket_lifecycle_enabled,
)

s3_client = mock.MagicMock()
bucket_name = "bucket-test"
bucket_arn = f"arn:aws:s3::{AWS_ACCOUNT_NUMBER}:{bucket_name}"
s3_client.buckets = {
bucket_arn: Bucket(
name=bucket_name,
region=AWS_REGION_US_EAST_1,
lifecycle=[
LifeCycleRule(
id="test-rule-1",
status="Enabled",
),
],
)
}

with patch(
"prowler.providers.aws.services.s3.s3_bucket_lifecycle_enabled.s3_bucket_lifecycle_enabled.s3_client",
s3_client,
):
check = s3_bucket_lifecycle_enabled()
result = check.execute()

assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"S3 Bucket {bucket_name} has a lifecycle configuration enabled."
)
assert result[0].resource_id == bucket_name
assert result[0].resource_arn == bucket_arn
assert result[0].region == AWS_REGION_US_EAST_1

def test_several_lifecycle_configurations(self):
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])

with mock.patch(
"prowler.providers.common.provider.Provider.get_global_provider",
return_value=aws_provider,
), mock.patch(
"prowler.providers.aws.services.s3.s3_bucket_lifecycle_enabled.s3_bucket_lifecycle_enabled.s3_client",
new=S3(aws_provider),
):
# Test Check
from prowler.providers.aws.services.s3.s3_bucket_lifecycle_enabled.s3_bucket_lifecycle_enabled import (
s3_bucket_lifecycle_enabled,
)

s3_client = mock.MagicMock()
bucket_name = "bucket-test"
bucket_arn = f"arn:aws:s3::{AWS_ACCOUNT_NUMBER}:{bucket_name}"
s3_client.buckets = {
bucket_arn: Bucket(
name=bucket_name,
region=AWS_REGION_US_EAST_1,
lifecycle=[
LifeCycleRule(
id="test-rule-1",
status="Disabled",
),
LifeCycleRule(
id="test-rule-2",
status="Enabled",
),
],
)
}

with patch(
"prowler.providers.aws.services.s3.s3_bucket_lifecycle_enabled.s3_bucket_lifecycle_enabled.s3_client",
s3_client,
):
check = s3_bucket_lifecycle_enabled()
result = check.execute()

assert len(result) == 1
assert result[0].status == "PASS"
assert (
result[0].status_extended
== f"S3 Bucket {bucket_name} has a lifecycle configuration enabled."
)
assert result[0].resource_id == bucket_name
assert result[0].resource_arn == bucket_arn
assert result[0].region == AWS_REGION_US_EAST_1
51 changes: 50 additions & 1 deletion tests/providers/aws/services/s3/s3_service_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,16 @@ def mock_make_api_call(self, operation_name, kwarg):
}
]
}

if operation_name == "GetBucketLifecycleConfiguration":
return {
"Rules": [
{
"ID": "test",
"Status": "Enabled",
"Prefix": "test",
}
]
}
return orig(self, operation_name, kwarg)


Expand Down Expand Up @@ -429,6 +438,46 @@ def test_get_bucket_replication(self):
assert s3.buckets[bucket_arn].replication_rules[0].status == "Enabled"
assert s3.buckets[bucket_arn].replication_rules[0].destination == bucket_arn

# Test S3 Get Bucket Lifecycle
@mock_aws
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
def test_get_bucket_lifecycle(self):
# Generate S3 Client
s3_client = client("s3")

# Create S3 Bucket
bucket_name = "test-bucket"
bucket_arn = f"arn:aws:s3:::{bucket_name}"
s3_client.create_bucket(
Bucket=bucket_name,
ObjectOwnership="BucketOwnerEnforced",
ObjectLockEnabledForBucket=True,
)

# DEPRECATED: Put Bucket LifeCycle
s3_client.put_bucket_lifecycle(
Bucket=bucket_name,
LifecycleConfiguration={
"Rules": [
{
"ID": "test",
"Status": "Enabled",
"Prefix": "test",
}
]
},
)

# S3 client for this test class
aws_provider = set_mocked_aws_provider([AWS_REGION_US_EAST_1])
s3 = S3(aws_provider)
assert len(s3.buckets) == 1
assert s3.buckets[bucket_arn].name == bucket_name
assert s3.buckets[bucket_arn].region == AWS_REGION_US_EAST_1
assert len(s3.buckets[bucket_arn].lifecycle) == 1
assert s3.buckets[bucket_arn].lifecycle[0].id == "test"
assert s3.buckets[bucket_arn].lifecycle[0].status == "Enabled"

# Test S3 List Access Points
@patch("botocore.client.BaseClient._make_api_call", new=mock_make_api_call)
@mock_aws
Expand Down

0 comments on commit 57f1fa5

Please sign in to comment.