diff --git a/controlpanel/api/aws.py b/controlpanel/api/aws.py index edae8ea8d..5d3a6285e 100644 --- a/controlpanel/api/aws.py +++ b/controlpanel/api/aws.py @@ -269,98 +269,3 @@ def list_role_names(self, prefix="/"): aws = AWSClient() - - -class S3AccessPolicy(object): - def __init__(self, document=None): - self._readonly_arns = set() - self._readwrite_arns = set() - - if document: - self._load(document) - - def revoke_access(self, bucket_arn): - self._readonly_arns.discard(bucket_arn) - self._readwrite_arns.discard(bucket_arn) - - def grant_access(self, bucket_arn, readwrite=False): - self.revoke_access(bucket_arn) - - if readwrite: - self._readwrite_arns.add(bucket_arn) - else: - self._readonly_arns.add(bucket_arn) - - @property - def document(self): - statements = [ - { - "Sid": "console", - "Effect": "Allow", - "Action": ["s3:GetBucketLocation", "s3:ListAllMyBuckets"], - "Resource": "arn:aws:s3:::*", - } - ] - - all_buckets_arns = self._readonly_arns | self._readwrite_arns - if all_buckets_arns: - statements.append(self._list_statement(all_buckets_arns)) - - if self._readonly_arns: - statements.append(self._readonly_statement) - - if self._readwrite_arns: - statements.append(self._readwrite_statement) - - return {"Version": "2012-10-17", "Statement": statements} - - def _list_statement(self, all_buckets_arns): - return { - "Sid": "list", - "Action": ["s3:ListBucket"], - "Effect": "Allow", - "Resource": list(all_buckets_arns), - } - - @property - def _readonly_statement(self): - return { - "Sid": "readonly", - "Action": ["s3:GetObject", "s3:GetObjectAcl", "s3:GetObjectVersion"], - "Effect": "Allow", - "Resource": self._s3_objects_arns(self._readonly_arns), - } - - @property - def _readwrite_statement(self): - return { - "Sid": "readwrite", - "Action": [ - "s3:GetObject", - "s3:GetObjectAcl", - "s3:GetObjectVersion", - "s3:DeleteObject", - "s3:DeleteObjectVersion", - "s3:PutObject", - "s3:PutObjectAcl", - "s3:RestoreObject", - ], - "Effect": "Allow", - "Resource": self._s3_objects_arns(self._readwrite_arns), - } - - def _s3_objects_arns(self, arns): - return [f"{arn}/*" for arn in arns] - - def _s3_buckets_arns(self, arns): - return [arn.rsplit("/*", 1)[0] for arn in arns] - - def _load(self, document): - for statement in document["Statement"]: - sid = statement["Sid"] - if sid in ("readonly", "readwrite"): - arns = set(self._s3_buckets_arns(statement["Resource"])) - if sid == "readwrite": - self._readwrite_arns = arns - else: - self._readonly_arns = arns diff --git a/tests/api/test_aws_s3_access_policy.py b/tests/api/test_aws_s3_access_policy.py deleted file mode 100644 index bdd99f3e7..000000000 --- a/tests/api/test_aws_s3_access_policy.py +++ /dev/null @@ -1,205 +0,0 @@ -from controlpanel.api.aws import S3AccessPolicy -from tests.api.iam_test_cases import IAMPolicyTestCase - - -class S3AccessTestCase(IAMPolicyTestCase): - - CONSOLE_ACTIONS = ( - 's3:GetBucketLocation', - 's3:ListAllMyBuckets', - ) - - READ_ACTIONS = ( - 's3:GetObject', - 's3:GetObjectAcl', - 's3:GetObjectVersion', - ) - - WRITE_ACTIONS = ( - 's3:DeleteObject', - 's3:DeleteObjectVersion', - 's3:PutObject', - 's3:PutObjectAcl', - 's3:RestoreObject', - ) - - def assert_has_readwrite_access(self, document, arn): - self._assert_can_list(document, arn) - self._assert_can_read(document, arn) - self._assert_can_write(document, arn) - - def assert_has_readonly_access(self, document, arn): - self._assert_can_list(document, arn) - self._assert_can_read(document, arn) - self.assert_cannot_write(document, arn) - - def assert_can_access_console(self, document): - for action in self.CONSOLE_ACTIONS: - self.assert_allowed(document, action, 'arn:aws:s3:::*') - - def assert_cannot_list(self, document, resource): - self.assert_not_allowed(document, 's3:ListBucket', resource) - - def assert_cannot_read(self, document, resource): - for action in self.READ_ACTIONS: - self.assert_not_allowed(document, action, f'{resource}/*') - - def assert_cannot_write(self, document, resource): - for action in self.WRITE_ACTIONS: - self.assert_not_allowed(document, action, f'{resource}/*') - - def _assert_can_list(self, document, resource): - self.assert_allowed(document, 's3:ListBucket', resource) - - def _assert_can_read(self, document, resource): - for action in self.READ_ACTIONS: - self.assert_allowed(document, action, f'{resource}/*') - - def _assert_can_write(self, document, resource): - for action in self.WRITE_ACTIONS: - self.assert_allowed(document, action, f'{resource}/*') - - -class S3AccessPolicyTestCase(S3AccessTestCase): - - def test_blank_policy(self): - policy = S3AccessPolicy() - document = policy.document - - self.assert_can_access_console(document) - self.assert_has_not_sid(document, 'list') - self.assert_has_not_sid(document, 'readonly') - self.assert_has_not_sid(document, 'readwrite') - - def test_granting_access(self): - arns = ( - ('arn:aws:s3:::readonly-1', False), - ('arn:aws:s3:::readonly-2', False), - ('arn:aws:s3:::readwrite-1', True), - ('arn:aws:s3:::readwrite-2', True), - ) - - policy = S3AccessPolicy() - for (arn, readwrite) in arns: - policy.grant_access(arn, readwrite=readwrite) - - document = policy.document - self.assert_can_access_console(document) - - for (arn, readwrite) in arns: - if readwrite: - self.assert_has_readwrite_access(document, arn) - else: - self.assert_has_readonly_access(document, arn) - - def test_granting_access_again_with_different_level(self): - ARN = 'arn:aws:s3:::test-bucket-1' - - policy = S3AccessPolicy() - policy.grant_access(ARN, readwrite=True) - - document = policy.document - self.assert_can_access_console(document) - self.assert_has_readwrite_access(document, ARN) - - policy.grant_access(ARN, readwrite=False) - - document = policy.document - self.assert_can_access_console(document) - self.assert_has_readonly_access(document, ARN) - - def test_revoking_access(self): - readonly_arn = 'arn:aws:s3:::readonly-1' - readwrite_arn = 'arn:aws:s3:::readwrite-1' - - policy = S3AccessPolicy() - policy.grant_access(readonly_arn, readwrite=False) - policy.grant_access(readwrite_arn, readwrite=True) - - document = policy.document - self.assert_can_access_console(document) - self.assert_has_readonly_access(document, readonly_arn) - self.assert_has_readwrite_access(document, readwrite_arn) - - policy.revoke_access(readwrite_arn) - - document = policy.document - # access to `readwrite_arn` is now revoked - self.assert_cannot_list(document, readwrite_arn) - self.assert_cannot_read(document, readwrite_arn) - self.assert_cannot_write(document, readwrite_arn) - # existing permissions didn't change - self.assert_can_access_console(document) - self.assert_has_readonly_access(document, readonly_arn) - - def test_import_document(self): - DOCUMENT = { - 'Version': '2012-10-17', - 'Statement': [ - { - 'Sid': 'console', - 'Effect': 'Allow', - 'Action': [ - 's3:GetBucketLocation', - 's3:ListAllMyBuckets', - ], - 'Resource': 'arn:aws:s3:::*', - }, - { - 'Sid': 'list', - 'Action': [ - 's3:ListBucket', - ], - 'Effect': 'Allow', - 'Resource': [ - 'arn:aws:s3:::readwrite-1', - 'arn:aws:s3:::readwrite-2', - 'arn:aws:s3:::readonly-1', - ], - }, - { - 'Sid': 'readonly', - 'Action': [ - 's3:GetObject', - 's3:GetObjectAcl', - 's3:GetObjectVersion', - ], - 'Effect': 'Allow', - 'Resource': [ - 'arn:aws:s3:::readonly-1/*', - ], - }, - { - 'Sid': 'readwrite', - 'Action': [ - 's3:GetObject', - 's3:GetObjectAcl', - 's3:GetObjectVersion', - 's3:DeleteObject', - 's3:DeleteObjectVersion', - 's3:PutObject', - 's3:PutObjectAcl', - 's3:RestoreObject', - ], - 'Effect': 'Allow', - 'Resource': [ - 'arn:aws:s3:::readwrite-1/*', - 'arn:aws:s3:::readwrite-2/*', - ] - }, - ], - } - - policy = S3AccessPolicy(document=DOCUMENT) - document = policy.document - - readonly_arn = 'arn:aws:s3:::readonly-1' - readwrite_arns = ( - 'arn:aws:s3:::readwrite-1', - 'arn:aws:s3:::readwrite-2', - ) - - self.assert_can_access_console(document) - self.assert_has_readonly_access(document, readonly_arn) - for arn in readwrite_arns: - self.assert_has_readwrite_access(document, arn)