Commit 6f6698f: S3 to SQS

criadoperez committed Dec 11, 2024 (1 parent: b0a228c)
Showing 6 changed files with 199 additions and 0 deletions.
93 changes: 93 additions & 0 deletions owen/aws/README.md

# Deploying the DDEX Processing Pipeline on AWS

This guide will walk you through deploying a DDEX processing pipeline on your AWS account. The architecture consists of:
- An S3 bucket (`ddex-messages-prod`) for uploading DDEX files.
- A Lambda function (`sendDdexToSQS`) to process uploaded files.
- An SQS FIFO queue (`OwenInputQueue.fifo`) to manage zip file delivery.
- A Lambda function (`Owen`) to process the zip files from the queue.

## Prerequisites

1. AWS CLI installed and configured.
2. Python 3.9+ installed for Lambda functions (or another runtime if preferred).
3. Sufficient IAM permissions to create resources on AWS.

---


```mermaid
graph TD
A[S3 Bucket] -->|BatchComplete*.xml Trigger| B[sendDdexToSQS Lambda]
B -->|Send Zip File as SQS Message| C[OwenInputQueue.fifo SQS FIFO Queue]
C -->|Trigger| D[Owen Lambda]
```

---

## Steps to Deploy

### 1. Create the S3 Bucket
1. Open the AWS S3 Console.
2. Create a new bucket:
- **Bucket Name**: `ddex-messages-prod`
- **Region**: Select your preferred AWS region.
- Enable versioning (optional).
3. Add a folder structure as needed for DDEX uploads.
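
If you prefer to script this step, a minimal boto3 sketch (assuming the `us-east-1` region; other regions also need a `CreateBucketConfiguration`):

```python
import boto3

s3 = boto3.client("s3", region_name="us-east-1")

# In us-east-1, no LocationConstraint is needed; in any other region pass
# CreateBucketConfiguration={"LocationConstraint": "<your-region>"}.
s3.create_bucket(Bucket="ddex-messages-prod")

# Optional, mirroring the versioning step above.
s3.put_bucket_versioning(
    Bucket="ddex-messages-prod",
    VersioningConfiguration={"Status": "Enabled"},
)
```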

---

### 2. Create the SQS FIFO Queue
1. Open the AWS SQS Console.
2. Create a new FIFO queue:
- **Name**: `OwenInputQueue.fifo`
- **Message Deduplication**: Enable (using content-based deduplication).
![Enabling content-based deduplication when creating the FIFO queue](image.png)

- **Add the following access policy JSON**, replacing `REGION`, `ACCOUNT_ID`, and `LAMBDA_FUNCTION_ARN` with your own values.
```json
{
"Version": "2012-10-17",
"Id": "PolicyForOwenInputQueue",
"Statement": [
{
"Sid": "AllowSendMessageFromLambda",
"Effect": "Allow",
"Principal": {
"Service": "lambda.amazonaws.com"
},
"Action": "sqs:SendMessage",
"Resource": "arn:aws:sqs:<REGION>:<ACCOUNT_ID>:OwenInputQueue.fifo",
"Condition": {
"ArnEquals": {
"aws:SourceArn": "<LAMBDA_FUNCTION_ARN>"
}
}
}
]
}

```
Example values:
- `REGION`: `us-east-1`
- `ACCOUNT_ID`: `123456789012`
- `LAMBDA_FUNCTION_ARN`: `arn:aws:lambda:us-east-1:123456789012:function:sendDdexToSQS`
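
The same queue can be created with a short boto3 sketch (using the example values above; note that `FifoQueue` and `ContentBasedDeduplication` must be passed as string attributes):

```python
import json
import boto3

sqs = boto3.client("sqs", region_name="us-east-1")

# The access policy from the previous step, with the example values filled in.
policy = {
    "Version": "2012-10-17",
    "Id": "PolicyForOwenInputQueue",
    "Statement": [{
        "Sid": "AllowSendMessageFromLambda",
        "Effect": "Allow",
        "Principal": {"Service": "lambda.amazonaws.com"},
        "Action": "sqs:SendMessage",
        "Resource": "arn:aws:sqs:us-east-1:123456789012:OwenInputQueue.fifo",
        "Condition": {
            "ArnEquals": {
                "aws:SourceArn": "arn:aws:lambda:us-east-1:123456789012:function:sendDdexToSQS"
            }
        }
    }]
}

response = sqs.create_queue(
    QueueName="OwenInputQueue.fifo",
    Attributes={
        "FifoQueue": "true",                  # required for .fifo queue names
        "ContentBasedDeduplication": "true",  # matches the console setting above
        "Policy": json.dumps(policy),
    },
)
print(response["QueueUrl"])  # save this; the Lambda needs it as QUEUE_URL
```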

---

### 3. Create the `sendDdexToSQS` Lambda Function
1. Open the AWS Lambda Console and create a new Lambda function:
- **Name**: `sendDdexToSQS`
- **Runtime**: Python 3.9+.
- **Architecture**: x86_64
- **Permissions**: Attach an IAM role with the following policies:
- `AmazonS3ReadOnlyAccess`
- `AmazonSQSFullAccess`

2. Add the code from `lambda/sendDdexToSQS/lambda_function.py` (included later in this commit) to the function, replacing `YOUR_SQS_QUEUE_URL` with the URL of `OwenInputQueue.fifo`.

3. Deploy the function and wire up the S3 trigger, as sketched below.
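
One way to wire the trigger from code is sketched below. Note that S3 notification filters only support key prefix and suffix matching, not full patterns like `BatchComplete*.xml`, so this sketch filters on the `.xml` suffix and assumes the Lambda ignores non-`BatchComplete` keys:

```python
import boto3

s3 = boto3.client("s3")

# Assumes the Lambda already permits invocation by s3.amazonaws.com
# (e.g. granted via `aws lambda add-permission`).
s3.put_bucket_notification_configuration(
    Bucket="ddex-messages-prod",
    NotificationConfiguration={
        "LambdaFunctionConfigurations": [{
            "LambdaFunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:sendDdexToSQS",  # example ARN
            "Events": ["s3:ObjectCreated:*"],
            "Filter": {
                "Key": {"FilterRules": [{"Name": "suffix", "Value": ".xml"}]}
            },
        }]
    },
)
```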
Binary file added owen/aws/image-1.png

Binary file added owen/aws/image-2.png

Binary file added owen/aws/image.png
45 changes: 45 additions & 0 deletions owen/aws/lambda/sendDdexToSQS/README.md
# sendDdexToSQS Lambda Function

The `sendDdexToSQS` Lambda function processes batches of files uploaded to an S3 bucket and prepares them for further processing by:
1. Packaging all files (except `BatchComplete*.xml`) in the folder into a single zip file.
2. Sending the zip file to an SQS FIFO queue (`OwenInputQueue.fifo`) for downstream processing.

## Workflow

1. Triggered by an S3 event when a `BatchComplete*.xml` file is uploaded.
2. Identifies the folder containing the `BatchComplete*.xml` file.
3. Lists all files in the folder.
4. Creates a zip archive of the folder contents (excluding the `BatchComplete*.xml` file).
5. Sends the zip file to the SQS FIFO queue with metadata for ordered and deduplicated processing.

## Prerequisites

- **S3 Bucket**: Ensure the bucket exists and is configured to trigger this Lambda on `BatchComplete*.xml` uploads.
- **SQS FIFO Queue**: Create the queue and note its URL.
- **IAM Role**: The Lambda's execution role requires:
- `AmazonS3ReadOnlyAccess`
- `AmazonSQSFullAccess`

## Variables

- `QUEUE_URL`: The URL of the SQS FIFO queue (`OwenInputQueue.fifo`).

## Deployment

1. **Create the Lambda**:
- Set the runtime to Python 3.9+.
- Paste the code into the function editor or deploy via CLI.

2. **Configure the S3 Trigger**:
- Set the bucket to trigger this Lambda on `BatchComplete*.xml` uploads.

3. **Test**:
- Upload files to the S3 bucket, including a `BatchComplete*.xml`.
- Verify that a message with the zip file is sent to the SQS FIFO queue.
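
A quick end-to-end smoke test, as a sketch (assuming the bucket and queue names used in this guide and a hypothetical `batch-001/` folder):

```python
import boto3

s3 = boto3.client("s3")
sqs = boto3.client("sqs")

batch = "batch-001/"
# Upload a payload file first, then the completion marker that fires the trigger.
s3.put_object(Bucket="ddex-messages-prod", Key=batch + "release.xml", Body=b"<ddex/>")
s3.put_object(Bucket="ddex-messages-prod", Key=batch + "BatchComplete_001.xml", Body=b"<done/>")

# Poll the FIFO queue for the resulting message.
queue_url = sqs.get_queue_url(QueueName="OwenInputQueue.fifo")["QueueUrl"]
messages = sqs.receive_message(QueueUrl=queue_url, WaitTimeSeconds=10)
print(messages.get("Messages", []))
```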

## Output

- Sends an SQS message with:
- The S3 bucket name.
- The zip file (encoded as a string).
- Metadata (`MessageGroupId`, `MessageDeduplicationId`) for ordered processing.
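
For reference, a downstream consumer such as the `Owen` Lambda could unpack a message like this (a sketch based on the message shape above):

```python
import io
import json
import zipfile

def handle_sqs_record(record):
    body = json.loads(record["body"])
    zip_bytes = bytes.fromhex(body["zip_file"])  # hex-encoded by sendDdexToSQS
    with zipfile.ZipFile(io.BytesIO(zip_bytes)) as archive:
        for name in archive.namelist():
            print(f"{name}: {len(archive.read(name))} bytes from {body['bucket']}")
```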
61 changes: 61 additions & 0 deletions owen/aws/lambda/sendDdexToSQS/lambda_function.py
import boto3
import zipfile
import io
import json
from urllib.parse import unquote_plus

s3 = boto3.client('s3')
sqs = boto3.client('sqs')

# URL of the OwenInputQueue.fifo queue. Replace with your own value.
QUEUE_URL = "YOUR_SQS_QUEUE_URL"

def lambda_handler(event, context):
    for record in event['Records']:
        bucket_name = record['s3']['bucket']['name']
        # S3 event keys are URL-encoded (e.g. spaces arrive as '+'), so decode first.
        object_key = unquote_plus(record['s3']['object']['key'])

        print(f"Triggered by file: {object_key} in bucket: {bucket_name}")

        # The batch folder is everything up to the last '/' of the trigger key.
        if "/" in object_key:
            batch_folder = "/".join(object_key.split("/")[:-1]) + "/"
        else:
            batch_folder = ""

        print(f"Batch folder: {batch_folder}")

        response = s3.list_objects_v2(Bucket=bucket_name, Prefix=batch_folder)
        files = response.get('Contents', [])

        if not files:
            print(f"No files found in folder: {batch_folder}")
            return {
                "statusCode": 404,
                "body": "No files found in folder"
            }

        # Zip every file in the folder except the BatchComplete trigger file itself.
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w') as zip_file:
            for file in files:
                file_key = file['Key']
                if file_key == object_key:
                    continue
                file_obj = s3.get_object(Bucket=bucket_name, Key=file_key)
                file_content = file_obj['Body'].read()
                zip_file.writestr(file_key.split('/')[-1], file_content)

        zip_buffer.seek(0)
        zip_binary = zip_buffer.getvalue()

        # Note: SQS messages are capped at 256 KB, so the hex-encoded zip must
        # stay under that limit; larger batches would need an S3-pointer pattern.
        sqs.send_message(
            QueueUrl=QUEUE_URL,
            MessageBody=json.dumps({
                "bucket": bucket_name,
                "zip_file": zip_binary.hex()
            }),
            MessageGroupId="BatchProcessing",
            # Deduplicate per batch folder; fall back to the object key for
            # uploads at the bucket root, since an empty ID is rejected.
            MessageDeduplicationId=batch_folder or object_key
        )
        print(f"Zip file sent to SQS successfully for folder: {batch_folder}")

    return {
        "statusCode": 200,
        "body": "Zip file created and sent to SQS"
    }
