From e60a884ce082891fda9d6950bc81515db7e139a0 Mon Sep 17 00:00:00 2001 From: anant Date: Wed, 25 Dec 2024 00:09:16 +0530 Subject: [PATCH] Feature: Backend for Correlation --- src/cli.rs | 4 +- src/correlation/correlation_utils.rs | 77 ++++++++++ src/correlation/http_handlers.rs | 149 ++++++++++++++++++ src/correlation/mod.rs | 193 ++++++++++++++++++++++++ src/handlers/http/modal/query_server.rs | 5 + src/handlers/http/modal/server.rs | 41 +++++ src/handlers/http/query.rs | 2 +- src/lib.rs | 1 + src/rbac/role.rs | 20 +++ src/storage/mod.rs | 1 + src/storage/object_storage.rs | 31 +++- 11 files changed, 520 insertions(+), 4 deletions(-) create mode 100644 src/correlation/correlation_utils.rs create mode 100644 src/correlation/http_handlers.rs create mode 100644 src/correlation/mod.rs diff --git a/src/cli.rs b/src/cli.rs index 38648c7ad..1205fdfb2 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -529,7 +529,9 @@ impl FromArgMatches for Cli { self.kafka_host = m.get_one::(Self::KAFKA_HOST).cloned(); self.kafka_group = m.get_one::(Self::KAFKA_GROUP).cloned(); self.kafka_client_id = m.get_one::(Self::KAFKA_CLIENT_ID).cloned(); - self.kafka_security_protocol = m.get_one::(Self::KAFKA_SECURITY_PROTOCOL).cloned(); + self.kafka_security_protocol = m + .get_one::(Self::KAFKA_SECURITY_PROTOCOL) + .cloned(); self.kafka_partitions = m.get_one::(Self::KAFKA_PARTITIONS).cloned(); self.tls_cert_path = m.get_one::(Self::TLS_CERT).cloned(); diff --git a/src/correlation/correlation_utils.rs b/src/correlation/correlation_utils.rs new file mode 100644 index 000000000..ec9d25c5c --- /dev/null +++ b/src/correlation/correlation_utils.rs @@ -0,0 +1,77 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use datafusion::common::tree_node::TreeNode; + +use crate::{ + query::{TableScanVisitor, QUERY_SESSION}, + rbac::{ + map::SessionKey, + role::{Action, Permission}, + Users, + }, +}; + +use super::CorrelationError; + +async fn get_tables_from_query(query: &str) -> Result { + let session_state = QUERY_SESSION.state(); + let raw_logical_plan = session_state + .create_logical_plan(query) + .await + .map_err(|err| CorrelationError::AnyhowError(err.into()))?; + + let mut visitor = TableScanVisitor::default(); + let _ = raw_logical_plan.visit(&mut visitor); + Ok(visitor) +} + +pub async fn user_auth_for_query( + session_key: &SessionKey, + query: &str, +) -> Result<(), CorrelationError> { + let tables = get_tables_from_query(query).await?; + let permissions = Users.get_permissions(session_key); + + for table_name in tables.into_inner().iter() { + let mut authorized = false; + + // in permission check if user can run query on the stream. 
+ // also while iterating add any filter tags for this stream + for permission in permissions.iter() { + match permission { + Permission::Stream(Action::All, _) => { + authorized = true; + break; + } + Permission::StreamWithTag(Action::Query, ref stream, _) + if stream == table_name || stream == "*" => + { + authorized = true; + } + _ => (), + } + } + + if !authorized { + return Err(CorrelationError::Unauthorized); + } + } + + Ok(()) +} diff --git a/src/correlation/http_handlers.rs b/src/correlation/http_handlers.rs new file mode 100644 index 000000000..a3ac05359 --- /dev/null +++ b/src/correlation/http_handlers.rs @@ -0,0 +1,149 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + +use actix_web::{web, HttpRequest, Responder}; +use bytes::Bytes; +use relative_path::RelativePathBuf; + +use crate::{ + option::CONFIG, + storage::{CORRELATION_DIRECTORY, PARSEABLE_ROOT_DIRECTORY}, + utils::{actix::extract_session_key_from_req, uid::Uid}, +}; + +use super::{ + correlation_utils::user_auth_for_query, CorrelationConfig, CorrelationError, + CorrelationRequest, CORRELATIONS, +}; + +pub async fn list(req: HttpRequest) -> Result { + let session_key = extract_session_key_from_req(&req) + .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; + + let correlations = CORRELATIONS + .list_correlations_for_user(&session_key) + .await?; + + Ok(web::Json(correlations)) +} + +pub async fn get(req: HttpRequest) -> Result { + let session_key = extract_session_key_from_req(&req) + .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; + + let correlation_id = req + .match_info() + .get("correlation_id") + .ok_or(CorrelationError::Metadata("No correlation ID Provided"))?; + + let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?; + + if user_auth_for_query(&session_key, &correlation.query) + .await + .is_ok() + { + Ok(web::Json(correlation)) + } else { + Err(CorrelationError::Unauthorized) + } +} + +pub async fn post(req: HttpRequest, body: Bytes) -> Result { + let session_key = extract_session_key_from_req(&req) + .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; + + let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?; + let correlation: CorrelationConfig = correlation_request.into(); + + // validate user's query auth + user_auth_for_query(&session_key, &correlation.query).await?; + + // Save to disk + let store = CONFIG.storage().get_object_store(); + store.put_correlation(&correlation).await?; + + // Save to memory + CORRELATIONS.update(&correlation).await?; + + Ok(format!( + "Saved correlation with ID- {}", + 
correlation.id.to_string() + )) +} + +pub async fn modify(req: HttpRequest, body: Bytes) -> Result { + let session_key = extract_session_key_from_req(&req) + .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; + + let correlation_id = req + .match_info() + .get("correlation_id") + .ok_or(CorrelationError::Metadata("No correlation ID Provided"))?; + + let correlation_request: CorrelationRequest = serde_json::from_slice(&body)?; + + // validate user's query auth + user_auth_for_query(&session_key, &correlation_request.query).await?; + + let correlation: CorrelationConfig = CorrelationConfig { + version: correlation_request.version, + id: Uid::from_string(correlation_id) + .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?, + query: correlation_request.query, + }; + + // Save to disk + let store = CONFIG.storage().get_object_store(); + store.put_correlation(&correlation).await?; + + // Save to memory + CORRELATIONS.update(&correlation).await?; + + Ok(format!( + "Modified correlation with ID- {}", + correlation.id.to_string() + )) +} + +pub async fn delete(req: HttpRequest) -> Result { + let session_key = extract_session_key_from_req(&req) + .map_err(|err| CorrelationError::AnyhowError(anyhow::Error::msg(err.to_string())))?; + + let correlation_id = req + .match_info() + .get("correlation_id") + .ok_or(CorrelationError::Metadata("No correlation ID Provided"))?; + + let correlation = CORRELATIONS.get_correlation_by_id(correlation_id).await?; + + // validate user's query auth + user_auth_for_query(&session_key, &correlation.query).await?; + + // Delete from disk + let store = CONFIG.storage().get_object_store(); + let path = RelativePathBuf::from_iter([ + PARSEABLE_ROOT_DIRECTORY, + CORRELATION_DIRECTORY, + &format!("{}", correlation.id), + ]); + store.delete_object(&path).await?; + + // Delete from memory + CORRELATIONS.delete(correlation_id).await?; + Ok(format!("Deleted correlation with ID- 
{correlation_id}")) +} diff --git a/src/correlation/mod.rs b/src/correlation/mod.rs new file mode 100644 index 000000000..35f990ce4 --- /dev/null +++ b/src/correlation/mod.rs @@ -0,0 +1,193 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use actix_web::http::header::ContentType; +use correlation_utils::user_auth_for_query; +use http::StatusCode; +use itertools::Itertools; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; +use serde_json::Error as SerdeError; +use tokio::sync::RwLock; +use tracing::{trace, warn}; + +use crate::{ + handlers::http::rbac::RBACError, option::CONFIG, rbac::map::SessionKey, + storage::ObjectStorageError, utils::uid::Uid, +}; + +pub mod correlation_utils; +pub mod http_handlers; + +pub static CORRELATIONS: Lazy = Lazy::new(Correlation::default); + +#[derive(Debug, Default)] +pub struct Correlation(RwLock>); + +impl Correlation { + pub async fn load(&self) -> Result<(), CorrelationError> { + // load correlations from storage + let store = CONFIG.storage().get_object_store(); + let all_correlations = store.get_correlations().await.unwrap_or_default(); + + let mut correlations = vec![]; + for corr in all_correlations { + if corr.is_empty() { + continue; + } + + let correlation: CorrelationConfig = serde_json::from_slice(&corr)?; + + correlations.push(correlation); 
+ } + + let mut s = self.0.write().await; + s.append(&mut correlations.clone()); + Ok(()) + } + + pub async fn list_correlations_for_user( + &self, + session_key: &SessionKey, + ) -> Result, CorrelationError> { + let correlations = self.0.read().await.iter().cloned().collect_vec(); + + let mut user_correlations = vec![]; + for c in correlations { + if user_auth_for_query(session_key, &c.query).await.is_ok() { + user_correlations.push(c); + } + } + Ok(user_correlations) + } + + pub async fn get_correlation_by_id( + &self, + correlation_id: &str, + ) -> Result { + let read = self.0.read().await; + let correlation = read + .iter() + .find(|c| c.id.to_string() == correlation_id) + .cloned(); + + if let Some(c) = correlation { + Ok(c) + } else { + Err(CorrelationError::AnyhowError(anyhow::Error::msg(format!( + "Unable to find correlation with ID- {correlation_id}" + )))) + } + } + + pub async fn update(&self, correlation: &CorrelationConfig) -> Result<(), CorrelationError> { + // save to memory + let mut s = self.0.write().await; + s.retain(|c| c.id != correlation.id); + s.push(correlation.clone()); + Ok(()) + } + + pub async fn delete(&self, correlation_id: &str) -> Result<(), CorrelationError> { + // now delete from memory + let read_access = self.0.read().await; + + let index = read_access + .iter() + .enumerate() + .find(|(_, c)| c.id.to_string() == correlation_id) + .to_owned(); + + if let Some((index, _)) = index { + // drop the read access in order to get exclusive write access + drop(read_access); + self.0.write().await.remove(index); + trace!("removed correlation from memory"); + } else { + warn!("Correlation ID- {correlation_id} not found in memory!"); + } + Ok(()) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub enum CorrelationVersion { + V1, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CorrelationConfig { + pub version: CorrelationVersion, + pub id: Uid, + pub query: String, +} + +impl 
CorrelationConfig {} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct CorrelationRequest { + pub version: CorrelationVersion, + pub query: String, +} + +impl From for CorrelationConfig { + fn from(val: CorrelationRequest) -> Self { + Self { + version: val.version, + id: crate::utils::uid::gen(), + query: val.query, + } + } +} + +#[derive(Debug, thiserror::Error)] +pub enum CorrelationError { + #[error("Failed to connect to storage: {0}")] + ObjectStorage(#[from] ObjectStorageError), + #[error("Serde Error: {0}")] + Serde(#[from] SerdeError), + #[error("Cannot perform this operation: {0}")] + Metadata(&'static str), + #[error("User does not exist")] + UserDoesNotExist(#[from] RBACError), + #[error("Error: {0}")] + AnyhowError(#[from] anyhow::Error), + #[error("Unauthorized")] + Unauthorized, +} + +impl actix_web::ResponseError for CorrelationError { + fn status_code(&self) -> http::StatusCode { + match self { + Self::ObjectStorage(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::Serde(_) => StatusCode::BAD_REQUEST, + Self::Metadata(_) => StatusCode::BAD_REQUEST, + Self::UserDoesNotExist(_) => StatusCode::NOT_FOUND, + Self::AnyhowError(_) => StatusCode::INTERNAL_SERVER_ERROR, + Self::Unauthorized => StatusCode::BAD_REQUEST, + } + } + + fn error_response(&self) -> actix_web::HttpResponse { + actix_web::HttpResponse::build(self.status_code()) + .insert_header(ContentType::plaintext()) + .body(self.to_string()) + } +} diff --git a/src/handlers/http/modal/query_server.rs b/src/handlers/http/modal/query_server.rs index 792bb6571..32bf8e2d5 100644 --- a/src/handlers/http/modal/query_server.rs +++ b/src/handlers/http/modal/query_server.rs @@ -16,6 +16,7 @@ * */ +use crate::correlation::CORRELATIONS; use crate::handlers::airplane; use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; use crate::handlers::http::logstream::create_internal_stream_if_not_exists; @@ -50,6 +51,7 @@ impl ParseableServer for 
QueryServer { config .service( web::scope(&base_path()) + .service(Server::get_correlation_webscope()) .service(Server::get_query_factory()) .service(Server::get_trino_factory()) .service(Server::get_cache_webscope()) @@ -94,6 +96,9 @@ impl ParseableServer for QueryServer { //create internal stream at server start create_internal_stream_if_not_exists().await?; + if let Err(e) = CORRELATIONS.load().await { + error!("{e}"); + } FILTERS.load().await?; DASHBOARDS.load().await?; // track all parquet files already in the data directory diff --git a/src/handlers/http/modal/server.rs b/src/handlers/http/modal/server.rs index 6c0ec9fd8..7646ae95e 100644 --- a/src/handlers/http/modal/server.rs +++ b/src/handlers/http/modal/server.rs @@ -17,6 +17,8 @@ */ use crate::analytics; +use crate::correlation; +use crate::correlation::CORRELATIONS; use crate::handlers; use crate::handlers::http::about; use crate::handlers::http::base_path; @@ -67,6 +69,7 @@ impl ParseableServer for Server { config .service( web::scope(&base_path()) + .service(Self::get_correlation_webscope()) .service(Self::get_query_factory()) .service(Self::get_trino_factory()) .service(Self::get_cache_webscope()) @@ -102,6 +105,9 @@ impl ParseableServer for Server { migration::run_migration(&CONFIG).await?; + if let Err(e) = CORRELATIONS.load().await { + error!("{e}"); + } FILTERS.load().await?; DASHBOARDS.load().await?; @@ -172,6 +178,41 @@ impl Server { ) } + pub fn get_correlation_webscope() -> Scope { + web::scope("/correlation") + .service( + web::resource("") + .route( + web::get() + .to(correlation::http_handlers::list) + .authorize(Action::GetCorrelation), + ) + .route( + web::post() + .to(correlation::http_handlers::post) + .authorize(Action::CreateCorrelation), + ), + ) + .service( + web::resource("/correlation/{correlation_id}") + .route( + web::get() + .to(correlation::http_handlers::get) + .authorize(Action::GetCorrelation), + ) + .route( + web::put() + .to(correlation::http_handlers::modify) + 
.authorize(Action::PutCorrelation), + ) + .route( + web::delete() + .to(correlation::http_handlers::delete) + .authorize(Action::DeleteCorrelation), + ), + ) + } + // get the dashboards web scope pub fn get_dashboards_webscope() -> Scope { web::scope("/dashboards") diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 27414b9d0..895032d20 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -52,7 +52,7 @@ use crate::utils::time::{TimeParseError, TimeRange}; use super::modal::utils::logstream_utils::create_stream_and_schema_from_storage; /// Query Request through http endpoint. -#[derive(Debug, serde::Deserialize, serde::Serialize)] +#[derive(Debug, serde::Deserialize, serde::Serialize, Clone)] #[serde(rename_all = "camelCase")] pub struct Query { pub query: String, diff --git a/src/lib.rs b/src/lib.rs index 7a85f54e7..5c8e09274 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,6 +22,7 @@ pub mod analytics; pub mod banner; mod catalog; mod cli; +pub mod correlation; mod event; pub mod handlers; pub mod hottier; diff --git a/src/rbac/role.rs b/src/rbac/role.rs index f94c8f171..f383345ad 100644 --- a/src/rbac/role.rs +++ b/src/rbac/role.rs @@ -62,6 +62,10 @@ pub enum Action { DeleteFilter, Login, Metrics, + GetCorrelation, + CreateCorrelation, + DeleteCorrelation, + PutCorrelation, } #[derive(Debug, Clone, PartialEq, Eq, Hash)] @@ -120,6 +124,10 @@ impl RoleBuilder { | Action::ListStream | Action::ListCluster | Action::ListClusterMetrics + | Action::CreateCorrelation + | Action::DeleteCorrelation + | Action::GetCorrelation + | Action::PutCorrelation | Action::Deleteingestor | Action::PutHotTierEnabled | Action::GetHotTierEnabled @@ -208,6 +216,10 @@ pub mod model { Action::DeleteStream, Action::ListStream, Action::GetStreamInfo, + Action::CreateCorrelation, + Action::DeleteCorrelation, + Action::GetCorrelation, + Action::PutCorrelation, Action::DetectSchema, Action::GetSchema, Action::GetStats, @@ -250,6 +262,10 @@ pub mod 
model { Action::PutHotTierEnabled, Action::GetHotTierEnabled, Action::DeleteHotTierEnabled, + Action::CreateCorrelation, + Action::DeleteCorrelation, + Action::GetCorrelation, + Action::PutCorrelation, Action::ListDashboard, Action::GetDashboard, Action::CreateDashboard, @@ -282,6 +298,10 @@ pub mod model { Action::GetFilter, Action::CreateFilter, Action::DeleteFilter, + Action::CreateCorrelation, + Action::DeleteCorrelation, + Action::GetCorrelation, + Action::PutCorrelation, Action::ListDashboard, Action::GetDashboard, Action::CreateDashboard, diff --git a/src/storage/mod.rs b/src/storage/mod.rs index a018c2b1c..8275b238a 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -51,6 +51,7 @@ pub const PARSEABLE_ROOT_DIRECTORY: &str = ".parseable"; pub const SCHEMA_FILE_NAME: &str = ".schema"; pub const ALERT_FILE_NAME: &str = ".alert.json"; pub const MANIFEST_FILE: &str = "manifest.json"; +pub const CORRELATION_DIRECTORY: &str = ".correlations"; /// local sync interval to move data.records to /tmp dir of that stream. /// 60 sec is a reasonable value. 
diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 34a1bb631..4ec34834b 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -21,10 +21,11 @@ use super::{ ObjectStoreFormat, Permisssion, StorageDir, StorageMetadata, }; use super::{ - ALERT_FILE_NAME, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, PARSEABLE_ROOT_DIRECTORY, - SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, + ALERT_FILE_NAME, CORRELATION_DIRECTORY, MANIFEST_FILE, PARSEABLE_METADATA_FILE_NAME, + PARSEABLE_ROOT_DIRECTORY, SCHEMA_FILE_NAME, STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, }; +use crate::correlation::{CorrelationConfig, CorrelationError}; use crate::handlers::http::modal::ingest_server::INGESTOR_META; use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE}; @@ -631,6 +632,32 @@ pub trait ObjectStorage: Send + Sync + 'static { // pick a better name fn get_bucket_name(&self) -> String; + + async fn put_correlation( + &self, + correlation: &CorrelationConfig, + ) -> Result<(), ObjectStorageError> { + let path = RelativePathBuf::from_iter([ + PARSEABLE_ROOT_DIRECTORY, + CORRELATION_DIRECTORY, + &format!("{}", correlation.id), + ]); + self.put_object(&path, to_bytes(correlation)).await?; + Ok(()) + } + + async fn get_correlations(&self) -> Result, CorrelationError> { + let correlation_path = + RelativePathBuf::from_iter([PARSEABLE_ROOT_DIRECTORY, CORRELATION_DIRECTORY]); + let correlation_bytes = self + .get_objects( + Some(&correlation_path), + Box::new(|file_name| file_name.ends_with(".json")), + ) + .await?; + + Ok(correlation_bytes) + } } pub async fn commit_schema_to_storage(