Skip to content

Commit

Permalink
refactor: tidy crates, add plain lambda for calling receive manually
Browse files Browse the repository at this point in the history
  • Loading branch information
mmalenic committed Sep 18, 2023
1 parent 59f6cd1 commit e629071
Show file tree
Hide file tree
Showing 14 changed files with 145 additions and 156 deletions.
17 changes: 0 additions & 17 deletions lib/workload/stateless/filemanager/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion lib/workload/stateless/filemanager/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ resolver = "2"

members = [
"filemanager",
"filemanager-http-axum",
"filemanager-http-lambda",
"filemanager-ingest-lambda",
]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,30 @@
use axum::{
routing::{get, post},
Router,
};
use lambda_http::{run, Error};
use filemanager::database::s3::ingester::Ingester;
use filemanager::events::s3::s3::S3;
use filemanager::events::s3::sqs::SQS;
use lambda_http::Error;
use lambda_runtime::{run, service_fn, LambdaEvent};

/// Handle SQS events.
async fn event_handler(_: LambdaEvent<()>) -> Result<(), Error> {
let sqs = SQS::with_default_client().await?;
let events = sqs.receive().await?;

let s3 = S3::with_defaults().await?;
let events = s3.update_events(events).await?;

let mut ingester = Ingester::new_with_defaults().await?;

ingester.ingest_events(events.into()).await?;

Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
// required to enable CloudWatch error logging by the runtime
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();

// Todo add axum routes here.
todo!();

//run(app).await
run(service_fn(event_handler)).await
}
Original file line number Diff line number Diff line change
@@ -1,44 +1,41 @@
use aws_lambda_events::event::sqs::SqsEventObj;
use aws_lambda_events::sqs::SqsEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};
use serde::{Deserialize, Serialize};

use filemanager::database::s3::ingester::Ingester;
use filemanager::events::s3::{Events, FlatS3EventMessage, FlatS3EventMessages};
use filemanager::events::s3::s3::S3;

