From 04b1af2765c2cb0a44fe55447fab1c1f9aa4c0b7 Mon Sep 17 00:00:00 2001
From: Michael Collins <15347726+michaeljcollinsuk@users.noreply.github.com>
Date: Thu, 9 Nov 2023 09:15:55 +0000
Subject: [PATCH] Initial implementation of archiving S3 folder contents
 (#1215)

* Initial implementation of archiving S3 folder contents

Use tasks so that the contents are moved asynchronously. A task is
fired for every individual object in order to utilise multiple workers
and improve performance. Adds methods to the cluster and aws modules to
handle retrieving and archiving objects.

---------

Co-authored-by: ymao2
---
 controlpanel/api/aws.py                     | 21 ++++++++++++++
 controlpanel/api/cluster.py                 | 11 ++++++++
 controlpanel/api/models/s3bucket.py         |  7 +++--
 controlpanel/api/tasks/__init__.py          |  3 ++
 controlpanel/api/tasks/handlers/__init__.py |  4 +++
 controlpanel/api/tasks/handlers/s3.py       | 28 ++++++++++++++++++-
 controlpanel/api/tasks/s3bucket.py          | 31 +++++++++++++++++++++
 controlpanel/settings/common.py             |  2 ++
 tests/api/models/test_s3bucket.py           | 23 ++++++++++++++-
 9 files changed, 125 insertions(+), 5 deletions(-)

diff --git a/controlpanel/api/aws.py b/controlpanel/api/aws.py
index c61072bd8..8dedaa030 100644
--- a/controlpanel/api/aws.py
+++ b/controlpanel/api/aws.py
@@ -590,6 +590,27 @@ def exists(self, folder_name):
         except botocore.exceptions.ClientError:
             return False
 
+    def get_objects(self, bucket_name, folder_name):
+        bucket = self.boto3_session.resource("s3").Bucket(bucket_name)
+        return bucket.objects.filter(Prefix=f"{folder_name}/")
+
+    def archive_object(self, key, source_bucket_name=None, delete_original=True):
+        source_bucket_name = source_bucket_name or settings.S3_FOLDER_BUCKET_NAME
+        copy_source = {
+            "Bucket": source_bucket_name,
+            "Key": key,
+        }
+        archive_bucket = self.boto3_session.resource("s3").Bucket(
+            settings.S3_ARCHIVE_BUCKET_NAME
+        )
+        new_key = f"{archive_bucket.name}/{key}"
+
+        archive_bucket.copy(copy_source, new_key)
+        log.info(f"Moved {key} to {new_key}")
+        if delete_original:
+            self.boto3_session.resource("s3").Object(source_bucket_name, key).delete()
+            log.info(f"Deleted original: {source_bucket_name}/{key}")
+
 
 class AWSBucket(AWSService):
     def create(self, bucket_name, is_data_warehouse=False):
diff --git a/controlpanel/api/cluster.py b/controlpanel/api/cluster.py
index 9866bacb0..e13ba3c88 100644
--- a/controlpanel/api/cluster.py
+++ b/controlpanel/api/cluster.py
@@ -735,6 +735,17 @@ def exists(self, folder_name, bucket_owner):
         folder_path = f"{settings.S3_FOLDER_BUCKET_NAME}/{folder_name}"
         return super().exists(folder_path, bucket_owner), folder_path
 
+    def get_objects(self):
+        bucket_name, folder_name = self.bucket.name.split("/")
+        return self.aws_bucket_service.get_objects(
+            bucket_name=bucket_name, folder_name=folder_name,
+        )
+
+    def archive_object(self, key, source_bucket=None, delete_original=True):
+        self.aws_bucket_service.archive_object(
+            key=key, source_bucket_name=source_bucket, delete_original=delete_original,
+        )
+
 
 class RoleGroup(EntityResource):
     """
diff --git a/controlpanel/api/models/s3bucket.py b/controlpanel/api/models/s3bucket.py
index 9d508b905..2194babb8 100644
--- a/controlpanel/api/models/s3bucket.py
+++ b/controlpanel/api/models/s3bucket.py
@@ -15,7 +15,6 @@
 from controlpanel.api import cluster, tasks, validators
 from controlpanel.api.models.apps3bucket import AppS3Bucket
 from controlpanel.api.models.users3bucket import UserS3Bucket
-from controlpanel.api.tasks.s3bucket import S3BucketRevokeAllAccess
 
 
 def s3bucket_console_url(name):
@@ -179,7 +178,9 @@ def soft_delete(self, deleted_by: User):
         self.deleted_at = timezone.now()
         self.save()
         # TODO update to handle deleting folders
-        if not self.is_folder:
+        if self.is_folder:
+            tasks.S3BucketArchive(self, self.deleted_by).create_task()
+        else:
             self.cluster.mark_for_archival()
-        S3BucketRevokeAllAccess(self, self.deleted_by).create_task()
+        tasks.S3BucketRevokeAllAccess(self, self.deleted_by).create_task()
 
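For reference, the copy-then-delete flow in `archive_object` above maps onto the plain boto3 resource API roughly as follows. This is a standalone sketch: the bucket names and folder prefix are made up, and error handling is omitted.

```python
# Standalone sketch of the copy-then-delete flow used by archive_object.
# Bucket names and the folder prefix are illustrative, not from this patch.
import boto3

s3 = boto3.session.Session().resource("s3")

source_bucket_name = "example-folders-bucket"         # hypothetical
archive_bucket = s3.Bucket("example-archive-bucket")  # hypothetical

# Enumerate every object under the folder prefix, as get_objects does.
for obj in s3.Bucket(source_bucket_name).objects.filter(Prefix="example-folder/"):
    copy_source = {"Bucket": source_bucket_name, "Key": obj.key}
    # Copy into the archive bucket, then remove the original.
    archive_bucket.copy(copy_source, f"{archive_bucket.name}/{obj.key}")
    s3.Object(source_bucket_name, obj.key).delete()
```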
diff --git a/controlpanel/api/tasks/__init__.py b/controlpanel/api/tasks/__init__.py
index 19e653ff1..5f1ccf51f 100644
--- a/controlpanel/api/tasks/__init__.py
+++ b/controlpanel/api/tasks/__init__.py
@@ -2,9 +2,12 @@
 # First-party/Local
 from controlpanel.api.tasks.app import AppCreateAuth, AppCreateRole
 from controlpanel.api.tasks.s3bucket import (
+    S3BucketArchive,
+    S3BucketArchiveObject,
     S3BucketCreate,
     S3BucketGrantToApp,
     S3BucketGrantToUser,
+    S3BucketRevokeAllAccess,
     S3BucketRevokeAppAccess,
     S3BucketRevokeUserAccess,
 )
diff --git a/controlpanel/api/tasks/handlers/__init__.py b/controlpanel/api/tasks/handlers/__init__.py
index f51323c79..3bc6b9fc0 100644
--- a/controlpanel/api/tasks/handlers/__init__.py
+++ b/controlpanel/api/tasks/handlers/__init__.py
@@ -2,6 +2,8 @@
 from controlpanel import celery_app
 from controlpanel.api.tasks.handlers.app import CreateAppAuthSettings, CreateAppAWSRole
 from controlpanel.api.tasks.handlers.s3 import (
+    ArchiveS3Bucket,
+    ArchiveS3Object,
     CreateS3Bucket,
     GrantAppS3BucketAccess,
     GrantUserS3BucketAccess,
@@ -18,3 +20,5 @@
 revoke_user_s3bucket_access = celery_app.register_task(S3BucketRevokeUserAccess())
 revoke_app_s3bucket_access = celery_app.register_task(S3BucketRevokeAppAccess())
 revoke_all_access_s3bucket = celery_app.register_task(S3BucketRevokeAllAccess())
+archive_s3bucket = celery_app.register_task(ArchiveS3Bucket())
+archive_s3_object = celery_app.register_task(ArchiveS3Object())
diff --git a/controlpanel/api/tasks/handlers/s3.py b/controlpanel/api/tasks/handlers/s3.py
index 6fc766932..65773c320 100644
--- a/controlpanel/api/tasks/handlers/s3.py
+++ b/controlpanel/api/tasks/handlers/s3.py
@@ -1,12 +1,15 @@
 # Third-party
+import structlog
 from django.db.models.deletion import Collector
 
 # First-party/Local
-from controlpanel.api import cluster
+from controlpanel.api import cluster, tasks
 from controlpanel.api.models import App, AppS3Bucket, S3Bucket, User, UserS3Bucket
 from controlpanel.api.models.access_to_s3bucket import AccessToS3Bucket
 from controlpanel.api.tasks.handlers.base import BaseModelTaskHandler, BaseTaskHandler
 
+log = structlog.getLogger(__name__)
+
 
 class CreateS3Bucket(BaseModelTaskHandler):
     model = S3Bucket
@@ -97,3 +100,26 @@ def handle(self, *args, **kwargs):
             instance.revoke_bucket_access()
 
         self.complete()
+
+
+class ArchiveS3Bucket(BaseModelTaskHandler):
+    model = S3Bucket
+    name = "archive_s3bucket"
+
+    def handle(self, *args, **kwargs):
+        task_user = User.objects.filter(pk=self.task_user_pk).first()
+        for s3obj in cluster.S3Folder(self.object).get_objects():
+            tasks.S3BucketArchiveObject(
+                self.object, task_user, extra_data={"s3obj_key": s3obj.key}
+            ).create_task()
+        self.complete()
+
+
+class ArchiveS3Object(BaseModelTaskHandler):
+    model = S3Bucket
+    name = "archive_s3_object"
+
+    def handle(self, s3obj_key):
+        # TODO update to use self.object.cluster to work with buckets
+        cluster.S3Folder(self.object).archive_object(key=s3obj_key)
+        self.complete()
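The handlers above fan the work out so that each object copy is its own small task rather than one long-running job. Stripped of this codebase's TaskBase/handler plumbing, the pattern looks roughly like the sketch below; the task names match the patch, but the function signatures and broker URL are assumptions.

```python
# Minimal sketch of the per-object fan-out pattern using plain Celery.
# Signatures and the broker URL are illustrative assumptions.
from celery import Celery

app = Celery("archive", broker="sqs://")  # hypothetical broker config

@app.task(name="archive_s3_object")
def archive_s3_object(bucket_pk, key):
    # One small unit of work per object; any available worker picks it up.
    ...

@app.task(name="archive_s3bucket")
def archive_s3bucket(bucket_pk, keys):
    # The parent task only enumerates and enqueues, so it completes
    # quickly while the copies proceed in parallel across workers.
    for key in keys:
        archive_s3_object.delay(bucket_pk, key)
```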
diff --git a/controlpanel/api/tasks/s3bucket.py b/controlpanel/api/tasks/s3bucket.py
index 4e3e8944d..cccbff8d8 100644
--- a/controlpanel/api/tasks/s3bucket.py
+++ b/controlpanel/api/tasks/s3bucket.py
@@ -98,3 +98,34 @@ def _get_args_list(self):
             self.entity.s3bucket.arn,
             self.entity.app.pk,
         ]
+
+
+class S3BucketArchive(TaskBase):
+    ENTITY_CLASS = "S3Bucket"
+    QUEUE_NAME = settings.S3_QUEUE_NAME
+
+    @property
+    def task_name(self):
+        return "archive_s3bucket"
+
+    @property
+    def task_description(self):
+        return "move contents of s3 datasource to the archive bucket"
+
+
+class S3BucketArchiveObject(TaskBase):
+    ENTITY_CLASS = "S3Bucket"
+    QUEUE_NAME = settings.S3_QUEUE_NAME
+
+    @property
+    def task_name(self):
+        return "archive_s3_object"
+
+    @property
+    def task_description(self):
+        return "move object to archive bucket"
+
+    def _get_args_list(self):
+        args = super()._get_args_list()
+        args.append(self.extra_data.get("s3obj_key"))
+        return args
diff --git a/controlpanel/settings/common.py b/controlpanel/settings/common.py
index 8084bf71b..d12a7c955 100644
--- a/controlpanel/settings/common.py
+++ b/controlpanel/settings/common.py
@@ -565,3 +565,5 @@
 CELERY_IMPORTS = [
     "controlpanel.api.tasks.handlers"
 ]
+
+S3_ARCHIVE_BUCKET_NAME = "dev-archive-folder"
diff --git a/tests/api/models/test_s3bucket.py b/tests/api/models/test_s3bucket.py
index 67dcd6db3..63c999cbb 100644
--- a/tests/api/models/test_s3bucket.py
+++ b/tests/api/models/test_s3bucket.py
@@ -98,7 +98,7 @@ def test_cluster(name, expected):
     assert isinstance(S3Bucket(name=name).cluster, expected)
 
 
-def test_soft_delete(bucket, users, sqs, helpers):
+def test_soft_delete_bucket(bucket, users, sqs, helpers):
     user = users["superuser"]
 
     assert bucket.is_deleted is False
@@ -114,3 +114,24 @@ def test_soft_delete(bucket, users, sqs, helpers):
     helpers.validate_task_with_sqs_messages(
         messages, S3Bucket.__name__, bucket.id, queue_name=settings.S3_QUEUE_NAME,
     )
+
+
+def test_soft_delete_folder(users, sqs, helpers):
+    folder = S3Bucket.objects.create(name="bucket/folder-1")
+    user = users["superuser"]
+
+    assert folder.is_deleted is False
+    folder.soft_delete(deleted_by=user)
+
+    assert folder.is_deleted is True
+    assert folder.deleted_by == user
+    assert folder.deleted_at is not None
+
+    messages = helpers.retrieve_messages(sqs, queue_name=settings.S3_QUEUE_NAME)
+    task_names = [message["headers"]["task"] for message in messages]
+
+    helpers.validate_task_with_sqs_messages(
+        messages, S3Bucket.__name__, folder.id, queue_name=settings.S3_QUEUE_NAME,
+    )
+    assert "archive_s3bucket" in task_names
+    assert "s3bucket_revoke_all_access" in task_names
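One note on the settings change: `S3_ARCHIVE_BUCKET_NAME` is hardcoded to a dev bucket name in `common.py`. If it needs to vary per environment, a common Django settings pattern is to read it from the environment with the dev value as the fallback, along the lines of this sketch (a possible follow-up, not part of this patch):

```python
# Possible follow-up (not in this patch): make the archive bucket
# configurable per environment, defaulting to the dev bucket name above.
import os

S3_ARCHIVE_BUCKET_NAME = os.environ.get("S3_ARCHIVE_BUCKET_NAME", "dev-archive-folder")
```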