Skip to content

Commit

Permalink
Add city enqueue endpoint (#53)
Browse files Browse the repository at this point in the history
Adds a new endpoint to enqueue a city onto the BNA pipeline queue.

The common functions were removed from `lib.rs` and pulled from the bnacore
library instead.

The staging deployment workflow and the lambda fixtures were adjsuted
accordingly.

Signed-off-by: Rémy Greinhofer <[email protected]>
  • Loading branch information
rgreinho authored Nov 23, 2023
1 parent 857a00d commit 626a2f2
Show file tree
Hide file tree
Showing 7 changed files with 159 additions and 70 deletions.
1 change: 1 addition & 0 deletions .github/workflows/deployment-staging.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ jobs:
get-bnas-cities
get-cities
get-cities-bnas
post-enqueue-city
post-submissions-city"
echo $LAMBDAS \
| xargs -n1 -t \
Expand Down
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,5 +28,4 @@ tokio = { version = "1", features = ["full"] }
csv = "1.2.1"
dotenv = "0.15.0"
itertools = "0.12.0"
# bnacore = { path = "/Users/rgreinhofer/projects/PeopleForBikes/brokenspoke/bnacore" }
bnacore = { git = "https://github.com/PeopleForBikes/brokenspoke", rev = "6fa8808" }
bnacore = { git = "https://github.com/PeopleForBikes/brokenspoke", rev = "98f20d7" }
22 changes: 9 additions & 13 deletions lambdas/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,19 @@ name = "lambdas"
version = "0.1.0"
edition = "2021"

# Starting in Rust 1.62 you can use `cargo add` to add dependencies
# to your project.
#
# If you're using an older Rust version,
# download cargo-edit(https://github.com/killercup/cargo-edit#installation)
# to install the `add` subcommand.
#
# Running `cargo add DEPENDENCY_NAME` will
# add the latest version of a dependency to the list,
# and it will keep the alphabetic ordering for you.

[dependencies]
aws-config = "1.0.0"
aws-sdk-sqs = "0.39.0"
aws_lambda_events = "0.12.0"
bnacore = { git = "https://github.com/PeopleForBikes/brokenspoke.git", rev = "98f20d7" }
dotenv = "0.15.0"
entity = { path = "../entity" }
http-serde = "1.1.2"
http-serde = "1.1.3"
lambda_http = "0.8"
lambda_runtime = "0.8"
nom = "7.1.3"
once_cell = "1.17.1"
reqwest = { version = "0.11.16", features = [
reqwest = { version = "0.11.22", features = [
"json",
"native-tls-vendored",
"rustls",
Expand Down Expand Up @@ -63,6 +55,10 @@ path = "src/cities/get-cities-bnas.rs"
name = "post-submissions-city"
path = "src/submissions/post-submissions-city.rs"

[[bin]]
name = "post-enqueue-city"
path = "src/enqueue/post-city.rs"

[dev-dependencies]
rstest = "0.18.1"

Expand Down
81 changes: 81 additions & 0 deletions lambdas/src/enqueue/post-city.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use aws_config::BehaviorVersion;
use aws_sdk_sqs::{self};
use bnacore::aws::get_aws_parameter;
use dotenv::dotenv;
use lambda_http::{
http::StatusCode, run, service_fn, Body, Error, IntoResponse, Request, Response,
};
use lambdas::{get_apigw_request_id, APIError, APIErrorSource, APIErrors};
use serde::{Deserialize, Serialize};
use serde_json::json;

#[derive(Deserialize, Serialize, Debug)]
pub struct EnqueueCity {
pub country: String,
pub city: String,
pub region: String,
pub fips_code: String,
}

async fn function_handler(event: Request) -> Result<Response<Body>, Error> {
dotenv().ok();

// Extract and serialize the data.
let apigw_request_id = get_apigw_request_id(&event);
let body = event.body();
let body_str = std::str::from_utf8(body).expect("invalid utf-8 sequence");
let enqueued_city = match serde_json::from_str::<EnqueueCity>(body_str) {
Ok(data) => data,
Err(e) => {
let api_error = APIError::new(
apigw_request_id,
StatusCode::BAD_REQUEST,
String::from("Invalid data"),
format!("The following submission is invalid: {body_str}. {e}"),
APIErrorSource::Pointer(event.uri().path().to_string()),
);
return Ok(APIErrors::new(&[api_error]).into());
}
};

// Prepare the AWS client.
let bna_sqs_queue = get_aws_parameter("BNA_SQS_QUEUE_URL").await?;
let aws_config = aws_config::load_defaults(BehaviorVersion::latest()).await;
let sqs_client = aws_sdk_sqs::Client::new(&aws_config);

// Enqueue the message.
let _send_message = match sqs_client
.send_message()
.queue_url(bna_sqs_queue)
.message_body(serde_json::to_string(&enqueued_city)?)
.send()
.await
{
Ok(message) => message,
Err(e) => {
let api_error = APIError::new(
apigw_request_id,
StatusCode::BAD_REQUEST,
String::from("Invalid data"),
format!("cannot enqueue the message: {e}"),
APIErrorSource::Pointer(event.uri().path().to_string()),
);
return Ok(APIErrors::new(&[api_error]).into());
}
};

Ok(json!(enqueued_city).into_response().await)
}

#[tokio::main]
async fn main() -> Result<(), Error> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
// disable printing the name of the module in every log line.
.with_target(false)
// disabling time is handy because CloudWatch will add the ingestion time.
.without_time()
.init();

run(service_fn(function_handler)).await
}
63 changes: 63 additions & 0 deletions lambdas/src/fixtures/post-enqueue-city.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{
"version": "2.0",
"routeKey": "$default",
"rawPath": "/enqueue/city",
"rawQueryString": "",
"cookies": null,
"headers": {
"header1": "value1",
"header2": "value1,value2"
},
"queryStringParameters": {
"parameter1": "value1,value2",
"parameter2": "value"
},
"requestContext": {
"accountId": "123456789012",
"apiId": "api-id",
"authentication": {
"clientCert": {
"clientCertPem": "CERT_CONTENT",
"subjectDN": "www.example.com",
"issuerDN": "Example issuer",
"serialNumber": "a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1",
"validity": {
"notBefore": "May 28 12:30:02 2019 GMT",
"notAfter": "Aug 5 09:36:04 2021 GMT"
}
}
},
"authorizer": {
"jwt": {
"claims": {
"claim1": "value1",
"claim2": "value2"
},
"scopes": ["scope1", "scope2"]
}
},
"domainName": "id.execute-api.us-east-1.amazonaws.com",
"domainPrefix": "id",
"http": {
"method": "POST",
"path": "/enqueue/city",
"protocol": "HTTP/1.1",
"sourceIp": "192.0.2.1",
"userAgent": "agent"
},
"requestId": "id",
"routeKey": "$default",
"stage": "$default",
"time": "12/Mar/2020:19:03:58 +0000",
"timeEpoch": 1583348638390
},
"body": "{\"country\": \"usa\",\"city\": \"santa rosa\",\"region\": \"new mexico\",\"fips_code\": \"3570670\"}",
"pathParameters": {
"parameter1": "value1"
},
"isBase64Encoded": false,
"stageVariables": {
"stageVariable1": "value1",
"stageVariable2": "value2"
}
}
58 changes: 4 additions & 54 deletions lambdas/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
pub mod link_header;

use bnacore::aws::get_aws_secrets;
use lambda_http::{http::StatusCode, Body, Error, Request, RequestExt, Response};
use sea_orm::{Database, DatabaseConnection, DbErr};
use serde::{Deserialize, Serialize};
Expand All @@ -15,40 +16,6 @@ pub const MAX_PAGE_SIZE: u64 = 100;
/// Number of items to return per page if no argument was provided.
pub const DEFAULT_PAGE_SIZE: u64 = 50;

/// Represents the contents of the encrypted fields SecretString or SecretBinary
/// from the specified version of a secret, whichever contains content.
#[derive(Deserialize)]
#[serde(rename_all = "PascalCase")]
pub struct SecretValue {
/// Amazon Resource Name of the secret.
#[serde(alias = "ARN")]
pub arn: String,
/// Creation date.
pub created_date: String,
/// The friendly name of the secret.
pub name: String,
/// The decrypted secret value, if the secret value was originally provided
/// as binary data in the form of a byte array. The response parameter
/// represents the binary data as a base64-encoded string.
///
/// If the secret was created by using the Secrets Manager console, or if
/// the secret value was originally provided as a string, then this field
/// is omitted. The secret value appears in SecretString instead.
pub secret_binary: Option<String>,
/// The decrypted secret value, if the secret value was originally provided
/// as a string or through the Secrets Manager console.
/// If this secret was created by using the console, then Secrets Manager
/// stores the information as a JSON structure of key/value pairs.
pub secret_string: String,
/// Unique identifier of the version of the secret.
pub version_id: String,
/// A list of all of the staging labels currently attached to this version
/// of the secret.
pub version_stages: Vec<String>,
/// Metadata.
pub result_metadata: HashMap<String, String>,
}

/// Returns the database connection.
///
/// Look up for the connection string:
Expand All @@ -65,7 +32,7 @@ pub async fn database_connect(secret_id: Option<&str>) -> Result<DatabaseConnect
let secret_value = get_aws_secrets(&v)
.await
.map_err( DbErr::Custom)?;
let secrets: HashMap<String, String> = serde_json::from_str(&secret_value.secret_string)
let secrets: HashMap<String, String> = serde_json::from_str(&secret_value)
.map_err(|e| DbErr::Custom(format!("Cannot deserialize the cached secret: {e}")))?;
match secrets.get(DATABASE_URL_KEY) {
Some(v) => v.to_owned(),
Expand All @@ -80,25 +47,6 @@ pub async fn database_connect(secret_id: Option<&str>) -> Result<DatabaseConnect
Database::connect(database_url).await
}

/// Retrieves a secret from the AWS Secrets Manager using the Lambda caching layer.
///
/// Ref: <https://docs.aws.amazon.com/secretsmanager/latest/userguide/retrieving-secrets_lambda.html>
pub async fn get_aws_secrets(secret_id: &str) -> Result<SecretValue, String> {
let aws_session_token =
env::var("AWS_SESSION_TOKEN").map_err(|e| format!("Cannot find AWS session token: {e}"))?;
reqwest::Client::new()
.get(format!(
"http://localhost:2773/secretsmanager/get?secretId={secret_id}"
))
.header("X-Aws-Parameters-Secrets-Token", aws_session_token)
.send()
.await
.map_err(|e| e.to_string())?
.json::<SecretValue>()
.await
.map_err(|e| e.to_string())
}

/// Retrieves the pagination parameters.
///
/// If nothing is provided, the first page is returned and will contain up to
Expand Down Expand Up @@ -362,10 +310,12 @@ pub enum APIErrorSource {
Parameter(String),
Header(String),
}

/// Single API Error object as described in <https://jsonapi.org/format/#error-objects>.
#[derive(Deserialize, Serialize, Clone)]
pub struct APIError {
id: Option<String>,
// Cannot use http_serde 2.0.0 until lambda_http upgraded the http crate to 1.0.0.
#[serde(with = "http_serde::status_code")]
status: StatusCode,
title: String,
Expand Down
1 change: 0 additions & 1 deletion lambdas/src/submissions/post-submissions-city.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ async fn function_handler(event: Request) -> Result<Response<Body>, Error> {
let apigw_request_id = get_apigw_request_id(&event);
let body = event.body();
let body_str = std::str::from_utf8(body).expect("invalid utf-8 sequence");
dbg!(body_str);
let model = match serde_json::from_str::<submission::Model>(body_str) {
Ok(model) => model,
Err(e) => {
Expand Down

0 comments on commit 626a2f2

Please sign in to comment.