Initial implementation of archiving S3 folder contents (#1215)
* Initial implementation of archiving S3 folder contents

Use tasks so that the contents are moved asynchronously. A task is
fired for every individual object in order to utilise multiple
workers and improve performance.
Adds methods to the cluster and aws modules to handle retrieving
and archiving objects.

---------

Co-authored-by: ymao2 <[email protected]>
michaeljcollinsuk and ymao2 authored Nov 9, 2023
1 parent 8fafe38 commit 04b1af2
Showing 9 changed files with 125 additions and 5 deletions.
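
For readers new to the pattern: the commit fans one archive job out into many per-object tasks. Below is a minimal sketch of that shape using plain Celery shared tasks and boto3; the task names and archive bucket are illustrative, not the TaskBase/handler classes this commit actually adds.

# Illustrative fan-out only; the commit routes these through its own
# TaskBase/BaseModelTaskHandler machinery rather than plain shared tasks.
import boto3
from celery import shared_task

ARCHIVE_BUCKET = "example-archive-bucket"  # stands in for S3_ARCHIVE_BUCKET_NAME


@shared_task
def archive_folder(bucket_name: str, folder_name: str) -> None:
    # Parent task: list the folder once, then queue one task per object so
    # the copies can run on many workers in parallel.
    bucket = boto3.resource("s3").Bucket(bucket_name)
    for obj in bucket.objects.filter(Prefix=f"{folder_name}/"):
        archive_object.delay(bucket_name, obj.key)


@shared_task
def archive_object(bucket_name: str, key: str) -> None:
    # Child task: copy a single object into the archive bucket, then delete
    # the original (mirroring archive_object in aws.py below).
    s3 = boto3.resource("s3")
    s3.Bucket(ARCHIVE_BUCKET).copy({"Bucket": bucket_name, "Key": key}, key)
    s3.Object(bucket_name, key).delete()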
21 changes: 21 additions & 0 deletions controlpanel/api/aws.py
@@ -590,6 +590,27 @@ def exists(self, folder_name):
except botocore.exceptions.ClientError:
return False

def get_objects(self, bucket_name, folder_name):
bucket = self.boto3_session.resource("s3").Bucket(bucket_name)
return bucket.objects.filter(Prefix=f"{folder_name}/")

def archive_object(self, key, source_bucket_name=None, delete_original=True):
source_bucket_name = source_bucket_name or settings.S3_FOLDER_BUCKET_NAME
copy_source = {
'Bucket': source_bucket_name,
'Key': key
}
archive_bucket = self.boto3_session.resource("s3").Bucket(
settings.S3_ARCHIVE_BUCKET_NAME
)
new_key = f"{archive_bucket.name}/{key}"

archive_bucket.copy(copy_source, new_key)
log.info(f"Moved {key} to {new_key}")
if delete_original:
self.boto3_session.resource("s3").Object(source_bucket_name, key).delete()
log.info(f"deleted original: {source_bucket_name}/{key}")


class AWSBucket(AWSService):
def create(self, bucket_name, is_data_warehouse=False):
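
A short usage sketch of the two new methods; `service` is assumed to be an instance of the AWSService subclass they were added to (its class line sits outside this hunk), and the bucket and folder names are placeholders.

def archive_folder_contents(service, bucket_name: str, folder_name: str) -> None:
    # `service` is assumed; this helper is illustrative, not part of the commit.
    for obj in service.get_objects(bucket_name=bucket_name, folder_name=folder_name):
        # Copies to settings.S3_ARCHIVE_BUCKET_NAME under "<archive bucket>/<key>"
        # and deletes the original, since delete_original defaults to True.
        service.archive_object(key=obj.key, source_bucket_name=bucket_name)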
11 changes: 11 additions & 0 deletions controlpanel/api/cluster.py
@@ -735,6 +735,17 @@ def exists(self, folder_name, bucket_owner):
folder_path = f"{settings.S3_FOLDER_BUCKET_NAME}/{folder_name}"
return super().exists(folder_path, bucket_owner), folder_path

def get_objects(self):
bucket_name, folder_name = self.bucket.name.split("/")
return self.aws_bucket_service.get_objects(
bucket_name=bucket_name, folder_name=folder_name,
)

def archive_object(self, key, source_bucket=None, delete_original=True):
self.aws_bucket_service.archive_object(
key=key, source_bucket_name=source_bucket, delete_original=delete_original,
)


class RoleGroup(EntityResource):
"""
7 changes: 4 additions & 3 deletions controlpanel/api/models/s3bucket.py
@@ -15,7 +15,6 @@
 from controlpanel.api import cluster, tasks, validators
 from controlpanel.api.models.apps3bucket import AppS3Bucket
 from controlpanel.api.models.users3bucket import UserS3Bucket
-from controlpanel.api.tasks.s3bucket import S3BucketRevokeAllAccess


def s3bucket_console_url(name):
@@ -179,7 +178,9 @@ def soft_delete(self, deleted_by: User):
         self.deleted_at = timezone.now()
         self.save()
         # TODO update to handle deleting folders
-        if not self.is_folder:
+        if self.is_folder:
+            tasks.S3BucketArchive(self, self.deleted_by).create_task()
+        else:
             self.cluster.mark_for_archival()
 
-        S3BucketRevokeAllAccess(self, self.deleted_by).create_task()
+        tasks.S3BucketRevokeAllAccess(self, self.deleted_by).create_task()
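
With this branch in place, soft-deleting a folder queues the archive task alongside the access revocation. A hedged sketch of the call, mirrored by test_soft_delete_folder at the bottom of this commit:

# Hypothetical usage; assumes a User instance `user` with delete rights.
folder = S3Bucket.objects.create(name="example-bucket/example-folder")
folder.soft_delete(deleted_by=user)
# queues tasks.S3BucketArchive (folders only)
# queues tasks.S3BucketRevokeAllAccess (all datasources)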
3 changes: 3 additions & 0 deletions controlpanel/api/tasks/__init__.py
@@ -2,9 +2,12 @@
# First-party/Local
from controlpanel.api.tasks.app import AppCreateAuth, AppCreateRole
from controlpanel.api.tasks.s3bucket import (
S3BucketArchive,
S3BucketArchiveObject,
S3BucketCreate,
S3BucketGrantToApp,
S3BucketGrantToUser,
S3BucketRevokeAllAccess,
S3BucketRevokeAppAccess,
S3BucketRevokeUserAccess,
)
4 changes: 4 additions & 0 deletions controlpanel/api/tasks/handlers/__init__.py
@@ -2,6 +2,8 @@
from controlpanel import celery_app
from controlpanel.api.tasks.handlers.app import CreateAppAuthSettings, CreateAppAWSRole
from controlpanel.api.tasks.handlers.s3 import (
ArchiveS3Bucket,
ArchiveS3Object,
CreateS3Bucket,
GrantAppS3BucketAccess,
GrantUserS3BucketAccess,
@@ -18,3 +20,5 @@
revoke_user_s3bucket_access = celery_app.register_task(S3BucketRevokeUserAccess())
revoke_app_s3bucket_access = celery_app.register_task(S3BucketRevokeAppAccess())
revoke_all_access_s3bucket = celery_app.register_task(S3BucketRevokeAllAccess())
archive_s3bucket = celery_app.register_task(ArchiveS3Bucket())
archive_s3_object = celery_app.register_task(ArchiveS3Object())
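
One wiring detail worth noting: the `name` each handler declares has to match the `task_name` its TaskBase counterpart publishes (see tasks/s3bucket.py below). The names here are taken from the diff; the assertions themselves are an illustrative sanity check, not code from the commit.

# Illustrative check only: handler names must line up with the task_name
# properties declared in controlpanel/api/tasks/s3bucket.py.
assert ArchiveS3Bucket.name == "archive_s3bucket"
assert ArchiveS3Object.name == "archive_s3_object"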
28 changes: 27 additions & 1 deletion controlpanel/api/tasks/handlers/s3.py
@@ -1,12 +1,15 @@
# Third-party
import structlog
from django.db.models.deletion import Collector

# First-party/Local
from controlpanel.api import cluster
from controlpanel.api import cluster, tasks
from controlpanel.api.models import App, AppS3Bucket, S3Bucket, User, UserS3Bucket
from controlpanel.api.models.access_to_s3bucket import AccessToS3Bucket
from controlpanel.api.tasks.handlers.base import BaseModelTaskHandler, BaseTaskHandler

log = structlog.getLogger(__name__)


class CreateS3Bucket(BaseModelTaskHandler):
model = S3Bucket
@@ -97,3 +100,26 @@ def handle(self, *args, **kwargs):
instance.revoke_bucket_access()

self.complete()


class ArchiveS3Bucket(BaseModelTaskHandler):
model = S3Bucket
name = "archive_s3bucket"

def handle(self, *args, **kwargs):
task_user = User.objects.filter(pk=self.task_user_pk).first()
for s3obj in cluster.S3Folder(self.object).get_objects():
tasks.S3BucketArchiveObject(
self.object, task_user, extra_data={"s3obj_key": s3obj.key}
).create_task()
self.complete()


class ArchiveS3Object(BaseModelTaskHandler):
model = S3Bucket
name = "archive_s3_object"

def handle(self, s3obj_key):
# TODO update to use self.object.cluster to work with buckets
cluster.S3Folder(self.object).archive_object(key=s3obj_key)
self.complete()
31 changes: 31 additions & 0 deletions controlpanel/api/tasks/s3bucket.py
@@ -98,3 +98,34 @@ def _get_args_list(self):
self.entity.s3bucket.arn,
self.entity.app.pk,
]


class S3BucketArchive(TaskBase):
ENTITY_CLASS = "S3Bucket"
QUEUE_NAME = settings.S3_QUEUE_NAME

@property
def task_name(self):
return "archive_s3bucket"

@property
def task_description(self):
return "move contents of s3 datasource to the archive bucket"


class S3BucketArchiveObject(TaskBase):
ENTITY_CLASS = "S3Bucket"
QUEUE_NAME = settings.S3_QUEUE_NAME

@property
def task_name(self):
return "archive_s3_object"

@property
def task_description(self):
return "move object to archive bucket"

def _get_args_list(self):
args = super()._get_args_list()
args.append(self.extra_data.get('s3obj_key'))
return args
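
The object key rides along as an extra positional argument on the queued message. Assuming the base `_get_args_list` contributes the entity and user identifiers (it is outside this diff), the result would look something like:

# Hypothetical: the exact base args come from TaskBase._get_args_list,
# which is not shown here; extra_data["s3obj_key"] is appended last.
task = S3BucketArchiveObject(
    folder, user, extra_data={"s3obj_key": "example-folder/data.csv"}
)
print(task._get_args_list())  # e.g. [folder.pk, user.pk, "example-folder/data.csv"]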
2 changes: 2 additions & 0 deletions controlpanel/settings/common.py
@@ -565,3 +565,5 @@
CELERY_IMPORTS = [
"controlpanel.api.tasks.handlers"
]

S3_ARCHIVE_BUCKET_NAME = "dev-archive-folder"
23 changes: 22 additions & 1 deletion tests/api/models/test_s3bucket.py
@@ -98,7 +98,7 @@ def test_cluster(name, expected):
     assert isinstance(S3Bucket(name=name).cluster, expected)
 
 
-def test_soft_delete(bucket, users, sqs, helpers):
+def test_soft_delete_bucket(bucket, users, sqs, helpers):
     user = users["superuser"]
 
     assert bucket.is_deleted is False
@@ -114,3 +114,24 @@ def test_soft_delete(bucket, users, sqs, helpers):
helpers.validate_task_with_sqs_messages(
messages, S3Bucket.__name__, bucket.id, queue_name=settings.S3_QUEUE_NAME,
)


def test_soft_delete_folder(users, sqs, helpers):
folder = S3Bucket.objects.create(name="bucket/folder-1")
user = users["superuser"]

assert folder.is_deleted is False
folder.soft_delete(deleted_by=user)

assert folder.is_deleted is True
assert folder.deleted_by == user
assert folder.deleted_at is not None

messages = helpers.retrieve_messages(sqs, queue_name=settings.S3_QUEUE_NAME)
task_names = [message["headers"]["task"] for message in messages]

helpers.validate_task_with_sqs_messages(
messages, S3Bucket.__name__, folder.id, queue_name=settings.S3_QUEUE_NAME,
)
assert "archive_s3bucket" in task_names
assert "s3bucket_revoke_all_access" in task_names
