diff --git a/owen/aws/README.md b/owen/aws/README.md
new file mode 100644
index 0000000..9e79449
--- /dev/null
+++ b/owen/aws/README.md
@@ -0,0 +1,93 @@
# Deploying the DDEX Processing Pipeline on AWS

This guide walks you through deploying a DDEX processing pipeline in your AWS account. The architecture consists of:
- An S3 bucket (`ddex-messages-prod`) for uploading DDEX files.
- A Lambda function (`sendDdexToSQS`) to process uploaded files.
- An SQS FIFO queue (`OwenInputQueue.fifo`) to manage zip file delivery.
- A Lambda function (`Owen`) to process the zip files from the queue.

## Prerequisites

1. AWS CLI installed and configured.
2. Python 3.9+ installed for Lambda functions (or another runtime if preferred).
3. Sufficient IAM permissions to create resources on AWS.

---

## Architecture

```mermaid
graph TD
    A[S3 Bucket] -->|BatchComplete*.xml Trigger| B[sendDdexToSQS Lambda]
    B -->|Send Zip File as SQS Message| C[OwenInputQueue.fifo SQS FIFO Queue]
    C -->|Trigger| D[Owen Lambda]
```

---

## Steps to Deploy

### 1. Create the S3 Bucket

1. Open the AWS S3 Console.
2. Create a new bucket:
   - **Bucket Name**: `ddex-messages-prod`
   - **Region**: Select your preferred AWS region.
   - Enable versioning (optional).
3. Add a folder structure as needed for DDEX uploads.

---

### 2. Create the SQS FIFO Queue

1. Open the AWS SQS Console.
2. Create a new FIFO queue:
   - **Name**: `OwenInputQueue.fifo`
   - **Message Deduplication**: Enable (using content-based deduplication).

![SQS FIFO queue configuration](image.png)

3. Add the following access policy JSON, replacing `REGION`, `ACCOUNT_ID`, and `LAMBDA_FUNCTION_ARN` with your own values:

```json
{
  "Version": "2012-10-17",
  "Id": "PolicyForOwenInputQueue",
  "Statement": [
    {
      "Sid": "AllowSendMessageFromLambda",
      "Effect": "Allow",
      "Principal": {
        "Service": "lambda.amazonaws.com"
      },
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:REGION:ACCOUNT_ID:OwenInputQueue.fifo",
      "Condition": {
        "ArnEquals": {
          "aws:SourceArn": "LAMBDA_FUNCTION_ARN"
        }
      }
    }
  ]
}
```

Example values:
- `REGION`: `us-east-1`
- `ACCOUNT_ID`: `123456789012`
- `LAMBDA_FUNCTION_ARN`: `arn:aws:lambda:us-east-1:123456789012:function:sendDdexToSQS`

---

### 3. Create the `sendDdexToSQS` Lambda Function

1. Open the AWS Lambda Console and create a new Lambda function:
   - **Name**: `sendDdexToSQS`
   - **Runtime**: Python 3.9+
   - **Architecture**: x86_64
   - **Permissions**: Attach an IAM role with the following policies:
     - `AmazonS3ReadOnlyAccess`
     - `AmazonSQSFullAccess`
2. Add the code from `lambda/sendDdexToSQS/lambda_function.py` (included in this change) to the function, replacing `YOUR_SQS_QUEUE_URL` with the URL of `OwenInputQueue.fifo`.
3. Deploy the function.
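If you prefer to script steps 1 and 2 rather than click through the console, the sketch below shows one possible approach with boto3. It is only a sketch: it assumes your AWS credentials are already configured, uses the example region from this guide, and still leaves the queue access policy and the Lambda function to be set up as described above.

```python
# Hypothetical provisioning script for steps 1 and 2 (bucket + FIFO queue).
import boto3

REGION = "us-east-1"                 # example region from this guide; adjust as needed
BUCKET_NAME = "ddex-messages-prod"
QUEUE_NAME = "OwenInputQueue.fifo"

s3 = boto3.client("s3", region_name=REGION)
sqs = boto3.client("sqs", region_name=REGION)

# Step 1: create the S3 bucket. Outside us-east-1 a LocationConstraint is required.
if REGION == "us-east-1":
    s3.create_bucket(Bucket=BUCKET_NAME)
else:
    s3.create_bucket(
        Bucket=BUCKET_NAME,
        CreateBucketConfiguration={"LocationConstraint": REGION},
    )

# Step 2: create the FIFO queue with content-based deduplication enabled.
queue_url = sqs.create_queue(
    QueueName=QUEUE_NAME,
    Attributes={
        "FifoQueue": "true",
        "ContentBasedDeduplication": "true",
    },
)["QueueUrl"]

# This URL is the value to substitute for YOUR_SQS_QUEUE_URL in the Lambda code.
print(f"Queue URL: {queue_url}")
```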
diff --git a/owen/aws/image-1.png b/owen/aws/image-1.png
new file mode 100644
index 0000000..3706ca4
Binary files /dev/null and b/owen/aws/image-1.png differ
diff --git a/owen/aws/image-2.png b/owen/aws/image-2.png
new file mode 100644
index 0000000..1e93b36
Binary files /dev/null and b/owen/aws/image-2.png differ
diff --git a/owen/aws/image.png b/owen/aws/image.png
new file mode 100644
index 0000000..0ddbc44
Binary files /dev/null and b/owen/aws/image.png differ
diff --git a/owen/aws/lambda/sendDdexToSQS/README.md b/owen/aws/lambda/sendDdexToSQS/README.md
new file mode 100644
index 0000000..bb09184
--- /dev/null
+++ b/owen/aws/lambda/sendDdexToSQS/README.md
@@ -0,0 +1,45 @@
# sendDdexToSQS Lambda Function

The `sendDdexToSQS` Lambda function processes batches of files uploaded to an S3 bucket and prepares them for further processing by:
1. Packaging all files (except `BatchComplete*.xml`) in the folder into a single zip file.
2. Sending the zip file to an SQS FIFO queue (`OwenInputQueue.fifo`) for downstream processing.

## Workflow

1. Triggered by an S3 event when a `BatchComplete*.xml` file is uploaded.
2. Identifies the folder containing the `BatchComplete*.xml` file.
3. Lists all files in the folder.
4. Creates a zip archive of the folder contents (excluding the `BatchComplete*.xml` file).
5. Sends the zip file to the SQS FIFO queue with metadata for ordered and deduplicated processing.

## Prerequisites

- **S3 Bucket**: Ensure the bucket exists and is configured to trigger this Lambda on `BatchComplete*.xml` uploads.
- **SQS FIFO Queue**: Create the queue and note its URL.
- **IAM Role**: The Lambda's execution role requires:
  - `AmazonS3ReadOnlyAccess`
  - `AmazonSQSFullAccess`

## Variables

- `QUEUE_URL`: The URL of the SQS FIFO queue (`OwenInputQueue.fifo`).

## Deployment

1. **Create the Lambda**:
   - Set the runtime to Python 3.9+.
   - Paste the code into the function editor or deploy via CLI.

2. **Configure the S3 Trigger**:
   - Set the bucket to trigger this Lambda on `BatchComplete*.xml` uploads.

3. **Test**:
   - Upload files to the S3 bucket, including a `BatchComplete*.xml`.
   - Verify that a message with the zip file is sent to the SQS FIFO queue.

## Output

- Sends an SQS message with:
  - The S3 bucket name.
  - The zip file (hex-encoded as a string).
  - Metadata (`MessageGroupId`, `MessageDeduplicationId`) for ordered processing.
diff --git a/owen/aws/lambda/sendDdexToSQS/lambda_function.py b/owen/aws/lambda/sendDdexToSQS/lambda_function.py
new file mode 100644
index 0000000..48f8eea
--- /dev/null
+++ b/owen/aws/lambda/sendDdexToSQS/lambda_function.py
@@ -0,0 +1,61 @@
import boto3
import zipfile
import io
import json

s3 = boto3.client('s3')
sqs = boto3.client('sqs')

# Replace with the OwenInputQueue.fifo URL (see "Variables" in the README).
QUEUE_URL = "YOUR_SQS_QUEUE_URL"

def lambda_handler(event, context):
    for record in event['Records']:
        bucket_name = record['s3']['bucket']['name']
        object_key = record['s3']['object']['key']

        print(f"Triggered by file: {object_key} in bucket: {bucket_name}")

        # Derive the batch folder (prefix) containing the BatchComplete*.xml file.
        if "/" in object_key:
            batch_folder = "/".join(object_key.split("/")[:-1]) + "/"
        else:
            batch_folder = ""

        print(f"Batch folder: {batch_folder}")

        response = s3.list_objects_v2(Bucket=bucket_name, Prefix=batch_folder)
        files = response.get('Contents', [])

        if not files:
            print(f"No files found in folder: {batch_folder}")
            return {
                "statusCode": 404,
                "body": "No files found in folder"
            }

        # Zip every file in the folder except the triggering BatchComplete*.xml.
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w') as zip_file:
            for file in files:
                file_key = file['Key']
                if file_key == object_key:
                    continue
                file_obj = s3.get_object(Bucket=bucket_name, Key=file_key)
                file_content = file_obj['Body'].read()
                zip_file.writestr(file_key.split('/')[-1], file_content)

        zip_buffer.seek(0)
        zip_binary = zip_buffer.getvalue()

        # Hex-encode the zip so it can travel inside a JSON message body.
        # Fall back to the object key for the deduplication ID when the upload
        # happened at the bucket root (SQS rejects an empty MessageDeduplicationId).
        sqs.send_message(
            QueueUrl=QUEUE_URL,
            MessageBody=json.dumps({
                "bucket": bucket_name,
                "zip_file": zip_binary.hex()
            }),
            MessageGroupId="BatchProcessing",
            MessageDeduplicationId=batch_folder or object_key
        )
        print(f"Zip file sent to SQS successfully for folder: {batch_folder}")

    return {
        "statusCode": 200,
        "body": "Zip file created and sent to SQS"
    }
\ No newline at end of file
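Since the zip archive is hex-encoded into the JSON message body (`zip_binary.hex()` above), a downstream consumer such as the `Owen` Lambda has to reverse that encoding. The following is only an illustrative sketch of that decoding step, assuming the message format produced by `lambda_handler` above; the `decode_owen_message` helper is not part of this change.

```python
import io
import json
import zipfile

def decode_owen_message(message_body):
    """Rebuild the zip archive from an OwenInputQueue.fifo message body.

    Expects the JSON produced by sendDdexToSQS:
    {"bucket": "<bucket name>", "zip_file": "<hex-encoded zip bytes>"}.
    """
    payload = json.loads(message_body)
    zip_bytes = bytes.fromhex(payload["zip_file"])
    archive = zipfile.ZipFile(io.BytesIO(zip_bytes))
    print(f"Bucket: {payload['bucket']}, files in batch: {archive.namelist()}")
    return archive
```

Note that hex encoding roughly doubles the payload size and a standard SQS message is limited to 256 KB, so very large batches may not fit; writing the zip back to S3 and sending only its key is a common alternative for bigger archives.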