[LTD-4662] process file uploads from source bucket #1808

Closed

Changes from 4 commits
8 changes: 4 additions & 4 deletions api/cases/tests/test_case_documents.py
@@ -40,15 +40,15 @@ def setUp(self):
self.file = self.create_case_document(self.case, self.gov_user, "Test")
self.path = "cases:document_download"

s3 = init_s3_client()
s3 = init_s3_client()["processed"]
s3.create_bucket(
Bucket=settings.AWS_STORAGE_BUCKET_NAME,
Bucket=settings.FILE_UPLOAD_PROCESSED_BUCKET["AWS_STORAGE_BUCKET_NAME"],
CreateBucketConfiguration={
"LocationConstraint": settings.AWS_REGION,
"LocationConstraint": settings.FILE_UPLOAD_PROCESSED_BUCKET["AWS_REGION"],
},
)
s3.put_object(
Bucket=settings.AWS_STORAGE_BUCKET_NAME,
Bucket=settings.FILE_UPLOAD_PROCESSED_BUCKET["AWS_STORAGE_BUCKET_NAME"],
Key=self.file.s3_key,
Body=b"test",
)
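The setUp above only seeds the processed bucket. A hedged sketch of the analogous setup a test might add if it needs to seed the staged bucket as well, reusing the same settings keys and the client dict returned by init_s3_client(); illustrative only, not part of the diff:

# Hypothetical addition to setUp for tests that exercise the staged bucket path.
staged_s3 = init_s3_client()["staged"]
staged_s3.create_bucket(
    Bucket=settings.FILE_UPLOAD_STAGED_BUCKET["AWS_STORAGE_BUCKET_NAME"],
    CreateBucketConfiguration={
        "LocationConstraint": settings.FILE_UPLOAD_STAGED_BUCKET["AWS_REGION"],
    },
)
staged_s3.put_object(
    Bucket=settings.FILE_UPLOAD_STAGED_BUCKET["AWS_STORAGE_BUCKET_NAME"],
    Key=self.file.s3_key,
    Body=b"test",
)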
43 changes: 33 additions & 10 deletions api/conf/settings.py
@@ -33,6 +33,7 @@
GOV_NOTIFY_ENABLED=(bool, False),
DOCUMENT_SIGNING_ENABLED=(bool, False),
GIT_COMMIT=(str, ""),
AWS_S3_BUCKETS=(dict, {}),
)

# Quick-start development settings - unsuitable for production
@@ -233,22 +234,44 @@
# AWS
VCAP_SERVICES = env.json("VCAP_SERVICES", {})

AWS_S3_BUCKETS = {}
FILE_UPLOAD_STAGED_NAME = "file-upload-staged"
FILE_UPLOAD_PROCESSED_NAME = "file-upload-processed"

if VCAP_SERVICES:
if "aws-s3-bucket" not in VCAP_SERVICES:
raise Exception("S3 Bucket not bound to environment")

aws_credentials = VCAP_SERVICES["aws-s3-bucket"][0]["credentials"]
AWS_ENDPOINT_URL = None
AWS_ACCESS_KEY_ID = aws_credentials["aws_access_key_id"]
AWS_SECRET_ACCESS_KEY = aws_credentials["aws_secret_access_key"]
AWS_REGION = aws_credentials["aws_region"]
AWS_STORAGE_BUCKET_NAME = aws_credentials["bucket_name"]

for bucket_details in VCAP_SERVICES["aws-s3-bucket"]:
bucket_name = None
if FILE_UPLOAD_PROCESSED_NAME in bucket_details["tags"]:
bucket_name = FILE_UPLOAD_PROCESSED_NAME
elif FILE_UPLOAD_STAGED_NAME in bucket_details["tags"]:
bucket_name = FILE_UPLOAD_STAGED_NAME
else:
# Skip buckets which are not tagged with the expected names
continue

bucket_credentials = bucket_details["credentials"]
AWS_S3_BUCKETS[bucket_name] = {
"AWS_ACCESS_KEY_ID": bucket_credentials["aws_access_key_id"],
"AWS_SECRET_ACCESS_KEY": bucket_credentials["aws_secret_access_key"],
"AWS_REGION": bucket_credentials["aws_region"],
"AWS_STORAGE_BUCKET_NAME": bucket_credentials["bucket_name"],
}
Contributor commented on lines +246 to +261:

I'm guessing this means we need to do a bit of fiddling first with tags before we can deploy this anywhere?

Is there anything else we need to do upfront? Does this require us to have both buckets created as well?

Contributor:

I am guessing these tags are set by SRE?
Locally when we use Minio, does that allow us to set these tags?
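
To illustrate what the loop in this branch relies on: each aws-s3-bucket binding only needs to expose a tags list and a credentials dict (the same credentials shape the previous single-bucket code read). A hypothetical sketch of one such entry; the binding name, tag and credential values are placeholders, not real configuration:

# Hypothetical entry in VCAP_SERVICES["aws-s3-bucket"]; only "tags" and "credentials"
# are read by the loop above. All values are placeholders.
{
    "name": "lite-api-file-upload-staged",  # placeholder binding name
    "tags": ["s3", "file-upload-staged"],
    "credentials": {
        "aws_access_key_id": "AKIA-PLACEHOLDER",
        "aws_secret_access_key": "placeholder-secret",
        "aws_region": "eu-west-2",
        "bucket_name": "placeholder-bucket-name",
    },
}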

else:
AWS_ENDPOINT_URL = env("AWS_ENDPOINT_URL", default=None)
AWS_ACCESS_KEY_ID = env("AWS_ACCESS_KEY_ID")
AWS_SECRET_ACCESS_KEY = env("AWS_SECRET_ACCESS_KEY")
AWS_REGION = env("AWS_REGION")
AWS_STORAGE_BUCKET_NAME = env("AWS_STORAGE_BUCKET_NAME")
AWS_S3_BUCKETS = env.json("AWS_S3_BUCKETS", {})

if FILE_UPLOAD_PROCESSED_NAME not in AWS_S3_BUCKETS:
raise Exception("S3 file upload processed bucket not found")

if FILE_UPLOAD_STAGED_NAME not in AWS_S3_BUCKETS:
raise Exception("S3 file upload staged bucket not found")


FILE_UPLOAD_PROCESSED_BUCKET = AWS_S3_BUCKETS[FILE_UPLOAD_PROCESSED_NAME]
FILE_UPLOAD_STAGED_BUCKET = AWS_S3_BUCKETS[FILE_UPLOAD_STAGED_NAME]

if "redis" in VCAP_SERVICES:
REDIS_BASE_URL = VCAP_SERVICES["redis"][0]["credentials"]["uri"]
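For local development (the non-VCAP branch earlier in this file), AWS_S3_BUCKETS is read as JSON keyed by the two bucket labels. A hypothetical sketch of the structure env.json("AWS_S3_BUCKETS", {}) would need to produce, mirroring the keys the VCAP branch builds; every value here is a placeholder:

# Hypothetical parsed value of the AWS_S3_BUCKETS environment variable for local development.
AWS_S3_BUCKETS = {
    "file-upload-staged": {
        "AWS_ACCESS_KEY_ID": "placeholder-key-id",
        "AWS_SECRET_ACCESS_KEY": "placeholder-secret",
        "AWS_REGION": "eu-west-2",
        "AWS_STORAGE_BUCKET_NAME": "file-upload-staged",
    },
    "file-upload-processed": {
        "AWS_ACCESS_KEY_ID": "placeholder-key-id",
        "AWS_SECRET_ACCESS_KEY": "placeholder-secret",
        "AWS_REGION": "eu-west-2",
        "AWS_STORAGE_BUCKET_NAME": "file-upload-processed",
    },
}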
17 changes: 16 additions & 1 deletion api/documents/celery_tasks.py
@@ -11,6 +11,16 @@
logger = get_task_logger(__name__)


@shared_task(
bind=True,

Contributor:
I don't think these tasks need to be bound.

)
def process_uploaded_document(self, document_id):
""" """
document = Document.objects.get(id=document_id)
document.move_staged_document()
scan_document_for_viruses.apply_async(args=(document_id,), link_error=delete_document_from_s3.si(document_id))
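
If the reviewer's suggestion above is taken, a minimal sketch of the unbound variant (dropping bind=True and the unused self argument); this is an illustration, not part of the diff:

@shared_task
def process_uploaded_document(document_id):
    """Move the uploaded document out of the staged S3 bucket and queue a virus scan."""
    document = Document.objects.get(id=document_id)
    document.move_staged_document()
    scan_document_for_viruses.apply_async(args=(document_id,), link_error=delete_document_from_s3.si(document_id))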


@shared_task(
bind=True,
autoretry_for=(Exception,),
@@ -43,4 +53,9 @@ def delete_document_from_s3(document_id):
"Maximum attempts of %s for document %s has been reached calling s3 delete", MAX_ATTEMPTS, document_id
)
document = Document.objects.get(id=document_id)
document.delete_s3()
# For now, always attempt to delete from both the staged and processed S3 buckets.
# This is because we cannot be sure right now whether we have moved over to using
# two buckets or not. Once we are using two S3 buckets, we can be more specific and
# target the `delete_document_from_s3()` task at just one of them.
document.delete_s3(bucket="staged")
document.delete_s3(bucket="processed")
8 changes: 4 additions & 4 deletions api/documents/libraries/process_document.py
@@ -2,15 +2,15 @@

from rest_framework import serializers

from api.documents.celery_tasks import scan_document_for_viruses, delete_document_from_s3
from api.documents.celery_tasks import process_uploaded_document, delete_document_from_s3

logger = logging.getLogger(__name__)


def process_document(document):
try:
document_id = str(document.id)
scan_document_for_viruses.apply_async(args=(document_id,), link_error=delete_document_from_s3.si(document_id))
process_uploaded_document.apply_async(args=(document_id,), link_error=delete_document_from_s3.si(document_id))
except Exception:
logger.exception("Error scanning document with id %s for viruses", document_id)
raise serializers.ValidationError({"document": "Error scanning document for viruses"})
logger.exception("Error processing document with id %s", document_id)
raise serializers.ValidationError({"document": "Error processing document"})
93 changes: 70 additions & 23 deletions api/documents/libraries/s3_operations.py
@@ -4,72 +4,119 @@

import boto3
from botocore.config import Config
from botocore.exceptions import BotoCoreError, ReadTimeoutError
from botocore.exceptions import BotoCoreError, ReadTimeoutError, ClientError

from django.conf import settings
from django.http import FileResponse

logger = logging.getLogger(__name__)

_client = None
_processed_client = None
_staged_client = None


def init_s3_client():
# We want to instantiate these once, ideally, but there may be cases where we
# want to explicitly re-instantiate the clients, e.g. in tests.
global _client
global _processed_client
global _staged_client
additional_s3_params = {}
if settings.AWS_ENDPOINT_URL:
additional_s3_params["endpoint_url"] = settings.AWS_ENDPOINT_URL
_client = boto3.client(

_processed_client = boto3.client(
"s3",
aws_access_key_id=settings.FILE_UPLOAD_PROCESSED_BUCKET["AWS_ACCESS_KEY_ID"],
aws_secret_access_key=settings.FILE_UPLOAD_PROCESSED_BUCKET["AWS_SECRET_ACCESS_KEY"],
region_name=settings.FILE_UPLOAD_PROCESSED_BUCKET["AWS_REGION"],
config=Config(connect_timeout=settings.S3_CONNECT_TIMEOUT, read_timeout=settings.S3_REQUEST_TIMEOUT),
**additional_s3_params,
)
_staged_client = boto3.client(
"s3",
aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
region_name=settings.AWS_REGION,
aws_access_key_id=settings.FILE_UPLOAD_STAGED_BUCKET["AWS_ACCESS_KEY_ID"],
aws_secret_access_key=settings.FILE_UPLOAD_STAGED_BUCKET["AWS_SECRET_ACCESS_KEY"],
region_name=settings.FILE_UPLOAD_STAGED_BUCKET["AWS_REGION"],
config=Config(connect_timeout=settings.S3_CONNECT_TIMEOUT, read_timeout=settings.S3_REQUEST_TIMEOUT),
**additional_s3_params,
)
return _client
return {"staged": _staged_client, "processed": _processed_client}


init_s3_client()


def get_object(document_id, s3_key):
logging.info(f"Retrieving file '{s3_key}' on document '{document_id}'")
def _get_bucket_client(bucket):
if bucket == "processed":
return _processed_client, settings.FILE_UPLOAD_PROCESSED_BUCKET["AWS_STORAGE_BUCKET_NAME"]
elif bucket == "staged":
return _staged_client, settings.FILE_UPLOAD_STAGED_BUCKET["AWS_STORAGE_BUCKET_NAME"]
else:
raise Exception(f"No S3 bucket exists with label '{bucket}'")


def get_object(document_id, s3_key, bucket="processed"):
logger.info(f"Retrieving file '{s3_key}' on document '{document_id}' from bucket '{bucket}'")
aws_client, bucket_name = _get_bucket_client(bucket)

try:
return _client.get_object(Bucket=settings.AWS_STORAGE_BUCKET_NAME, Key=s3_key)
return aws_client.get_object(Bucket=bucket_name, Key=s3_key)
except ReadTimeoutError:
logging.warning(f"Timeout exceeded when retrieving file '{s3_key}' on document '{document_id}'")
logger.warning(f"Timeout exceeded when retrieving file '{s3_key}' on document '{document_id}'")
except BotoCoreError as exc:
logging.warning(
logger.warning(
f"An unexpected error occurred when retrieving file '{s3_key}' on document '{document_id}': {exc}"
)


def move_staged_document_to_processed(document_id, s3_key):
logger.info(f"Moving file '{s3_key}' on document '{document_id}' from staged bucket to processed bucket")
# Grab the document from the staged S3 bucket
try:
staged_document = get_object(document_id, s3_key, "staged")
except ClientError as exc:
logger.warning(f"An error occurred when retrieving file '{s3_key}' on document '{document_id}': {exc}")
# TODO: When we move over to using two S3 buckets, we should make this raise an exception.
# For now, this keeps us backward compatible so that we can switch from
# a single S3 bucket to staged/processed buckets more smoothly
return

# Upload the document to the processed S3 bucket
# NOTE: Ideally we would use AWS' copy operation to copy from bucket to bucket.
# However, the IAM credentials we are using are limited: each set of credentials has
# read/write access to ONE bucket only, whereas copying would need read access to the
# staged bucket and write access to the processed bucket. This might be something to
# investigate with SRE later.
processed_aws_client, processed_bucket_name = _get_bucket_client("processed")
processed_aws_client.put_object(Bucket=processed_bucket_name, Key=s3_key, Body=staged_document["Body"].read())

Contributor:
Just reading the documentation for put_object: it suggests that you can set Body as a seekable file-like object, and get_object returns a StreamingBody, which I think acts like a seekable file, so you might not need to fully .read() the body and the two interfaces will just stream the file.


# Delete the document from the staged S3 bucket now we have moved it successfully
delete_file(document_id, s3_key, bucket="staged")
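
On the reviewer's streaming point above: as far as I know the StreamingBody returned by get_object is readable but not seekable, and put_object documents Body as bytes or a seekable file-like object, so the safer streaming route is upload_fileobj, which accepts any readable file-like object and uploads it in chunks. A hypothetical sketch of that alternative copy step, reusing the names from the function above:

# Hypothetical streaming variant of the copy step: avoids loading the whole
# file into memory with .read() by handing the StreamingBody to upload_fileobj.
processed_aws_client, processed_bucket_name = _get_bucket_client("processed")
processed_aws_client.upload_fileobj(staged_document["Body"], processed_bucket_name, s3_key)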


def generate_s3_key(document_name, file_extension):
return f"{document_name}-{uuid.uuid4()}.{file_extension}"


def upload_bytes_file(raw_file, s3_key):
_client.put_object(Bucket=settings.AWS_STORAGE_BUCKET_NAME, Key=s3_key, Body=raw_file)
def upload_bytes_file(raw_file, s3_key, bucket="processed"):
aws_client, bucket_name = _get_bucket_client(bucket)
aws_client.put_object(Bucket=bucket_name, Key=s3_key, Body=raw_file)


def delete_file(document_id, s3_key):
logging.info(f"Deleting file '{s3_key}' on document '{document_id}'")
def delete_file(document_id, s3_key, bucket="processed"):
logger.info(f"Deleting file '{s3_key}' on document '{document_id}' from bucket '{bucket}'")
aws_client, bucket_name = _get_bucket_client(bucket)

try:
_client.delete_object(Bucket=settings.AWS_STORAGE_BUCKET_NAME, Key=s3_key)
aws_client.delete_object(Bucket=bucket_name, Key=s3_key)
except ReadTimeoutError:
logging.warning(f"Timeout exceeded when retrieving file '{s3_key}' on document '{document_id}'")
logger.warning(f"Timeout exceeded when retrieving file '{s3_key}' on document '{document_id}'")
except BotoCoreError as exc:
logging.warning(
f"An unexpected error occurred when deleting file '{s3_key}' on document '{document_id}': {exc}"
)
logger.warning(f"An unexpected error occurred when deleting file '{s3_key}' on document '{document_id}': {exc}")


def document_download_stream(document):
s3_response = get_object(document.id, document.s3_key)
s3_response = get_object(document.id, document.s3_key, "processed")
content_type = mimetypes.MimeTypes().guess_type(document.name)[0]

response = FileResponse(