diff --git a/.env b/.env index 8d92c3b..cf27251 100644 --- a/.env +++ b/.env @@ -1,4 +1,4 @@ BEAM_URL=http://localhost:8081 BEAM_ID=app1.proxy1.broker BEAM_SECRET=App1Secret -TOKEN_MANAGER_DB_URL=./file.db +TOKEN_MANAGER_DB_PATH=./file.db diff --git a/Cargo.toml b/Cargo.toml index 8ec4694..2f202cc 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,21 +7,27 @@ license = "Apache-2.0" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -axum = "0.6" -tokio = { version = "1.33.0", features = ["full", "macros"] } -reqwest = { version = "0.11", default_features = false, features = ["json", "default-tls", "stream"] } +axum = "0.7" +tokio = { version = "1", features = ["full", "macros"] } +reqwest = { version = "0.12", default_features = false, features = ["json", "default-tls", "stream"] } serde_json = "1.0" serde = { version = "1", features = ["derive"] } anyhow = { version = "1.0", default-features = false } chrono = "0.4" +uuid = { version = "1.7.0", features = ["v4"] } # Db diesel = { version = "2.1.4", features = ["sqlite", "r2d2"] } diesel_migrations = { version = "2.1.0", features = ["sqlite"] } -libsqlite3-sys = { version = "0.27.0", features = ["bundled"] } +libsqlite3-sys = { version = "0.30", features = ["bundled"] } # Beam beam-lib = { git = "https://github.com/samply/beam", branch = "develop", features = ["http-util"] } async-sse = "5.1.0" futures-util = { version = "0.3", features = ["io"] } +#encrypt +aes = "0.8" +ctr = "0.9" +cipher = "0.4" +base64 = "0.22" # Logging tracing = { version = "0.1" } diff --git a/data/token_manager b/data/token_manager new file mode 100644 index 0000000..dcc107e Binary files /dev/null and b/data/token_manager differ diff --git a/migrations/2023-11-21-192332_token_manager/up.sql b/migrations/2023-11-21-192332_token_manager/up.sql index 1d9a8a0..d5e60ed 100644 --- a/migrations/2023-11-21-192332_token_manager/up.sql +++ b/migrations/2023-11-21-192332_token_manager/up.sql @@ -2,9 +2,12 @@ CREATE TABLE tokens ( id INTEGER PRIMARY KEY AUTOINCREMENT, + token_name TEXT NOT NULL, token TEXT NOT NULL, project_id TEXT NOT NULL, bk TEXT NOT NULL, - status TEXT NOT NULL, - user_id TEXT NOT NULL + token_status TEXT NOT NULL, + project_status TEXT NOT NULL, + user_id TEXT NOT NULL, + token_created_at TEXT NOT NULL ) \ No newline at end of file diff --git a/migrations/2023-11-30-093426_add_create_at_to_tokens/down.sql b/migrations/2023-11-30-093426_add_create_at_to_tokens/down.sql deleted file mode 100644 index 93ce8bb..0000000 --- a/migrations/2023-11-30-093426_add_create_at_to_tokens/down.sql +++ /dev/null @@ -1,3 +0,0 @@ --- This file should undo anything in `up.sql` - -DROP TABLE tokens \ No newline at end of file diff --git a/migrations/2023-11-30-093426_add_create_at_to_tokens/up.sql b/migrations/2023-11-30-093426_add_create_at_to_tokens/up.sql deleted file mode 100644 index a88624b..0000000 --- a/migrations/2023-11-30-093426_add_create_at_to_tokens/up.sql +++ /dev/null @@ -1,3 +0,0 @@ --- Your SQL goes here - -ALTER TABLE tokens ADD COLUMN created_at TEXT NOT NULL; diff --git a/src/config.rs b/src/config.rs index 8975beb..e0db725 100644 --- a/src/config.rs +++ b/src/config.rs @@ -2,7 +2,7 @@ use beam_lib::{AppId, BeamClient}; use clap::Parser; use once_cell::sync::Lazy; use reqwest::Url; -use std::{net::SocketAddr, convert::Infallible}; +use std::{convert::Infallible, net::SocketAddr}; pub(crate) static CONFIG: Lazy = Lazy::new(Config::parse); @@ -12,7 +12,7 @@ pub struct Config { pub addr: SocketAddr, 
     /// Url of the local beam proxy which is required to have sockets enabled
-    #[clap(long, env, default_value = "http://beam-proxy:8081")]
+    #[clap(long, env)]
     pub beam_url: Url,
 
     /// Beam api key
@@ -26,12 +26,20 @@ pub struct Config {
     #[clap(long, env, value_parser=|id: &str| Ok::<_, Infallible>(AppId::new_unchecked(id)))]
     pub beam_id: AppId,
 
-    #[clap(long, env)]
-    pub token_manager_db_url: String,
+    #[clap(long, env, default_value = "./file.db")]
+    pub token_manager_db_path: String,
+
+    #[clap(env, default_value = "0123456789abcdef0123456789ABCDEF")]
+    pub token_encrypt_key: String,
+
+    #[clap(long, env, default_value = "info")]
+    pub rust_log: String,
 }
 
-pub static BEAM_CLIENT: Lazy<BeamClient> = Lazy::new(|| BeamClient::new(
-    &CONFIG.beam_id,
-    &CONFIG.beam_secret,
-    CONFIG.beam_url.clone()
-));
+pub static BEAM_CLIENT: Lazy<BeamClient> = Lazy::new(|| {
+    BeamClient::new(
+        &CONFIG.beam_id,
+        &CONFIG.beam_secret,
+        CONFIG.beam_url.clone(),
+    )
+});
diff --git a/src/db.rs b/src/db.rs
index 1f031b3..e0a01c7 100644
--- a/src/db.rs
+++ b/src/db.rs
@@ -1,17 +1,35 @@
-use crate::config::CONFIG;
-use crate::models::{NewToken, ScriptParams, TokenManager};
-use axum::{async_trait, extract::{FromRef, FromRequestParts}, http::{request::Parts, StatusCode}, Json};
+use anyhow::Result;
+use axum::{
+    async_trait,
+    extract::{FromRef, FromRequestParts},
+    http::{request::Parts, StatusCode},
+    Json,
+};
 use diesel::prelude::*;
 use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
+use diesel::result::Error;
 use diesel_migrations::{embed_migrations, EmbeddedMigrations, MigrationHarness};
 use serde_json::json;
-use tracing::{error, warn, info};
+use std::collections::HashSet;
+use tracing::{error, info, warn};
+
+use crate::config::CONFIG;
+use crate::enums::{OpalProjectStatus, OpalTokenStatus};
+use crate::handlers::{
+    check_project_status_request, check_token_status_request, fetch_project_tables_names_request,
+};
+use crate::models::{
+    NewToken, ProjectQueryParams, TokenManager, TokenParams, TokenStatus, TokensQueryParams,
+};
+use crate::schema::tokens;
+use crate::schema::tokens::dsl::*;
+use crate::utils::{decrypt_data, generate_r_script};
 
 const MIGRATIONS: EmbeddedMigrations = embed_migrations!();
 
 pub fn setup_db() -> anyhow::Result<Pool<ConnectionManager<SqliteConnection>>> {
     let pool = Pool::new(ConnectionManager::<SqliteConnection>::new(
-        &CONFIG.token_manager_db_url,
+        &CONFIG.token_manager_db_path,
     ))?;
     info!("Running migrations");
     pool.get()?.run_pending_migrations(MIGRATIONS).unwrap();
@@ -33,21 +51,21 @@ where
     async fn from_request_parts(_parts: &mut Parts, state: &S) -> Result<Self, Self::Rejection> {
         let pool: Pool<ConnectionManager<SqliteConnection>> = FromRef::from_ref(state);
-        pool.get().map(Self).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
+        pool.get()
+            .map(Self)
+            .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)
     }
 }
 
 impl Db {
-    pub fn save_token_db(&mut self, token: NewToken) {
-        use crate::schema::tokens;
-
+    pub fn save_token_db(&mut self, new_token: NewToken) {
         match diesel::insert_into(tokens::table)
-            .values(&token)
+            .values(&new_token)
             .on_conflict_do_nothing()
             .execute(&mut self.0)
         {
             Ok(_) => {
-                info!("New Token Saved in DB");
+                info!("Token Saved in DB for user: {} in BK: {}", new_token.user_id, new_token.bk);
             }
             Err(error) => {
                 warn!("Error connecting to {}", error);
@@ -55,103 +73,321 @@ impl Db {
             }
         }
     }
 
-    pub async fn check_project_status(
+    pub fn update_token_db(&mut self, token_update: NewToken) {
+        let maybe_last_id = tokens
+            .filter(
+                user_id
+                    .eq(&token_update.user_id)
+                    .and(project_id.eq(&token_update.project_id))
+                    .and(bk.eq(&token_update.bk)),
+            )
+            .select(id)
+            .order(id.desc())
+ .first::(&mut self.0) + .optional(); + + if let Ok(Some(last_id)) = maybe_last_id { + let target = tokens.filter(id.eq(last_id)); + match diesel::update(target) + .set(( + token.eq(&token_update.token), + token_status.eq("UPDATED"), + token_created_at.eq(&token_update.token_created_at), + )) + .execute(&mut self.0) + { + Ok(_) => info!("Token Updated in DB for user: {} in BK: {}", token_update.user_id, token_update.bk), + Err(error) => warn!("Error updating token: {}", error), + } + } else if let Err(error) = maybe_last_id { + warn!("Error finding last token record: {}", error); + } + } + + pub fn update_token_status_db(&mut self, token_update: TokenStatus) { + let target = tokens.filter( + user_id + .eq(&token_update.user_id) + .and(project_id.eq(&token_update.project_id)) + .and(bk.eq(&token_update.bk)), + ); + + match diesel::update(target) + .set((token_status.eq(token_update.token_status),)) + .execute(&mut self.0) + { + Ok(_) => { + info!("Token status updated in DB for user: {} in BK: {}", token_update.user_id, token_update.bk); + } + Err(error) => { + warn!("Error updating token status: {}", error); + } + } + } + + pub fn delete_project_db(&mut self, project_params: &ProjectQueryParams,) { + let target = tokens.filter(project_id.eq(&project_params.project_id)); + + match diesel::delete(target).execute(&mut self.0) { + Ok(_) => { + info!("Project and Token deleted from DB for project: {} in BK: {}", &project_params.project_id, &project_params.bk); + } + Err(error) => { + warn!("Error deleting token: {}", error); + } + } + } + + pub fn delete_token_db(&mut self, token_name_id: String, token_params: &TokensQueryParams) { + let target = tokens.filter(token_name.eq(&token_name_id)); + + match diesel::delete(target).execute(&mut self.0) { + Ok(_) => { + info!("Token deleted from DB for user: {} in BK: {}", token_params.user_id, token_params.bk); + } + Err(error) => { + warn!("Error deleting token: {}", error); + } + } + } + + pub fn get_token_name( + &mut self, + token_params: &TokensQueryParams, + ) -> Result, Error> { + tokens + .filter(user_id.eq(token_params.user_id.clone())) + .filter(project_id.eq(token_params.project_id.clone())) + .filter(bk.eq(token_params.bk.clone())) + .order(id.desc()) + .select(token_name) + .first::(&mut self.0) + .optional() + } + + pub fn get_token_value( &mut self, + user: String, project: String, + bridgehead: String, + ) -> Result, Error> { + tokens + .filter(user_id.eq(user)) + .filter(project_id.eq(project)) + .filter(bk.eq(bridgehead)) + .order(id.desc()) + .select(token) + .first::(&mut self.0) + .optional() + } + + pub fn is_token_available(&mut self, params: &TokenParams) -> Result { + let result = tokens + .filter(user_id.eq(¶ms.user_id)) + .filter(project_id.eq(¶ms.project_id)) + .filter(bk.eq_any(¶ms.bridgehead_ids)) + .first::(&mut self.0) + .optional(); + + match result { + Ok(Some(_)) => Ok(true), + Ok(None) => Ok(false), + Err(e) => Err(e), + } + } + + pub async fn check_script_status( + &mut self, + params: TokenParams, + ) -> Result { + let token_available = self.is_token_available(¶ms); + + match token_available { + Ok(true) => { + info!("Token available for user: {}", params.user_id); + Ok("true".to_string()) + }, + Ok(false) => { + info!("No Token available for user: {}", params.user_id); + Ok("false".to_string()) + }, + Err(_) => Err(( + StatusCode::INTERNAL_SERVER_ERROR, + "Error checking token availability.".to_string(), + )), + } + } + + pub async fn check_token_status( + &mut self, + params: TokensQueryParams, ) -> Result, 
(StatusCode, String)> { - use crate::schema::tokens::dsl::*; + let mut token_status_json = json!({ + "project_id": params.project_id.clone(), + "bk": params.bk.clone(), + "user_id": params.user_id.clone(), + "token_created_at": "", + "project_status": OpalTokenStatus::NOTFOUND, + "token_status": OpalProjectStatus::NOTFOUND, + }); + + if let Ok(json_response) = check_project_status_request(ProjectQueryParams { + bk: params.bk.clone(), + project_id: params.project_id.clone(), + }) + .await + { + token_status_json["project_status"] = json_response.0["project_status"].clone(); + } else { + error!("Error retrieving project status"); + } - match tokens - .filter(project_id.eq(&project)) + let token_name_response = match self.get_token_name(¶ms) { + Ok(Some(name)) => name, + Ok(None) =>{ + info!( + "Received status response for token. User ID: {}, BK: {}, Response: {}", + params.user_id, + params.bk, + token_status_json["token_status"] + ); + return Ok(Json(token_status_json)) + }, + Err(e) => return Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())), + }; + + let records = match tokens + .filter(user_id.eq(¶ms.user_id)) + .filter(bk.eq(¶ms.bk)) + .filter(project_id.eq(¶ms.project_id)) .select(TokenManager::as_select()) .load::(&mut self.0) { - Ok(records) => { - if !records.is_empty() { - info!("Project found with project_id: {:?}", &records); - let response = json!({ - "status": "success", - "data": records - }); - Ok(Json(response)) - } else { - info!("Project not found with project_id: {}", project); - let error_response = r#"{ - "status": "error", - "message": "Project not found with project_id" - }"#; - Err((StatusCode::NOT_FOUND, error_response.into())) - } + Ok(records) if !records.is_empty() => records, + Ok(_) => { + info!( + "Received status response for token. User ID: {}, BK: {}, Response: {}", + params.user_id, + params.bk, + token_status_json["token_status"] + ); + return Ok(Json(token_status_json)); } Err(err) => { error!("Error calling DB: {}", err); - Err(( - StatusCode::INTERNAL_SERVER_ERROR, - "Cannot connect to database".into(), - )) + return Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())); } + }; + + let record = &records[0]; + token_status_json["token_created_at"] = json!(record.token_created_at); + let token_value = json!(record.token).as_str().unwrap_or_default().to_string(); + + if let Ok(json_response) = check_token_status_request( + params.user_id.clone(), + params.bk.clone(), + params.project_id.clone(), + token_name_response.clone(), + token_value.clone(), + ) + .await + { + token_status_json["token_status"] = json_response.0["token_status"].clone(); + + let new_token_status = TokenStatus { + project_id: ¶ms.project_id.clone(), + bk: ¶ms.bk.clone(), + token_status: OpalTokenStatus::CREATED.as_str(), + user_id: ¶ms.user_id.clone(), + }; + self.update_token_status_db(new_token_status); + } else { + error!("Error retrieving token status"); } - } - pub async fn generate_user_script(&mut self, query: ScriptParams) -> Result { - use crate::schema::tokens::dsl::*; + info!( + "Received status response for token. 
User ID: {}, BK: {}, Response: {}", + params.user_id, + params.bk, + token_status_json["token_status"] + ); - let records = tokens - .filter(project_id.eq(&query.project)) // Match project_id from the query parameters - .filter(user_id.eq(&query.user)) // Match user_id from the query parameters - .select(TokenManager::as_select()) - .load::(&mut self.0); + Ok(Json(token_status_json)) + } - match records { - Ok(records) => { + pub async fn generate_user_script(&mut self, query: TokenParams) -> Result { + let tables_per_bridgehead_result = fetch_project_tables_names_request(query.clone()).await; + + match tables_per_bridgehead_result { + Ok(tables_per_bridgehead) => { let mut script_lines = Vec::new(); - if !records.is_empty() { - for record in records { - info!("Records Extracted: {:?}", record); - script_lines.push(format!("builder$append(server='DockerOpal', url='https://{}/opal/', token='{}', table='{}', driver='OpalDriver')", - record.bk , record.token, record.project_id - )); + let all_tables = tables_per_bridgehead + .values() + .flat_map(|tables| tables.iter()) + .collect::>(); + + for bridgehead in &query.bridgehead_ids { + let records_result = tokens + .filter(project_id.eq(&query.project_id)) + .filter(user_id.eq(&query.user_id)) + .filter(bk.eq(bridgehead)) + .order(id.desc()) + .select(TokenManager::as_select()) + .first::(&mut self.0); + + match records_result { + Ok(record) => { + let token_decrypt = decrypt_data( + record.token.clone(), + &record.token_name.clone().as_bytes()[..16], + ); + if let Some(tables) = tables_per_bridgehead.get(bridgehead) { + let tables_set: HashSet<_> = tables.iter().collect(); + let missing_tables: HashSet<_> = + all_tables.difference(&tables_set).collect(); + if !missing_tables.is_empty() { + info!( + "Bridgehead {} is missing tables: {:?}", + bridgehead, missing_tables + ); + script_lines.push(format!( + "\n # Tables not available for bridgehead '{}': {:?}", + bridgehead, missing_tables + )); + } + + for table in tables { + let site_name = + record.bk.split('.').nth(1).expect("Valid app id"); + script_lines.push(format!( + "builder$append(server='{}', url='https://{}/opal/', token='{}', table='{}', driver='OpalDriver')", + site_name, record.bk, token_decrypt, table.clone() + )); + } + script_lines.push("".to_string()); + } + } + Err(_) => { + info!("Token not available for Bridgehead {}", bridgehead); + script_lines.push(format!( + "\n # Token not available for bridgehead '{}'", + bridgehead + )); + } } + } + if !script_lines.is_empty() { let script = generate_r_script(script_lines); - info!("Script Generated: {:?}", script); Ok(script) } else { - info!("No records were found"); Ok("No records found for the given project and user.".into()) } } - Err(err) => { - error!("Error loading records: {}", err); - Err(format!("Error loading records: {}", err)) + Err(e) => { + error!("Error in fetch_project_tables_names_request: {:?}", e); + Err("Error obtaining table names.".into()) } } } } - -fn generate_r_script(script_lines: Vec) -> String { - let mut builder_script = String::from( -r#"library(DSI) -library(DSOpal) -library(dsBaseClient) -set_config(use_proxy(url="http://beam-connect", port=8062)) -set_config( config( ssl_verifyhost = 0L, ssl_verifypeer = 0L ) ) - -builder <- DSI::newDSLoginBuilder(.silent = FALSE) -"#, - ); - - // Append each line to the script. - for line in script_lines { - builder_script.push_str(&line); - builder_script.push('\n'); - } - - // Finish the script with the login and assignment commands. 
- builder_script.push_str( - "logindata <- builder$build() -connections <- DSI::datashield.login(logins = logindata, assign = TRUE, symbol = 'D')\n", - ); - - builder_script -} diff --git a/src/enums.rs b/src/enums.rs new file mode 100644 index 0000000..ff0fb35 --- /dev/null +++ b/src/enums.rs @@ -0,0 +1,90 @@ +use serde::{Deserialize, Serialize}; +use std::fmt; + +#[derive(Debug, Deserialize)] +#[serde(untagged)] +pub enum OpalResponse { + Err { + status_code: i32, + error_message: String, + }, + Ok { + response: T, + }, +} + +#[allow(clippy::upper_case_acronyms)] +#[derive(Debug, Deserialize, Serialize)] +pub enum OpalRequestType { + #[serde(rename = "CREATE")] + CREATE, + #[serde(rename = "DELETE")] + DELETE, + #[serde(rename = "UPDATE")] + UPDATE, + #[serde(rename = "STATUS")] + STATUS, + #[serde(rename = "SCRIPT")] + SCRIPT, +} + +#[allow(clippy::upper_case_acronyms)] +#[derive(Debug, Deserialize, Serialize)] +pub enum OpalProjectStatus { + #[serde(rename = "CREATED")] + CREATED, + #[serde(rename = "WITH_DATA")] + WITHDATA, + #[serde(rename = "NOT_FOUND")] + NOTFOUND, + #[serde(rename = "ERROR")] + ERROR +} + +#[allow(clippy::upper_case_acronyms)] +#[derive(Debug, Deserialize, Serialize)] +pub enum OpalTokenStatus { + #[serde(rename = "CREATED")] + CREATED, + #[serde(rename = "EXPIRED")] + EXPIRED, + #[serde(rename = "NOT_FOUND")] + NOTFOUND, + #[serde(rename = "ERROR")] + ERROR +} + +impl OpalProjectStatus { + pub fn as_str(&self) -> &'static str { + match self { + OpalProjectStatus::CREATED => "CREATED", + OpalProjectStatus::WITHDATA => "WITH_DATA", + OpalProjectStatus::NOTFOUND => "NOT_FOUND", + OpalProjectStatus::ERROR => "ERROR" + } + } +} + +impl OpalTokenStatus { + pub fn as_str(&self) -> &'static str { + match self { + OpalTokenStatus::CREATED => "CREATED", + OpalTokenStatus::EXPIRED => "EXPIRED", + OpalTokenStatus::NOTFOUND => "NOT_FOUND", + OpalTokenStatus::ERROR => "ERROR" + } + } +} + +impl fmt::Display for OpalRequestType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let text = match self { + OpalRequestType::CREATE => "CREATE", + OpalRequestType::DELETE => "DELETE", + OpalRequestType::UPDATE => "UPDATE", + OpalRequestType::STATUS => "STATUS", + OpalRequestType::SCRIPT => "SCRIPT", + }; + write!(f, "{}", text) + } +} diff --git a/src/handlers.rs b/src/handlers.rs index 480aa2d..eb8c72a 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -1,61 +1,418 @@ +use std::collections::{HashMap, HashSet}; use std::io; use crate::config::BEAM_CLIENT; use crate::config::CONFIG; use crate::db::Db; -use crate::models::{NewToken, OpalRequest, TokenParams}; +use crate::enums::{OpalProjectStatus, OpalRequestType, OpalResponse, OpalTokenStatus}; +use crate::models::{NewToken, OpalRequest, ProjectQueryParams, TokenParams, TokensQueryParams}; +use crate::utils::{decrypt_data, encrypt_data}; use anyhow::Result; use async_sse::Event; -use axum::http::HeaderValue; -use beam_lib::{AppId, TaskRequest, MsgId, TaskResult}; -use futures_util::StreamExt; -use futures_util::stream::TryStreamExt; +use axum::http::StatusCode; +use axum::{http::HeaderValue, Json}; +use base64::{engine::general_purpose::STANDARD, Engine}; +use beam_lib::{AppId, MsgId, TaskRequest, TaskResult}; use chrono::Local; +use futures_util::stream::TryStreamExt; +use futures_util::StreamExt; use reqwest::{header, Method}; +use serde_json::json; use tracing::warn; -use tracing::info; +use tracing::{debug, info}; +use uuid::Uuid; -pub async fn send_token_registration_request(db: Db, token_params: TokenParams) -> 
Result<()> { - let bridgeheads = &token_params.bridgehead_ids; - let broker = CONFIG.beam_id.as_ref().splitn(3, '.').nth(2).expect("Valid app id"); - let bks: Vec<_> = bridgeheads.iter().map(|bk| AppId::new_unchecked(format!("{}.{bk}.{broker}", CONFIG.opal_beam_name))).collect(); +pub async fn send_token_registration_request( + mut db: Db, + token_params: TokenParams, +) -> Result<(), anyhow::Error> { + if db.is_token_available(&token_params)? { + return Ok(()); + } - let request = OpalRequest { - name: token_params.email.clone(), - project: token_params.project_id.clone(), + let token_name = Uuid::new_v4().to_string(); + let task = create_and_send_task_request( + OpalRequestType::CREATE, + Some(token_name.clone()), + Some(token_params.project_id.clone().to_string()), + Some(token_params.bridgehead_ids.clone()), + None, + ) + .await?; + + debug!("Created token task {task:#?}"); + tokio::task::spawn(save_tokens_from_beam(db, task, token_params, token_name)); + Ok(()) +} + +pub async fn send_token_from_db(token_params: TokenParams, token_name: String, token: String) { + let task = create_and_send_task_request( + OpalRequestType::CREATE, + Some(token_name.clone()), + Some(token_params.project_id.clone().to_string()), + Some(token_params.bridgehead_ids.clone()), + Some(token.clone()), + ) + .await; + debug!("Create token in Opal from DB task: {:?}", task); +} + +pub async fn remove_project_and_tokens_request( + mut db: Db, + token_params: &ProjectQueryParams, +) -> Result, anyhow::Error> { + let task = create_and_send_task_request( + OpalRequestType::DELETE, + None, + Some(token_params.project_id.clone()), + Some(vec![token_params.bk.clone()]), + None, + ) + .await?; + + debug!("Remove Project and Token request {task:#?}"); + + match remove_project_and_tokens_from_beam(task).await { + Ok(response) => { + db.delete_project_db(&token_params); + Ok(response) + } + Err(e) => Err(e), + } +} + +pub async fn remove_tokens_request( + mut db: Db, + token_params: &TokensQueryParams, +) -> Result, anyhow::Error> { + let token_name = match db.get_token_name(&token_params) { + Ok(Some(name)) => name, + Ok(None) => return Err(anyhow::Error::msg("Token not found")), + Err(e) => { + return Err(e.into()); + } }; - let task = TaskRequest { - id: MsgId::new(), - from: CONFIG.beam_id.clone(), - to: bks, - body: request, - ttl: "60s".into(), - failure_strategy: beam_lib::FailureStrategy::Discard, - metadata: serde_json::Value::Null, + + let task = create_and_send_task_request( + OpalRequestType::DELETE, + Some(token_name.clone()), + None, + Some(vec![token_params.bk.clone()]), + None, + ) + .await?; + + debug!("Remove Tokens request {task:#?}"); + + match remove_tokens_from_beam(task).await { + Ok(response) => { + db.delete_token_db(token_name, &token_params); + Ok(response) + } + Err(e) => Err(e), + } +} + +pub async fn refresh_token_request( + mut db: Db, + token_params: TokenParams, +) -> Result<(), anyhow::Error> { + let token_query_params: TokensQueryParams = TokensQueryParams { + user_id: token_params.user_id.clone(), + bk: token_params.bridgehead_ids[0].clone(), + project_id: token_params.project_id.clone(), }; - // TODO: Handle error - BEAM_CLIENT.post_task(&task).await?; - info!("Created token task {task:#?}"); - tokio::task::spawn(save_tokens_from_beam(db, task, token_params)); + let token_name = match db.get_token_name(&token_query_params) { + Ok(Some(name)) => name, + Ok(None) => return Err(anyhow::Error::msg("Token name not found")), + Err(e) => { + return Err(e.into()); + } + }; + + let token_value = 
match db.get_token_value( + token_params.user_id.clone(), + token_params.project_id.clone(), + token_params.bridgehead_ids[0].clone(), + ) { + Ok(Some(value)) => decrypt_data(value, &token_name.clone().as_bytes()[..16]), + Ok(None) => return Err(anyhow::Error::msg("Token value not found")), + Err(e) => { + return Err(e.into()); + } + }; + + let task = create_and_send_task_request( + OpalRequestType::UPDATE, + Some(token_name.clone()), + Some(token_params.project_id.clone().to_string()), + Some(token_params.bridgehead_ids.clone()), + Some(token_value.clone()), + ) + .await?; + + tokio::task::spawn(update_tokens_from_beam( + db, + task, + token_params, + token_name.clone(), + )); Ok(()) } -#[derive(Debug, serde::Deserialize)] -#[serde(untagged)] -enum OpalResponse { - Err { - error: String, - }, - Ok { - token: String, +pub async fn fetch_project_tables_names_request( + token_params: TokenParams, +) -> Result>, anyhow::Error> { + let task = create_and_send_task_request( + OpalRequestType::SCRIPT, + Some(token_params.user_id.clone().to_string()), + Some(token_params.project_id.clone().to_string()), + Some(token_params.bridgehead_ids.clone()), + None, + ) + .await?; + + debug!("Fetch Project Tables Status {task:#?}"); + + fetch_project_tables_from_beam(task).await +} + +pub async fn check_project_status_request( + query_params: ProjectQueryParams, +) -> Result, (StatusCode, String)> { + let mut response_json = json!({ + "project_id": query_params.project_id.clone(), + "bk": query_params.bk.clone(), + "project_status": OpalTokenStatus::NOTFOUND, + }); + + let task = match create_and_send_task_request( + OpalRequestType::STATUS, + None, + Some(query_params.project_id.clone().to_string()), + Some(vec![query_params.bk.clone().to_string()]), + None, + ) + .await + { + Ok(result) => result, + Err(e) => { + warn!( + "Received error response for project status request: Project ID: {}, BK: {}. Reason: BK value not found or unavailable. Error details: {:?}", + query_params.project_id, + query_params.bk, + e + ); + return Ok(Json(response_json)) + } + }; + + debug!("Check Project Status {task:#?}"); + + let project_status_result = match status_from_beam(task).await { + Ok(response) => Ok(response), + Err(e) => Err(e), + }; + + match project_status_result { + Ok(OpalResponse::Ok { response }) => { + info!( + "Received status response for project. Project ID: {}, BK: {}, Response: {}", + query_params.project_id, + query_params.bk, + json!(response).to_string() + ); + response_json["project_status"] = json!(response); + } + Ok(OpalResponse::Err { + status_code, + error_message, + }) => { + let status = StatusCode::from_u16(status_code as u16) + .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); + debug!("Received status response for project. 
Project ID: {}, BK: {}, Status: {}, Response: {}", + query_params.project_id, + query_params.bk, + status, + error_message); + } + Err(e) => { + info!("Bridgehead: {}, Error retrieving project status: Failed to deserialize message", query_params.bk); + debug!("Error retrieving project status: {:?}", e); + response_json["project_status"] = json!(OpalProjectStatus::ERROR); + } + }; + + Ok(Json(response_json)) +} + +pub async fn check_token_status_request( + user_id: String, + bridgehead: String, + project: String, + token_name: String, + token: String, +) -> Result, (StatusCode, String)> { + let mut response_json = json!({ + "user_id": user_id.clone(), + "bk": bridgehead.clone(), + "token_status": OpalTokenStatus::NOTFOUND, + }); + + let task = match create_and_send_task_request( + OpalRequestType::STATUS, + Some(token_name.clone().to_string()), + None, + Some(vec![bridgehead.clone().to_string()]), + None, + ) + .await + { + Ok(result) => result, + Err(e) => { + return Err(( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Error creating task: {}", e), + )) + } + }; + + debug!("Check Token Status {task:#?}"); + + let token_status_result = match status_from_beam(task).await { + Ok(response) => { + debug!("Token Status response {response:#?}"); + Ok(response) + } + Err(e) => Err(e), + }; + + match token_status_result { + Ok(OpalResponse::Ok { response }) => { + response_json["token_status"] = json!(response); + + if response == OpalTokenStatus::CREATED.as_str() { + response_json["token_status"] = json!(response); + } else { + let params = TokenParams { + user_id: user_id.clone(), + project_id: project.clone(), + bridgehead_ids: vec![bridgehead.clone()], + }; + + send_token_from_db(params, token_name, token).await; + response_json["token_status"] = json!(OpalTokenStatus::CREATED.as_str()); + } + } + Ok(OpalResponse::Err { + status_code, + error_message, + }) => { + let status = StatusCode::from_u16(status_code as u16) + .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR); + eprintln!("Token status error: {}, {}", status, error_message); + } + Err(e) => { + eprintln!("Error retrieving token status: {:?}", e); + return Err((StatusCode::INTERNAL_SERVER_ERROR, e.to_string())); + } + }; + + Ok(Json(response_json)) +} + +async fn save_tokens_from_beam( + mut db: Db, + task: TaskRequest, + token_params: TokenParams, + token_name: String, +) -> Result<()> { + let today = Local::now(); + let formatted_date = today.format("%d-%m-%Y %H:%M:%S").to_string(); + + let res = BEAM_CLIENT + .raw_beam_request( + Method::GET, + &format!("/v1/tasks/{}/results?wait_count={}", task.id, task.to.len()), + ) + .header( + header::ACCEPT, + HeaderValue::from_static("text/event-stream"), + ) + .send() + .await + .expect("Beam was reachable in the post request before this"); + + let mut stream = async_sse::decode( + res.bytes_stream() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + .into_async_read(), + ); + + let mut last_error: Option = None; + + while let Some(Ok(Event::Message(msg))) = stream.next().await { + let result: TaskResult> = match serde_json::from_slice(msg.data()) { + Ok(v) => v, + Err(e) => { + let error_msg = format!("Failed to deserialize message {msg:?} into a result: {e}"); + debug!("{error_msg}"); + last_error = Some(error_msg); + continue; + } + }; + + match result.body { + OpalResponse::Err { + status_code, + error_message, + } => { + warn!("{} failed to create a token with status code: {status_code}, error: {error_message}", result.from); + last_error = Some(format!("Error: {error_message}")); + } 
+ OpalResponse::Ok { response } => { + let encryp_token = encrypt_data( + response.clone().as_bytes(), + &token_name.clone().as_bytes()[..16], + ); + let token_encoded = STANDARD.encode(encryp_token); + let site_name = result.from.as_ref(); + + let new_token = NewToken { + token_name: &token_name, + token: &token_encoded, + project_id: &token_params.project_id, + bk: site_name, + token_status: OpalTokenStatus::CREATED.as_str(), + project_status: OpalProjectStatus::CREATED.as_str(), + user_id: &token_params.user_id, + token_created_at: &formatted_date, + }; + db.save_token_db(new_token); + } + } + } + + if let Some(e) = last_error { + warn!("Error processing task {}: {}", task.id, e); } + Ok(()) } -async fn save_tokens_from_beam(mut db: Db, task: TaskRequest, token_params: TokenParams) { +async fn update_tokens_from_beam( + mut db: Db, + task: TaskRequest, + token_params: TokenParams, + token_name: String, +) -> Result<()> { let today = Local::now(); - let formatted_date = today.format("%d-%m-%Y").to_string(); + let formatted_date = today.format("%d-%m-%Y %H:%M:%S").to_string(); + let res = BEAM_CLIENT - .raw_beam_request(Method::GET, &format!("/v1/tasks/{}/results?wait_count={}", task.id, task.to.len())) + .raw_beam_request( + Method::GET, + &format!("/v1/tasks/{}/results?wait_count={}", task.id, task.to.len()), + ) .header( header::ACCEPT, HeaderValue::from_static("text/event-stream"), @@ -63,40 +420,338 @@ async fn save_tokens_from_beam(mut db: Db, task: TaskRequest, token .send() .await .expect("Beam was reachable in the post request before this"); - let mut stream = async_sse::decode(res - .bytes_stream() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) - .into_async_read() + + let mut stream = async_sse::decode( + res.bytes_stream() + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + .into_async_read(), ); - while let Some(Ok(Event::Message(msg))) = stream.next().await { - if msg.name() == "error" { - warn!("{}", String::from_utf8_lossy(msg.data())); - break; + + let mut last_error: Option = None; + + while let Some(Ok(Event::Message(msg))) = stream.next().await { + let result: TaskResult> = match serde_json::from_slice(msg.data()) { + Ok(v) => v, + Err(e) => { + let error_msg = format!("Failed to deserialize message {msg:?} into a result: {e}"); + debug!("{error_msg}"); + last_error = Some(error_msg); + continue; + } + }; + + match result.body { + OpalResponse::Err { + status_code, + error_message, + } => { + warn!("{} failed to create a token with status code: {status_code}, error: {error_message}", result.from); + last_error = Some(format!("Error: {error_message}")); + } + OpalResponse::Ok { response } => { + let encryp_token = encrypt_data( + response.clone().as_bytes(), + &token_name.clone().as_bytes()[..16], + ); + let token_encoded = STANDARD.encode(encryp_token); + let site_name = result.from.as_ref(); + + let new_token = NewToken { + token_name: &token_name, + token: &token_encoded, + project_id: &token_params.project_id, + bk: site_name, + token_status: OpalTokenStatus::CREATED.as_str(), + project_status: OpalProjectStatus::CREATED.as_str(), + user_id: &token_params.user_id, + token_created_at: &formatted_date, + }; + db.update_token_db(new_token); + } } - let result: TaskResult = match serde_json::from_slice(msg.data()) { + } + + if let Some(e) = last_error { + warn!("Error processing task {}: {}", task.id, e); + } + Ok(()) +} + +async fn status_from_beam( + task: TaskRequest, +) -> Result, anyhow::Error> { + let res = BEAM_CLIENT + .raw_beam_request( + 
Method::GET,
+            &format!("/v1/tasks/{}/results?wait_count={}", task.id, task.to.len()),
+        )
+        .header(
+            header::ACCEPT,
+            HeaderValue::from_static("text/event-stream"),
+        )
+        .send()
+        .await
+        .expect("Beam was reachable in the post request before this");
+
+    let mut stream = async_sse::decode(
+        res.bytes_stream()
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+            .into_async_read(),
+    );
+
+    let mut last_error: Option<String> = None;
+
+    while let Some(Ok(Event::Message(msg))) = stream.next().await {
+        let result: TaskResult<OpalResponse<String>> = match serde_json::from_slice(msg.data()) {
             Ok(v) => v,
             Err(e) => {
-                warn!("Failed to deserialize message {msg:?} into a result: {e}");
+                let error_msg = format!("Failed to deserialize message {msg:?} into a result: {e}");
+                debug!("{error_msg}");
+                last_error = Some(error_msg);
                 continue;
-            },
+            }
         };
-        let site_name = result.from.as_ref().split('.').nth(1).expect("Valid app id");
-        let token = &match result.body {
-            OpalResponse::Err { error } => {
-                warn!("{} failed to create a token: {error}", result.from);
+
+        match result.body {
+            OpalResponse::Err {
+                status_code,
+                error_message,
+            } => {
+                warn!(
+                    "{} failed to fetch status, code: {status_code}, error: {error_message}",
+                    result.from
+                );
+                return Ok(OpalResponse::Err {
+                    status_code,
+                    error_message,
+                });
+            }
+            OpalResponse::Ok { response } => {
+                return Ok(OpalResponse::Ok { response });
+            }
+        }
+    }
+
+    match last_error {
+        Some(e) => Err(anyhow::Error::msg(e)),
+        None => Err(anyhow::Error::msg("No messages received or processed")),
+    }
+}
+
+async fn fetch_project_tables_from_beam(
+    task: TaskRequest<OpalRequest>,
+) -> Result<HashMap<String, HashSet<String>>, anyhow::Error> {
+    let res = BEAM_CLIENT
+        .raw_beam_request(
+            Method::GET,
+            &format!(
+                "/v1/tasks/{}/results?wait_count={}&wait_time=30s",
+                task.id,
+                task.to.len()
+            ),
+        )
+        .header(
+            header::ACCEPT,
+            HeaderValue::from_static("text/event-stream"),
+        )
+        .send()
+        .await
+        .expect("Beam was reachable in the post request before this");
+    let mut stream = async_sse::decode(
+        res.bytes_stream()
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+            .into_async_read(),
+    );
+
+    let mut tables_per_bridgehead: HashMap<String, HashSet<String>> = HashMap::new();
+    while let Some(Ok(Event::Message(msg))) = stream.next().await {
+        let result: TaskResult<OpalResponse<Vec<String>>> = match serde_json::from_slice(msg.data())
+        {
+            Ok(v) => v,
+            Err(e) => {
+                let error_msg = format!("Failed to deserialize message {msg:?} into a result: {e}");
+                debug!("{error_msg}");
+                continue;
-            },
-            OpalResponse::Ok { token } => token,
+            }
         };
-        let new_token = NewToken {
-            token,
-            project_id: &token_params.project_id,
-            bk: &site_name,
-            status: "CREATED",
-            user_id: &token_params.email,
-            created_at: &formatted_date,
+
+        match result.body {
+            OpalResponse::Err {
+                status_code,
+                error_message,
+            } => {
+                warn!(
+                    "status: {} from bk {} failed to fetch tables: {}",
+                    status_code, result.from, error_message
+                );
+                continue;
+            }
+            OpalResponse::Ok { response } => {
+                let bridgehead_tables = tables_per_bridgehead
+                    .entry(result.from.as_ref().to_string())
+                    .or_default();
+                for table in response {
+                    bridgehead_tables.insert(table.clone());
+                }
+            }
         };
+    }
-        db.save_token_db(new_token);
+    Ok(tables_per_bridgehead)
+}
+
+async fn remove_project_and_tokens_from_beam(
+    task: TaskRequest<OpalRequest>,
+) -> Result<OpalResponse<String>, anyhow::Error> {
+    let res = BEAM_CLIENT
+        .raw_beam_request(
+            Method::GET,
+            &format!("/v1/tasks/{}/results?wait_count={}", task.id, task.to.len()),
+        )
+        .header(
+            header::ACCEPT,
+            HeaderValue::from_static("text/event-stream"),
+        )
+        .send()
+        .await
+        .expect("Beam was reachable in the post request before this");
+
+    let mut stream = async_sse::decode(
+        res.bytes_stream()
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+            .into_async_read(),
+    );
+
+    let mut last_error: Option<String> = None;
+
+    while let Some(Ok(Event::Message(msg))) = stream.next().await {
+        let result: TaskResult<OpalResponse<String>> = match serde_json::from_slice(msg.data()) {
+            Ok(v) => v,
+            Err(e) => {
+                let error_msg = format!("Failed to deserialize message {msg:?} into a result: {e}");
+                debug!("{error_msg}");
+                last_error = Some(error_msg);
+                continue;
+            }
+        };
+
+        match result.body {
+            OpalResponse::Err {
+                status_code,
+                error_message,
+            } => {
+                warn!(
+                    "{} failed to remove project and tokens, status code: {status_code}, error: {error_message}",
+                    result.from
+                );
+                return Ok(OpalResponse::Err {
+                    status_code,
+                    error_message,
+                });
+            }
+            OpalResponse::Ok { response } => {
+                return Ok(OpalResponse::Ok { response });
+            }
+        }
+    }
+
+    match last_error {
+        Some(e) => Err(anyhow::Error::msg(e)),
+        None => Err(anyhow::Error::msg("No messages received or processed")),
+    }
+}
+
+async fn remove_tokens_from_beam(
+    task: TaskRequest<OpalRequest>,
+) -> Result<OpalResponse<String>, anyhow::Error> {
+    let res = BEAM_CLIENT
+        .raw_beam_request(
+            Method::GET,
+            &format!("/v1/tasks/{}/results?wait_count={}", task.id, task.to.len()),
+        )
+        .header(
+            header::ACCEPT,
+            HeaderValue::from_static("text/event-stream"),
+        )
+        .send()
+        .await
+        .expect("Beam was reachable in the post request before this");
+
+    let mut stream = async_sse::decode(
+        res.bytes_stream()
+            .map_err(|e| io::Error::new(io::ErrorKind::Other, e))
+            .into_async_read(),
+    );
+
+    let mut last_error: Option<String> = None;
+
+    while let Some(Ok(Event::Message(msg))) = stream.next().await {
+        let result: TaskResult<OpalResponse<String>> = match serde_json::from_slice(msg.data()) {
+            Ok(v) => v,
+            Err(e) => {
+                let error_msg = format!("Failed to deserialize message {msg:?} into a result: {e}");
+                debug!("{error_msg}");
+                last_error = Some(error_msg);
+                continue;
+            }
+        };
+
+        match result.body {
+            OpalResponse::Err {
+                status_code,
+                error_message,
+            } => {
+                warn!(
+                    "{} failed to remove token, status code: {status_code}, error: {error_message}",
+                    result.from
+                );
+                return Ok(OpalResponse::Err {
+                    status_code,
+                    error_message,
+                });
+            }
+            OpalResponse::Ok { response } => {
+                return Ok(OpalResponse::Ok { response });
+            }
+        }
+    }
+
+    match last_error {
+        Some(e) => Err(anyhow::Error::msg(e)),
+        None => Err(anyhow::Error::msg("No messages received or processed")),
+    }
+}
+
+async fn create_and_send_task_request(
+    request_type: OpalRequestType,
+    name: Option<String>,
+    project: Option<String>,
+    bridgeheads: Option<Vec<String>>,
+    token: Option<String>,
+) -> Result<TaskRequest<OpalRequest>, anyhow::Error> {
+    let bks: Vec<_> = bridgeheads
+        .unwrap_or_default()
+        .iter()
+        .map(|bridgehead_id| AppId::new_unchecked(bridgehead_id.to_string()))
+        .collect();
+
+    let request = OpalRequest {
+        request_type: request_type.to_string(),
+        name,
+        project,
+        token,
     };
+
+    let task = TaskRequest {
+        id: MsgId::new(),
+        from: CONFIG.beam_id.clone(),
+        to: bks,
+        body: request,
+        ttl: "60s".into(),
+        failure_strategy: beam_lib::FailureStrategy::Discard,
+        metadata: serde_json::Value::Null,
+    };
+
+    BEAM_CLIENT.post_task(&task).await?;
+    Ok(task)
 }
diff --git a/src/main.rs b/src/main.rs
index 19c82e1..3aff92f 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,29 +1,38 @@
 mod config;
 mod db;
+mod enums;
 mod handlers;
 mod models;
 mod routes;
 mod schema;
+mod utils;
 
 use crate::config::CONFIG;
 use axum::Router;
 use routes::configure_routes;
-use tracing::{info, Level};
+use
tokio::net::TcpListener; +use tracing::info; use tracing_subscriber::{fmt::SubscriberBuilder, EnvFilter}; #[tokio::main] async fn main() -> anyhow::Result<()> { - let env_filter = EnvFilter::from_default_env().add_directive(Level::INFO.into()); + let env_filter = EnvFilter::try_new(&CONFIG.rust_log) + .or_else(|_| EnvFilter::try_new("info")) + .unwrap(); + let subscriber = SubscriberBuilder::default() .with_env_filter(env_filter) .finish(); + tracing::subscriber::set_global_default(subscriber)?; info!("Starting server token ON!"); let app = Router::new().nest("/api", configure_routes(db::setup_db()?)); - axum::Server::bind(&CONFIG.addr) - .serve(app.into_make_service()) - .await?; - Ok(()) + axum::serve(TcpListener::bind(&CONFIG.addr).await?, app.into_make_service()) + .with_graceful_shutdown(async { + tokio::signal::ctrl_c().await.unwrap(); + }) + .await + .map_err(Into::into) } diff --git a/src/models.rs b/src/models.rs index 007c4b4..d4742e1 100644 --- a/src/models.rs +++ b/src/models.rs @@ -2,23 +2,25 @@ use crate::schema::tokens; use diesel::prelude::*; use serde::{Deserialize, Serialize}; -#[derive(Debug, Deserialize)] +#[derive(Debug, Deserialize, Clone)] pub struct TokenParams { - pub email: String, + pub user_id: String, pub project_id: String, pub bridgehead_ids: Vec, } -#[derive(Deserialize)] +#[derive(Deserialize, Clone)] pub struct ScriptParams { - pub project: String, - pub user: String, + pub project_id: String, + pub user_id: String, } #[derive(Serialize, Debug)] pub struct OpalRequest { - pub name: String, - pub project: String, + pub request_type: String, + pub name: Option, + pub project: Option, + pub token: Option, } #[derive(Debug, Serialize, Deserialize, Queryable, Selectable)] @@ -26,21 +28,47 @@ pub struct OpalRequest { #[diesel(check_for_backend(diesel::sqlite::Sqlite))] pub struct TokenManager { pub id: i32, + pub token_name: String, pub token: String, pub project_id: String, + pub project_status: String, pub bk: String, - pub status: String, + pub token_status: String, pub user_id: String, - pub created_at: String, + pub token_created_at: String, } #[derive(Insertable)] #[diesel(table_name = tokens)] pub struct NewToken<'a> { + pub token_name: &'a str, pub token: &'a str, + pub project_id: &'a str, + pub project_status: &'a str, + pub bk: &'a str, + pub token_status: &'a str, + pub user_id: &'a str, + pub token_created_at: &'a str, +} + +#[derive(Insertable)] +#[diesel(table_name = tokens)] +pub struct TokenStatus<'a> { pub project_id: &'a str, pub bk: &'a str, - pub status: &'a str, + pub token_status: &'a str, pub user_id: &'a str, - pub created_at: &'a str, +} + +#[derive(Deserialize, Debug)] +pub struct TokensQueryParams { + pub user_id: String, + pub bk: String, + pub project_id: String, +} + +#[derive(Deserialize, Debug)] +pub struct ProjectQueryParams { + pub bk: String, + pub project_id: String, } diff --git a/src/routes.rs b/src/routes.rs index 6807da5..24f908c 100644 --- a/src/routes.rs +++ b/src/routes.rs @@ -1,64 +1,131 @@ use crate::db::Db; -use crate::handlers::send_token_registration_request; -use crate::models::{ScriptParams, TokenParams}; +use crate::enums::OpalResponse; +use crate::handlers::{ + check_project_status_request, refresh_token_request, remove_project_and_tokens_request, + remove_tokens_request, send_token_registration_request, +}; +use crate::models::{ProjectQueryParams, TokenParams, TokensQueryParams}; use axum::{ - extract::Path, + extract::Query, http::StatusCode, response::IntoResponse, - routing::{get, post}, + 
routing::{delete, get, post, put},
     Json, Router,
 };
 use serde_json::json;
-use tracing::{warn, error};
+use tracing::debug;
 
+async fn create_token(db: Db, token_params: Json<TokenParams>) -> impl IntoResponse {
+    if let Err(e) = send_token_registration_request(db, token_params.0).await {
+        debug!("Unhandled error: {e:?}");
+        StatusCode::INTERNAL_SERVER_ERROR
+    } else {
+        StatusCode::OK
+    }
+}
 
-async fn create_token(
-    db: Db,
-    token_params: Json<TokenParams>,
-) -> StatusCode {
-    match send_token_registration_request(db, token_params.0).await {
-        Ok(_) => {
-            StatusCode::OK
-        }
-        Err(e) => {
-            error!("Error creating token task: {e:?}");
-            StatusCode::INTERNAL_SERVER_ERROR
-        }
+async fn check_project_status(status_query: Query<ProjectQueryParams>) -> impl IntoResponse {
+    match check_project_status_request(status_query.0).await {
+        Ok(json) => (StatusCode::OK, json).into_response(),
+        Err((status, message)) => (status, Json(json!({"message": message}))).into_response(),
     }
 }
 
-async fn check_status(mut db: Db, Path(project_id): Path<String>) -> impl IntoResponse {
-    if project_id.is_empty() {
-        let error_response = json!({
-            "status": "error",
-            "message": "Project ID is required"
-        });
-        return (StatusCode::BAD_REQUEST, Json(error_response)).into_response();
+async fn check_token_status(
+    mut db: Db,
+    status_query: Query<TokensQueryParams>,
+) -> impl IntoResponse {
+    match db.check_token_status(status_query.0).await {
+        Ok(json) => (StatusCode::OK, json).into_response(),
+        Err((status, message)) => (status, Json(json!({"message": message}))).into_response(),
     }
+}
 
-    match db.check_project_status(project_id).await {
+async fn check_script_status(mut db: Db, status_params: Json<TokenParams>) -> impl IntoResponse {
+    match db.check_script_status(status_params.0).await {
         Ok(json) => (StatusCode::OK, json).into_response(),
         Err((status, message)) => (status, Json(json!({"message": message}))).into_response(),
     }
 }
 
-async fn generate_script(
-    mut db: Db,
-    script_params: Json<ScriptParams>,
-) -> impl IntoResponse {
+async fn generate_script(mut db: Db, script_params: Json<TokenParams>) -> impl IntoResponse {
     match db.generate_user_script(script_params.0).await {
         Ok(script) => (StatusCode::OK, script).into_response(),
         Err(e) => {
-            warn!("Error generating script: {e}");
+            debug!("Error generating script: {e}");
             StatusCode::INTERNAL_SERVER_ERROR.into_response()
-        },
+        }
+    }
+}
+
+async fn refresh_token(db: Db, token_params: Json<TokenParams>) -> StatusCode {
+    if let Err(e) = refresh_token_request(db, token_params.0).await {
+        debug!("Unhandled error: {e:?}");
+        StatusCode::INTERNAL_SERVER_ERROR
+    } else {
+        StatusCode::OK
+    }
+}
+
+async fn remove_project_and_token(db: Db, query: Query<ProjectQueryParams>) -> impl IntoResponse {
+    match remove_project_and_tokens_request(db, &query.0).await {
+        Ok(OpalResponse::Ok { .. }) => StatusCode::OK.into_response(),
+        Ok(OpalResponse::Err {
+            status_code,
+            error_message,
+        }) => {
+            debug!(
+                ?query,
+                ?error_message,
+                ?status_code,
+                "Got error while removing project"
+            );
+            let status = StatusCode::from_u16(status_code as u16)
+                .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
+            (status, Json(json!({ "error": error_message }))).into_response()
+        }
+        Err(e) => {
+            debug!("Unhandled error: {e:?}");
+            StatusCode::INTERNAL_SERVER_ERROR.into_response()
+        }
+    }
+}
+
+async fn remove_tokens(db: Db, query: Query<TokensQueryParams>) -> impl IntoResponse {
+    match remove_tokens_request(db, &query.0).await {
+        Ok(OpalResponse::Ok { .. }) => StatusCode::OK.into_response(),
+        Ok(OpalResponse::Err {
+            status_code,
+            error_message,
+        }) => {
+            debug!(
+                ?query,
+                ?error_message,
+                ?status_code,
+                "Got error while removing tokens"
+            );
+            let status = StatusCode::from_u16(status_code as u16)
+                .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
+            (status, Json(json!({ "error": error_message }))).into_response()
+        }
+        Err(e) => {
+            debug!("Unhandled error: {e:?}");
+            StatusCode::INTERNAL_SERVER_ERROR.into_response()
+        }
     }
 }
 
-pub fn configure_routes(pool: diesel::r2d2::Pool<ConnectionManager<SqliteConnection>>) -> Router {
+pub fn configure_routes(
+    pool: diesel::r2d2::Pool<ConnectionManager<SqliteConnection>>,
+) -> Router {
     Router::new()
-        .route("/tokens", post(create_token))
-        .route("/projects/:project_id/status", get(check_status))
-        .route("/scripts", get(generate_script))
+        .route("/token", post(create_token))
+        .route("/token", delete(remove_tokens))
+        .route("/token-status", get(check_token_status))
+        .route("/project-status", get(check_project_status))
+        .route("/script", post(generate_script))
+        .route("/refreshToken", put(refresh_token))
+        .route("/project", delete(remove_project_and_token))
+        .route("/authentication-status", post(check_script_status))
         .with_state(pool)
 }
diff --git a/src/schema.rs b/src/schema.rs
index 3bcabdb..71fde4a 100644
--- a/src/schema.rs
+++ b/src/schema.rs
@@ -3,11 +3,13 @@
 diesel::table! {
     tokens (id) {
         id -> Integer,
+        token_name -> Text,
         token -> Text,
+        token_status -> Text,
         project_id -> Text,
+        project_status -> Text,
         bk -> Text,
-        status -> Text,
         user_id -> Text,
-        created_at -> Text,
+        token_created_at -> Text,
     }
 }
diff --git a/src/utils.rs b/src/utils.rs
new file mode 100644
index 0000000..0007204
--- /dev/null
+++ b/src/utils.rs
@@ -0,0 +1,57 @@
+use crate::config::CONFIG;
+use aes::Aes256;
+use base64::{engine::general_purpose::STANDARD, Engine};
+use cipher::{KeyIvInit, StreamCipher};
+use ctr::Ctr128BE;
+
+fn adjust_key(key: &str) -> [u8; 32] {
+    let bytes = key.as_bytes();
+    let mut array = [0u8; 32];
+    let bytes_to_copy = bytes.len().min(32);
+    array[..bytes_to_copy].copy_from_slice(&bytes[..bytes_to_copy]);
+
+    array
+}
+
+pub fn encrypt_data(data: &[u8], nonce: &[u8]) -> Vec<u8> {
+    let key: [u8; 32] = adjust_key(&CONFIG.token_encrypt_key);
+
+    let mut cipher = Ctr128BE::<Aes256>::new_from_slices(&key, nonce).unwrap();
+    let mut encrypted = data.to_vec();
+    cipher.apply_keystream(&mut encrypted);
+
+    encrypted
+}
+
+pub fn decrypt_data(data: String, nonce: &[u8]) -> String {
+    let decoded_token = STANDARD.decode(data).unwrap();
+    let decrypted_token = encrypt_data(&decoded_token, nonce);
+    String::from_utf8(decrypted_token).unwrap()
+}
+
+pub fn generate_r_script(script_lines: Vec<String>) -> String {
+    let mut builder_script = String::from(
+        r#"library(DSI)
+library(DSOpal)
+library(dsBaseClient)
+set_config(use_proxy(url="http://beam-connect", port=8062))
+set_config( config( ssl_verifyhost = 0L, ssl_verifypeer = 0L ) )
+
+builder <- DSI::newDSLoginBuilder(.silent = FALSE)
+"#,
+    );
+
+    // Append each line to the script.
+    for line in script_lines {
+        builder_script.push_str(&line);
+        builder_script.push('\n');
+    }
+
+    // Finish the script with the login and assignment commands.
+    builder_script.push_str(
+        "logindata <- builder$build()
+connections <- DSI::datashield.login(logins = logindata, assign = TRUE, symbol = 'D')\n",
+    );
+
+    builder_script
+}
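For reference, here is a minimal, self-contained sketch (not part of the patch) of the token round trip that utils.rs and handlers.rs implement: AES-256-CTR keyed from TOKEN_ENCRYPT_KEY, the first 16 bytes of the generated token_name UUID as nonce, and base64 for the value stored in the token column. The key, token_name, and plaintext below are made-up placeholders.

use aes::Aes256;
use base64::{engine::general_purpose::STANDARD, Engine};
use cipher::{KeyIvInit, StreamCipher};
use ctr::Ctr128BE;

fn main() {
    let key = [0u8; 32]; // placeholder for the 32-byte key adjust_key() derives from TOKEN_ENCRYPT_KEY
    let token_name = "123e4567-e89b-12d3-a456-426614174000"; // placeholder for the Uuid::new_v4() token_name
    let nonce = &token_name.as_bytes()[..16]; // same nonce derivation as the callers of encrypt_data

    // Encrypt: CTR mode XORs the keystream over the plaintext in place.
    let mut buf = b"example-opal-token".to_vec();
    Ctr128BE::<Aes256>::new_from_slices(&key, nonce)
        .unwrap()
        .apply_keystream(&mut buf);
    let stored = STANDARD.encode(&buf); // what save_token_db would persist in the token column

    // Decrypt: CTR is symmetric, so applying the same keystream again restores the plaintext.
    let mut plain = STANDARD.decode(&stored).unwrap();
    Ctr128BE::<Aes256>::new_from_slices(&key, nonce)
        .unwrap()
        .apply_keystream(&mut plain);
    assert_eq!(plain, b"example-opal-token".to_vec());
}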