From 626a2f2520f117ef440f9ed2d9a19fceb6e37a27 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?R=C3=A9my=20Greinhofer?= Date: Wed, 22 Nov 2023 18:30:39 -0600 Subject: [PATCH] Add city enqueue endpoint (#53) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a new endpoint to enqueue a city onto the BNA pipeline queue. The common functions were removed from `lib.rs` and pulled from the bnacore library instead. The staging deployment workflow and the lambda fixtures were adjsuted accordingly. Signed-off-by: Rémy Greinhofer --- .github/workflows/deployment-staging.yml | 1 + Cargo.toml | 3 +- lambdas/Cargo.toml | 22 +++-- lambdas/src/enqueue/post-city.rs | 81 +++++++++++++++++++ lambdas/src/fixtures/post-enqueue-city.json | 63 +++++++++++++++ lambdas/src/lib.rs | 58 +------------ .../src/submissions/post-submissions-city.rs | 1 - 7 files changed, 159 insertions(+), 70 deletions(-) create mode 100644 lambdas/src/enqueue/post-city.rs create mode 100644 lambdas/src/fixtures/post-enqueue-city.json diff --git a/.github/workflows/deployment-staging.yml b/.github/workflows/deployment-staging.yml index d82898c..53c2e8d 100644 --- a/.github/workflows/deployment-staging.yml +++ b/.github/workflows/deployment-staging.yml @@ -39,6 +39,7 @@ jobs: get-bnas-cities get-cities get-cities-bnas + post-enqueue-city post-submissions-city" echo $LAMBDAS \ | xargs -n1 -t \ diff --git a/Cargo.toml b/Cargo.toml index 944de75..403e141 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,5 +28,4 @@ tokio = { version = "1", features = ["full"] } csv = "1.2.1" dotenv = "0.15.0" itertools = "0.12.0" -# bnacore = { path = "/Users/rgreinhofer/projects/PeopleForBikes/brokenspoke/bnacore" } -bnacore = { git = "https://github.com/PeopleForBikes/brokenspoke", rev = "6fa8808" } +bnacore = { git = "https://github.com/PeopleForBikes/brokenspoke", rev = "98f20d7" } diff --git a/lambdas/Cargo.toml b/lambdas/Cargo.toml index 8a78884..9695419 100644 --- a/lambdas/Cargo.toml +++ b/lambdas/Cargo.toml @@ -3,27 +3,19 @@ name = "lambdas" version = "0.1.0" edition = "2021" -# Starting in Rust 1.62 you can use `cargo add` to add dependencies -# to your project. -# -# If you're using an older Rust version, -# download cargo-edit(https://github.com/killercup/cargo-edit#installation) -# to install the `add` subcommand. -# -# Running `cargo add DEPENDENCY_NAME` will -# add the latest version of a dependency to the list, -# and it will keep the alphabetic ordering for you. - [dependencies] +aws-config = "1.0.0" +aws-sdk-sqs = "0.39.0" aws_lambda_events = "0.12.0" +bnacore = { git = "https://github.com/PeopleForBikes/brokenspoke.git", rev = "98f20d7" } dotenv = "0.15.0" entity = { path = "../entity" } -http-serde = "1.1.2" +http-serde = "1.1.3" lambda_http = "0.8" lambda_runtime = "0.8" nom = "7.1.3" once_cell = "1.17.1" -reqwest = { version = "0.11.16", features = [ +reqwest = { version = "0.11.22", features = [ "json", "native-tls-vendored", "rustls", @@ -63,6 +55,10 @@ path = "src/cities/get-cities-bnas.rs" name = "post-submissions-city" path = "src/submissions/post-submissions-city.rs" +[[bin]] +name = "post-enqueue-city" +path = "src/enqueue/post-city.rs" + [dev-dependencies] rstest = "0.18.1" diff --git a/lambdas/src/enqueue/post-city.rs b/lambdas/src/enqueue/post-city.rs new file mode 100644 index 0000000..e3cb089 --- /dev/null +++ b/lambdas/src/enqueue/post-city.rs @@ -0,0 +1,81 @@ +use aws_config::BehaviorVersion; +use aws_sdk_sqs::{self}; +use bnacore::aws::get_aws_parameter; +use dotenv::dotenv; +use lambda_http::{ + http::StatusCode, run, service_fn, Body, Error, IntoResponse, Request, Response, +}; +use lambdas::{get_apigw_request_id, APIError, APIErrorSource, APIErrors}; +use serde::{Deserialize, Serialize}; +use serde_json::json; + +#[derive(Deserialize, Serialize, Debug)] +pub struct EnqueueCity { + pub country: String, + pub city: String, + pub region: String, + pub fips_code: String, +} + +async fn function_handler(event: Request) -> Result, Error> { + dotenv().ok(); + + // Extract and serialize the data. + let apigw_request_id = get_apigw_request_id(&event); + let body = event.body(); + let body_str = std::str::from_utf8(body).expect("invalid utf-8 sequence"); + let enqueued_city = match serde_json::from_str::(body_str) { + Ok(data) => data, + Err(e) => { + let api_error = APIError::new( + apigw_request_id, + StatusCode::BAD_REQUEST, + String::from("Invalid data"), + format!("The following submission is invalid: {body_str}. {e}"), + APIErrorSource::Pointer(event.uri().path().to_string()), + ); + return Ok(APIErrors::new(&[api_error]).into()); + } + }; + + // Prepare the AWS client. + let bna_sqs_queue = get_aws_parameter("BNA_SQS_QUEUE_URL").await?; + let aws_config = aws_config::load_defaults(BehaviorVersion::latest()).await; + let sqs_client = aws_sdk_sqs::Client::new(&aws_config); + + // Enqueue the message. + let _send_message = match sqs_client + .send_message() + .queue_url(bna_sqs_queue) + .message_body(serde_json::to_string(&enqueued_city)?) + .send() + .await + { + Ok(message) => message, + Err(e) => { + let api_error = APIError::new( + apigw_request_id, + StatusCode::BAD_REQUEST, + String::from("Invalid data"), + format!("cannot enqueue the message: {e}"), + APIErrorSource::Pointer(event.uri().path().to_string()), + ); + return Ok(APIErrors::new(&[api_error]).into()); + } + }; + + Ok(json!(enqueued_city).into_response().await) +} + +#[tokio::main] +async fn main() -> Result<(), Error> { + tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + // disable printing the name of the module in every log line. + .with_target(false) + // disabling time is handy because CloudWatch will add the ingestion time. + .without_time() + .init(); + + run(service_fn(function_handler)).await +} diff --git a/lambdas/src/fixtures/post-enqueue-city.json b/lambdas/src/fixtures/post-enqueue-city.json new file mode 100644 index 0000000..93d3da5 --- /dev/null +++ b/lambdas/src/fixtures/post-enqueue-city.json @@ -0,0 +1,63 @@ +{ + "version": "2.0", + "routeKey": "$default", + "rawPath": "/enqueue/city", + "rawQueryString": "", + "cookies": null, + "headers": { + "header1": "value1", + "header2": "value1,value2" + }, + "queryStringParameters": { + "parameter1": "value1,value2", + "parameter2": "value" + }, + "requestContext": { + "accountId": "123456789012", + "apiId": "api-id", + "authentication": { + "clientCert": { + "clientCertPem": "CERT_CONTENT", + "subjectDN": "www.example.com", + "issuerDN": "Example issuer", + "serialNumber": "a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1:a1", + "validity": { + "notBefore": "May 28 12:30:02 2019 GMT", + "notAfter": "Aug 5 09:36:04 2021 GMT" + } + } + }, + "authorizer": { + "jwt": { + "claims": { + "claim1": "value1", + "claim2": "value2" + }, + "scopes": ["scope1", "scope2"] + } + }, + "domainName": "id.execute-api.us-east-1.amazonaws.com", + "domainPrefix": "id", + "http": { + "method": "POST", + "path": "/enqueue/city", + "protocol": "HTTP/1.1", + "sourceIp": "192.0.2.1", + "userAgent": "agent" + }, + "requestId": "id", + "routeKey": "$default", + "stage": "$default", + "time": "12/Mar/2020:19:03:58 +0000", + "timeEpoch": 1583348638390 + }, + "body": "{\"country\": \"usa\",\"city\": \"santa rosa\",\"region\": \"new mexico\",\"fips_code\": \"3570670\"}", + "pathParameters": { + "parameter1": "value1" + }, + "isBase64Encoded": false, + "stageVariables": { + "stageVariable1": "value1", + "stageVariable2": "value2" + } +} diff --git a/lambdas/src/lib.rs b/lambdas/src/lib.rs index 8d0d23e..6afeb83 100644 --- a/lambdas/src/lib.rs +++ b/lambdas/src/lib.rs @@ -1,5 +1,6 @@ pub mod link_header; +use bnacore::aws::get_aws_secrets; use lambda_http::{http::StatusCode, Body, Error, Request, RequestExt, Response}; use sea_orm::{Database, DatabaseConnection, DbErr}; use serde::{Deserialize, Serialize}; @@ -15,40 +16,6 @@ pub const MAX_PAGE_SIZE: u64 = 100; /// Number of items to return per page if no argument was provided. pub const DEFAULT_PAGE_SIZE: u64 = 50; -/// Represents the contents of the encrypted fields SecretString or SecretBinary -/// from the specified version of a secret, whichever contains content. -#[derive(Deserialize)] -#[serde(rename_all = "PascalCase")] -pub struct SecretValue { - /// Amazon Resource Name of the secret. - #[serde(alias = "ARN")] - pub arn: String, - /// Creation date. - pub created_date: String, - /// The friendly name of the secret. - pub name: String, - /// The decrypted secret value, if the secret value was originally provided - /// as binary data in the form of a byte array. The response parameter - /// represents the binary data as a base64-encoded string. - /// - /// If the secret was created by using the Secrets Manager console, or if - /// the secret value was originally provided as a string, then this field - /// is omitted. The secret value appears in SecretString instead. - pub secret_binary: Option, - /// The decrypted secret value, if the secret value was originally provided - /// as a string or through the Secrets Manager console. - /// If this secret was created by using the console, then Secrets Manager - /// stores the information as a JSON structure of key/value pairs. - pub secret_string: String, - /// Unique identifier of the version of the secret. - pub version_id: String, - /// A list of all of the staging labels currently attached to this version - /// of the secret. - pub version_stages: Vec, - /// Metadata. - pub result_metadata: HashMap, -} - /// Returns the database connection. /// /// Look up for the connection string: @@ -65,7 +32,7 @@ pub async fn database_connect(secret_id: Option<&str>) -> Result = serde_json::from_str(&secret_value.secret_string) + let secrets: HashMap = serde_json::from_str(&secret_value) .map_err(|e| DbErr::Custom(format!("Cannot deserialize the cached secret: {e}")))?; match secrets.get(DATABASE_URL_KEY) { Some(v) => v.to_owned(), @@ -80,25 +47,6 @@ pub async fn database_connect(secret_id: Option<&str>) -> Result -pub async fn get_aws_secrets(secret_id: &str) -> Result { - let aws_session_token = - env::var("AWS_SESSION_TOKEN").map_err(|e| format!("Cannot find AWS session token: {e}"))?; - reqwest::Client::new() - .get(format!( - "http://localhost:2773/secretsmanager/get?secretId={secret_id}" - )) - .header("X-Aws-Parameters-Secrets-Token", aws_session_token) - .send() - .await - .map_err(|e| e.to_string())? - .json::() - .await - .map_err(|e| e.to_string()) -} - /// Retrieves the pagination parameters. /// /// If nothing is provided, the first page is returned and will contain up to @@ -362,10 +310,12 @@ pub enum APIErrorSource { Parameter(String), Header(String), } + /// Single API Error object as described in . #[derive(Deserialize, Serialize, Clone)] pub struct APIError { id: Option, + // Cannot use http_serde 2.0.0 until lambda_http upgraded the http crate to 1.0.0. #[serde(with = "http_serde::status_code")] status: StatusCode, title: String, diff --git a/lambdas/src/submissions/post-submissions-city.rs b/lambdas/src/submissions/post-submissions-city.rs index 05935f5..52b7678 100644 --- a/lambdas/src/submissions/post-submissions-city.rs +++ b/lambdas/src/submissions/post-submissions-city.rs @@ -17,7 +17,6 @@ async fn function_handler(event: Request) -> Result, Error> { let apigw_request_id = get_apigw_request_id(&event); let body = event.body(); let body_str = std::str::from_utf8(body).expect("invalid utf-8 sequence"); - dbg!(body_str); let model = match serde_json::from_str::(body_str) { Ok(model) => model, Err(e) => {