Commit

feat: add localstack deploy and working example
mmalenic committed Sep 19, 2023
1 parent 463e0c0 commit ae36a31
Showing 26 changed files with 146 additions and 267 deletions.
3 changes: 0 additions & 3 deletions lib/workload/stateless/filemanager/.env.example

This file was deleted.

1 change: 1 addition & 0 deletions lib/workload/stateless/filemanager/.gitignore
@@ -1,2 +1,3 @@
target/
*.swp
/volume/
2 changes: 2 additions & 0 deletions lib/workload/stateless/filemanager/Cargo.lock

Some generated files are not rendered by default.

52 changes: 15 additions & 37 deletions lib/workload/stateless/filemanager/README.md
@@ -1,3 +1,7 @@
# Filemanager

The filemanager ingests events from cloud storage like S3 and maintains a queryable table of objects.

# Quickstart

For a development edit/compile loop:
@@ -12,54 +16,28 @@ $ cargo watch -c -w src -x run
2023-06-13T00:56:41.621002Z INFO filemanager: listening on 0.0.0.0:8080
```

Then:

```
$ curl localhost:8080/file/moo.bam
```

To access the built-in Swagger playground, visit http://localhost:8080/swagger-ui/ in your browser.

## Development

For local(stack) testing and development:
For localstack testing and development:

```sh
pip install awscli-local
docker compose up
```
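
A quick way to confirm that localstack is up before deploying is to query its health endpoint (a sketch; the gateway port 4566 comes from the compose file, and older localstack versions expose `/health` instead):

```sh
curl localhost:4566/_localstack/health
```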

That allows one to interact with the S3 bucket events like so:
Then deploy the CDK stack to localstack:

```sh
$ ./lib/workload/stateless/filemanager/localstack-s3-events-to-sqs.sh # Setup up the whole S3-SQS event registering
$ export AWS_REGION=ap-southeast-2
$ awslocal sqs list-queues
{
"QueueUrls": [
"http://localhost:4566/000000000000/filemanager_s3_events"
]
}
$ awslocal s3 cp ~/tmp/xilinx.jed s3://filemanager/
upload: ../../../tmp/xilinx.jed to s3://filemanager/xilinx.jed

% awslocal sqs receive-message --queue-url=http://localhost:4566/000000000000/filemanager_s3_events
{
"Messages": [
{
"MessageId": "b4c9f3d9-b8aa-4193-958c-fa724ae77b50",
"ReceiptHandle": "YWVmMWI3OWUtNTk2Ni00NmYwLTljODYtN2YyNWRhZWE5OWFiIGFybjphd3M6c3FzOmFwLXNvdXRoZWFzdC0yOjAwMDAwMDAwMDAwMDpmaWxlbWFuYWdlcl9zM19ldmVudHMgYjRjOWYzZDktYjhhYS00MTkzLTk1OGMtZmE3MjRhZTc3YjUwIDE2ODc3NDMyMTYuNTk0MjUwMg==",
"MD5OfBody": "4dba243e4db081027122f9597a0422f7",
"Body": "{\"Records\": [{\"eventVersion\": \"2.1\", \"eventSource\": \"aws:s3\", \"awsRegion\": \"ap-southeast-2\", \"eventTime\": \"2023-06-26T01:33:28.384Z\", \"eventName\": \"ObjectCreated:Put\", \"userIdentity\": {\"principalId\": \"AIDAJDPLRKLG7UEXAMPLE\"}, \"requestParameters\": {\"sourceIPAddress\": \"127.0.0.1\"}, \"responseElements\": {\"x-amz-request-id\": \"0df21e3c\", \"x-amz-id-2\": \"eftixk72aD6Ap51TnqcoF8eFidJG9Z/2\"}, \"s3\": {\"s3SchemaVersion\": \"1.0\", \"configurationId\": \"8ff04f8b-eb23-445c-af74-f350ffbf9b6c\", \"bucket\": {\"name\": \"filemanager\", \"ownerIdentity\": {\"principalId\": \"A3NL1KOZZKExample\"}, \"arn\": \"arn:aws:s3:::filemanager\"}, \"object\": {\"key\": \"lolo\", \"size\": 230764, \"eTag\": \"\\\"6548a5cd43a78152c559d81f37488786\\\"\", \"versionId\": null, \"sequencer\": \"0055AED6DCD90281E5\"}}}]}"
}
]
}
cd deploy
npx cdklocal bootstrap
npx cdklocal deploy
```
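
To check that the stack deployed, its CloudFormation status can be queried (a sketch; `FilemanagerLocalStack` is the `STACK_NAME` set in the CDK app):

```sh
awslocal cloudformation describe-stacks \
  --stack-name FilemanagerLocalStack \
  --query 'Stacks[0].StackStatus'
```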

To purge the localstack s3 bucket and SQS queues:
Events can then be created and ingested:

```bash
% awslocal s3 rm --recursive s3://filemanager
% awslocal sqs purge-queue --queue-url http://localhost:4566/000000000000/filemanager_s3_events
```sh
awslocal s3api put-object --bucket filemanager-test-ingest --key test

/bin/bash aws-get-filemanager-logs.sh -c awslocal > logs.txt
```
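
The ingested record can also be inspected directly in the compose database. A minimal sketch, assuming the compose service is named `db` (as the `database_url` in the CDK app suggests) and that the migrations create an `object` table:

```sh
docker compose exec db \
  psql -U filemanager -d filemanager -c 'SELECT * FROM object;'
```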

### Database
74 changes: 0 additions & 74 deletions lib/workload/stateless/filemanager/database/database.sh

This file was deleted.

3 changes: 0 additions & 3 deletions lib/workload/stateless/filemanager/database/init_database.sh

This file was deleted.

6 changes: 0 additions & 6 deletions lib/workload/stateless/filemanager/database/migrate.sh

This file was deleted.

4 changes: 0 additions & 4 deletions lib/workload/stateless/filemanager/database/reset_database.sh

This file was deleted.

@@ -3,17 +3,15 @@
import "source-map-support/register";
import * as cdk from "aws-cdk-lib";
import {FilemanagerStack} from "../stack/filemanager_stack";
import {config} from "dotenv";

config({ path: '../.env' });

export const STACK_NAME = "FilemanagerLocalStack";
const STACK_DESCRIPTION = "A stack deploying filemanager to dev.";

const app = new cdk.App();
new FilemanagerStack(app, STACK_NAME, {
database_url: process.env.DATABASE_URL ?? throwExpression("DATABASE_URL should not be undefined for localstack development"),
endpoint_url: process.env.ENDPOINT_URL ?? throwExpression("ENDPOINT_URL should not be undefined for localstack development"),
database_url: "postgresql://filemanager:filemanager@db:5432/filemanager",
endpoint_url: "http://localstack:4566",
force_path_style: true,
stack_name: STACK_NAME
}, {
stackName: STACK_NAME,
@@ -24,8 +22,4 @@ new FilemanagerStack(app, STACK_NAME, {
env: {
account: "000000000000",
},
});

function throwExpression(errorMessage: string): never {
throw new Error(errorMessage);
}
});

Some generated files are not rendered by default.

@@ -0,0 +1,6 @@
npx cdklocal bootstrap
npx cdklocal deploy

awslocal s3api put-object --bucket filemanager-test-ingest --key test

/bin/bash aws-get-filemanager-logs.sh -c awslocal > logs.txt
@@ -13,6 +13,7 @@ import {CfnBucket} from "aws-cdk-lib/aws-s3";
interface Settings {
database_url: string,
endpoint_url: string,
force_path_style: boolean,
stack_name: string,
}

@@ -85,13 +86,13 @@ export class FilemanagerStack extends Stack {
environment: {
DATABASE_URL: settings.database_url,
ENDPOINT_URL: settings.endpoint_url,
FORCE_PATH_STYLE: settings.force_path_style.toString(),
SQS_QUEUE_URL: queue.queueUrl,
RUST_LOG: "info,filemanager=trace,filemanager_http_lambda=trace",
RUST_LOG: "info,filemanager_ingest_lambda=trace,filemanager=trace",
},
buildEnvironment: {
// Todo get this to build debug rather than release for local dev.
RUSTFLAGS: "-C target-cpu=neoverse-n1",
CARGO_PROFILE_RELEASE_LTO: "true",
CARGO_PROFILE_RELEASE_CODEGEN_UNITS: "1",
},
architecture: Architecture.ARM_64,
role: lambdaRole,
18 changes: 14 additions & 4 deletions lib/workload/stateless/filemanager/docker-compose.yml
@@ -1,6 +1,3 @@
# NOTE:
# If you update image version, make sure to update in docker-compose.ci.yml as well

version: '3.1'

services:
@@ -13,4 +10,17 @@ services:
- POSTGRES_USER=filemanager
- POSTGRES_PASSWORD=filemanager
ports:
- "5432:5432"
- "5432:5432"

localstack:
container_name: localstack
image: localstack/localstack
ports:
- "127.0.0.1:4566:4566" # LocalStack Gateway
- "127.0.0.1:4510-4559:4510-4559" # external services port range
environment:
- DEBUG=${DEBUG-}
- DOCKER_HOST=unix:///var/run/docker.sock
volumes:
- "${LOCALSTACK_VOLUME_DIR:-./volume}:/var/lib/localstack"
- "/var/run/docker.sock:/var/run/docker.sock"
@@ -1,30 +1,35 @@
use filemanager::database::s3::ingester::Ingester;
use filemanager::events::s3::s3::S3;
use filemanager::events::Collect;
use filemanager::events::s3::sqs::SQS;
use lambda_http::Error;
use lambda_runtime::{run, service_fn, LambdaEvent};
use tracing_subscriber::{EnvFilter, fmt};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use filemanager::database::Ingest;
use filemanager::events::s3::collect::Collecter;

/// Handle SQS events.
async fn event_handler(_: LambdaEvent<()>) -> Result<(), Error> {
let sqs = SQS::with_default_client().await?;
let events = sqs.receive().await?;

let s3 = S3::with_defaults().await?;
let events = s3.update_events(events).await?;
let events = Collecter::with_defaults(events).await?.collect().await?;

let mut ingester = Ingester::new_with_defaults().await?;

ingester.ingest_events(events.into()).await?;
ingester.ingest(events).await?;

Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.json()
.with_target(false)
.without_time()
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));

tracing_subscriber::registry()
.with(fmt::layer().json().without_time())
.with(env_filter)
.init();

run(service_fn(event_handler)).await
@@ -15,4 +15,7 @@ tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] }

filemanager = { path = "../filemanager" }
serde_json = "1.0.105"
serde_json = "1.0.105"

aws-config = "0.56"
aws-sdk-sts = "0.30"
@@ -1,8 +1,15 @@
use aws_lambda_events::sqs::SqsEvent;
use aws_sdk_sts::config;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
use tracing::{info, trace};
use tracing_subscriber::{EnvFilter, fmt};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
use filemanager::database::Ingest;

use filemanager::database::s3::ingester::Ingester;
use filemanager::events::Collect;
use filemanager::events::s3::collect::Collecter;
use filemanager::events::s3::s3::S3;
use filemanager::events::s3::FlatS3EventMessages;

@@ -25,25 +32,24 @@ async fn event_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {

trace!("flattened events: {:?}", events);

let s3 = S3::with_defaults().await?;
let events = s3.update_events(events).await?;
let events = Collecter::with_defaults(events).await?.collect().await?;

let mut ingester = Ingester::new_with_defaults().await?;

let events = events.into();
trace!("ingesting events: {:?}", events);

ingester.ingest_events(events).await?;
let mut ingester = Ingester::new_with_defaults().await?;
trace!("ingester: {:?}", ingester);
ingester.ingest(events).await?;

Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.json()
.with_target(false)
.without_time()
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));

tracing_subscriber::registry()
.with(fmt::layer().json().without_time())
.with(env_filter)
.init();

run(service_fn(event_handler)).await