Skip to content

Commit

Permalink
docs: add documentation comments and update README.md. /cc @brainstorm.
Browse files Browse the repository at this point in the history
  • Loading branch information
mmalenic committed Oct 23, 2023
1 parent b047cff commit 927367a
Show file tree
Hide file tree
Showing 13 changed files with 52 additions and 9 deletions.
21 changes: 17 additions & 4 deletions lib/workload/stateless/filemanager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,24 @@ Start the postgres database and ensure that an `.env` file is set containing the
docker compose up
```

For development of the rust workspace:
The filemanager uses sqlx to check queries against a database at compile time. A `.env` file containing the `DATABASE_URL` ensures that sqlx can perform these compile-time checks.

Filemanager uses docker to run a postgres database to track objects. This means that sqlx connects to the postgres server
running inside the docker compose container. If there are additional postgres installations running locally (outside of docker),
they might interfere and cause errors about non-existent roles and users.

For development of the rust workspace, build manually:

```sh
cargo build --all-targets --all-features
```

Or watch and automatically recompile changes:

```sh
$ cargo install cargo-watch # if not installed previously
$ cargo watch -c -w src -x run
cargo install cargo-watch # if not installed previously
cargo watch -c
```

Test with:
Expand Down Expand Up @@ -68,7 +81,7 @@ npx cdklocal deploy

This allows creating events that can then be ingested by the filemanager.

First, push an object (in order to create a log group):q:
First, push an object (in order to create a log group):
```sh
awslocal s3api put-object --bucket filemanager-test-ingest --key test
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ create table object (
-- When this object was deleted, a null value means that the object has not yet been deleted.
deleted_date timestamptz default null,
-- The date of the object and its id combined.
portal_run_id varchar(255) not null
portal_run_id varchar(255) not null,
-- provenance - history of all objects and how they move?
);
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as lambdaDestinations from 'aws-cdk-lib/aws-lambda-destinations';
import * as lambdaEventSources from 'aws-cdk-lib/aws-lambda-event-sources';