/// This is the main body for the function.
/// You can use the data sent into SQS here.
async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
let events = event.payload.records.into_iter().filter_map(|event| {
event.body.map(|body| {
let body: FlatS3EventMessages = serde_json::from_str(&body).unwrap();
body
use filemanager::events::s3::FlatS3EventMessages;

/// Handle SQS events.
async fn event_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
let events: FlatS3EventMessages = event
.payload
.records
.into_iter()
.filter_map(|event| {
event.body.map(|body| {
let body: FlatS3EventMessages = serde_json::from_str(&body).unwrap();
body
})
})
}).collect::<Vec<FlatS3EventMessages>>();
.collect::<Vec<FlatS3EventMessages>>()
.into();

let e = events.into_iter().next().unwrap().body;
let s3 = S3::with_defaults().await?;
let e = s3.update_events(e).await?;

let e = Events::from(e);
let events = s3.update_events(events).await?;

let mut ingester = Ingester::new_with_defaults().await?;

ingester.ingest_events(e).await?;
ingester.ingest_events(events.into()).await?;

Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
// required to enable CloudWatch error logging by the runtime
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();

run(service_fn(function_handler)).await
run(service_fn(event_handler)).await
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,9 @@
pub mod s3;
use sqlx::PgPool;

use crate::error::Error::DbClientError;
use crate::error::Result;
use aws_sdk_s3::types::StorageClass;
use chrono::{DateTime, NaiveDateTime, Utc};
use serde::{Deserialize, Serialize};
use sqlx::PgPool;
use sqlx::FromRow;
use utoipa::{IntoParams, ToSchema};

pub mod s3;

#[derive(Debug)]
pub struct DbClient {
Expand All @@ -23,11 +19,11 @@ impl DbClient {
let url = std::env::var("DATABASE_URL").map_err(|err| DbClientError(err.to_string()))?;

Ok(Self {
pool: PgPool::connect(&url).await?
pool: PgPool::connect(&url).await?,
})
}

pub fn pool(&self) -> &PgPool {
&self.pool
}
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
use aws_sdk_s3::operation::head_object::HeadObjectOutput;
use chrono::DateTime;
use chrono::Utc;
use futures::future::join_all;
use futures::StreamExt;
use sqlx::{Executor, Postgres, query, query_file, QueryBuilder};
use uuid::Uuid;
use sqlx::query_file;

use crate::database::DbClient;
use crate::events::s3::{Events, FlatS3EventMessage, FlatS3EventMessages, StorageClass, TransposedS3EventMessages};
use crate::events::s3::s3::S3;
use crate::error::Result;
use crate::events::s3::s3::S3;
use crate::events::s3::{Events, TransposedS3EventMessages};

/// An ingester for S3 events.
#[derive(Debug)]
pub struct Ingester {
db: DbClient,
s3: S3
s3: S3,
}

impl Ingester {
Expand All @@ -25,7 +20,7 @@ impl Ingester {
pub async fn new_with_defaults() -> Result<Self> {
Ok(Self {
db: DbClient::new_with_defaults().await?,
s3: S3::with_defaults().await?
s3: S3::with_defaults().await?,
})
}

Expand All @@ -49,7 +44,8 @@ impl Ingester {
..
} = object_created;

query_file!("../database/queries/ingester/insert_objects.sql",
query_file!(
"../database/queries/ingester/insert_objects.sql",
&object_ids,
&buckets,
&keys,
Expand All @@ -58,12 +54,17 @@ impl Ingester {
&event_times,
&last_modified_dates as &[Option<DateTime<Utc>>],
&portal_run_ids
).execute(&self.db.pool).await?;
)
.execute(&self.db.pool)
.await?;

query_file!("../database/queries/ingester/aws/insert_s3_objects.sql",
query_file!(
"../database/queries/ingester/aws/insert_s3_objects.sql",
&object_ids,
&storage_classes as &[Option<StorageClass>]
).execute(&self.db.pool).await?;
)
.execute(&self.db.pool)
.await?;

let TransposedS3EventMessages {
event_times,
Expand All @@ -72,12 +73,15 @@ impl Ingester {
..
} = object_removed;

query_file!("../database/queries/ingester/update_deleted.sql",
query_file!(
"../database/queries/ingester/update_deleted.sql",
&keys,
&buckets,
&event_times
).execute(&self.db.pool).await?;
)
.execute(&self.db.pool)
.await?;

Ok(())
}
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
pub mod ingester;

use aws_sdk_s3::types::StorageClass;

pub mod ingester;

/// An S3 object which matches the s3 object schema.
#[derive(Debug, Clone)]
pub struct CloudObject {
Expand Down
3 changes: 2 additions & 1 deletion lib/workload/stateless/filemanager/filemanager/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use sqlx::migrate::MigrateError;
use std::result;

use sqlx::migrate::MigrateError;
use thiserror::Error;

pub type Result<T> = result::Result<T, Error>;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
//! This module handles converting storage events into database objects
//!
pub mod s3;

use crate::events::s3::Events;
use crate::error::Result;
use crate::events::s3::Events;

pub mod s3;

/// This trait processes raw events into a common type that can easily be consumed by the database.
pub trait Collect {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use crate::events::s3::s3::S3;
use crate::error::Result;
use crate::events::s3::sqs::SQS;
use crate::events::{Collect, EventType};
use crate::events::s3::s3::S3;
use crate::events::s3::{Events, FlatS3EventMessages};
use crate::events::{Collect, EventType};

#[derive(Debug)]
pub struct Collecter {
Expand All @@ -18,7 +17,7 @@ impl Collecter {
pub async fn with_defaults(raw_events: FlatS3EventMessages) -> Result<Self> {
Ok(Self {
s3: S3::with_defaults().await?,
raw_events
raw_events,
})
}
}
Expand All @@ -27,4 +26,4 @@ impl Collect for Collecter {
fn collect(self) -> Result<EventType> {
Ok(EventType::S3(Events::from(self.raw_events)))
}
}
}
Loading

0 comments on commit e629071

Please sign in to comment.