/**
* Common settings for the filemanager stack.
*/
interface Settings {
database_url: string;
endpoint_url: string;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ use filemanager::events::s3::collect::Collecter;
use filemanager::events::s3::sqs_client::SQS;
use filemanager::events::Collect;

/// Handle SQS events.
/// Handle SQS events by manually calling the SQS receive function. This is meant
/// to be run through something like API gateway to manually invoke ingestion.
async fn event_handler(_: LambdaEvent<()>) -> Result<(), Error> {
let sqs = SQS::with_default_client().await?;
let events = sqs.receive().await?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use filemanager::events::s3::collect::Collecter;
use filemanager::events::s3::FlatS3EventMessages;
use filemanager::events::Collect;

/// Handle SQS events.
/// Handle SQS events that are passed directly to a lambda function.
async fn event_handler(event: LambdaEvent<SqsEvent>) -> Result<(), Error> {
trace!("received event: {:?}", event);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@ use crate::events::EventType;

pub mod s3;

/// A database client which wraps a postgres connection pool and handles database interaction.
#[derive(Debug)]
pub struct DbClient {
pool: PgPool,
}

impl DbClient {
/// Create a new database client from an existing connection pool.
pub fn new(pool: PgPool) -> Self {
Self { pool }
}

/// Create a database with default DATABASE_URL connection.
pub async fn new_with_defaults() -> Result<Self> {
let url = std::env::var("DATABASE_URL").map_err(|err| DbClientError(err.to_string()))?;

Expand All @@ -25,6 +28,7 @@ impl DbClient {
})
}

/// Get the database pool.
pub fn pool(&self) -> &PgPool {
&self.pool
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,19 @@ pub struct Ingester {
}

impl Ingester {
/// Create a new ingester.
pub fn new(db: DbClient) -> Self {
Self { db }
}

/// Create a new ingester with a default database client.
pub async fn new_with_defaults() -> Result<Self> {
Ok(Self {
db: DbClient::new_with_defaults().await?,
})
}

/// Ingest the events into the database by calling the insert and update queries.
pub async fn ingest_s3_events(&mut self, events: Events) -> Result<()> {
let Events {
object_created,
Expand Down
3 changes: 3 additions & 0 deletions lib/workload/stateless/filemanager/filemanager/src/env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@ use std::{
use dotenvy;
use tracing::{error, info};

/// The environment that the application runs in, which controls whether
/// configuration is loaded from a local dotenv file.
#[derive(PartialEq)]
pub enum AppEnv {
/// Dev environment loads config from dotenv.
Dev,
/// Prod environment doesn't load local dotenv config.
Prod,
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use thiserror::Error;

pub type Result<T> = result::Result<T, Error>;

/// Error types for the filemanager.
#[derive(Error, Debug)]
pub enum Error {
/// File not found by id.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,20 @@ use crate::events::s3::s3_client::S3;
use crate::events::s3::{Events, FlatS3EventMessages};
use crate::events::{Collect, EventType};

/// Collect raw events into the processed form which the database module accepts.
#[derive(Debug)]
pub struct Collecter {
s3: S3,
raw_events: FlatS3EventMessages,
}

impl Collecter {
/// Create a new collector.
pub fn new(s3: S3, raw_events: FlatS3EventMessages) -> Self {
Self { s3, raw_events }
}

/// Create a new collector with a default S3 client.
pub async fn with_defaults(raw_events: FlatS3EventMessages) -> Result<Self> {
Ok(Self {
s3: S3::with_defaults().await?,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ pub mod collect;
pub mod s3_client;
pub mod sqs_client;

/// A wrapper around AWS storage types with sqlx support.
#[derive(Debug, Eq, PartialEq, sqlx::Type)]
#[sqlx(type_name = "storage_class")]
pub enum StorageClass {
Expand All @@ -36,6 +37,7 @@ impl PgHasArrayType for StorageClass {
}

impl StorageClass {
/// Convert from the AWS storage class type to the filemanager storage class.
pub fn from_aws(storage_class: AwsStorageClass) -> Option<Self> {
match storage_class {
AwsStorageClass::DeepArchive => Some(Self::DeepArchive),
Expand Down Expand Up @@ -164,9 +166,9 @@ impl From<FlatS3EventMessages> for Events {
}
}

/// Flattened AWS S3 events
#[derive(Debug, Deserialize, Eq, PartialEq, Default)]
#[serde(try_from = "S3EventMessage")]
/// Flattened AWS S3 events
pub struct FlatS3EventMessages(pub Vec<FlatS3EventMessage>);

impl FlatS3EventMessages {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,19 @@ use crate::error::Error::S3Error;
use crate::error::Result;
use crate::events::s3::{FlatS3EventMessage, FlatS3EventMessages, StorageClass};

/// A wrapper around an s3 client.
#[derive(Debug)]
pub struct S3 {
s3_client: Client,
}

impl S3 {
/// Create a new S3 client wrapper.
pub fn new(s3_client: Client) -> Self {
Self { s3_client }
}

/// Create with a default S3 client.
pub async fn with_defaults() -> Result<Self> {
let config = aws_config::from_env().load().await;
let mut config = config::Builder::from(&config);
Expand Down Expand Up @@ -68,6 +71,7 @@ impl S3 {
}
}

/// Converts an AWS datetime to a standard database format.
pub fn convert_datetime(
datetime: Option<aws_sdk_s3::primitives::DateTime>,
) -> Option<DateTime<Utc>> {
Expand All @@ -79,6 +83,7 @@ impl S3 {
}
}

/// Process events and add header and datetime fields.
pub async fn update_events(&self, events: FlatS3EventMessages) -> Result<FlatS3EventMessages> {
Ok(FlatS3EventMessages(
join_all(events.into_inner().into_iter().map(|mut event| async move {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,20 +7,24 @@ use crate::error::Error::{DbClientError, DeserializeError, SQSReceiveError};
use crate::error::Result;
use crate::events::s3::FlatS3EventMessages;

/// A wrapper around an SQS client.
#[derive(Debug)]
pub struct SQS {
sqs_client: Client,
sqs_url: String,
}

impl SQS {
/// Create a new SQS client wrapper.
pub fn new(sqs_client: Client, sqs_url: String) -> Self {
Self {
sqs_client,
sqs_url,
}
}


/// Create with a default SQS client.
pub async fn with_default_client() -> Result<Self> {
let config = aws_config::from_env().load().await;
let mut config = config::Builder::from(&config);
Expand All @@ -37,7 +41,7 @@ impl SQS {
})
}

// TODO: Two possible event types, should be handled differently: PUT and DELETE
/// Manually call the receive function to retrieve events from the SQS queue.
pub async fn receive(&self) -> Result<FlatS3EventMessages> {
let rcv_message_output = self
.sqs_client
Expand Down

0 comments on commit 927367a

Please sign in to comment.