From 5d7cd68daa0041ced3dfda58aa35cabc48bc4aaf Mon Sep 17 00:00:00 2001 From: parmesant Date: Tue, 1 Oct 2024 15:35:57 +0530 Subject: [PATCH] chore: separate handlers for different modes (#937) Co-authored-by: Nikhil Sinha <131262146+nikhilsinhaparseable@users.noreply.github.com> --- server/src/handlers/http/cluster/mod.rs | 52 +- server/src/handlers/http/ingest.rs | 258 +------ server/src/handlers/http/logstream.rs | 627 +----------------- .../http/modal/ingest/ingester_ingest.rs | 25 + .../http/modal/ingest/ingester_logstream.rs | 119 ++++ .../http/modal/ingest/ingester_rbac.rs | 114 ++++ .../http/modal/ingest/ingester_role.rs | 22 + server/src/handlers/http/modal/ingest/mod.rs | 3 + .../src/handlers/http/modal/ingest_server.rs | 80 ++- server/src/handlers/http/modal/mod.rs | 3 + server/src/handlers/http/modal/query/mod.rs | 4 + .../http/modal/query/querier_ingest.rs | 13 + .../http/modal/query/querier_logstream.rs | 324 +++++++++ .../handlers/http/modal/query/querier_rbac.rs | 157 +++++ .../handlers/http/modal/query/querier_role.rs | 27 + .../src/handlers/http/modal/query_server.rs | 214 +++++- server/src/handlers/http/modal/server.rs | 21 +- .../handlers/http/modal/utils/ingest_utils.rs | 260 ++++++++ .../http/modal/utils/logstream_utils.rs | 380 +++++++++++ server/src/handlers/http/modal/utils/mod.rs | 3 + .../handlers/http/modal/utils/rbac_utils.rs | 20 + server/src/handlers/http/rbac.rs | 184 ++--- server/src/handlers/http/role.rs | 17 +- server/src/storage/staging.rs | 2 + 24 files changed, 1844 insertions(+), 1085 deletions(-) create mode 100644 server/src/handlers/http/modal/ingest/ingester_ingest.rs create mode 100644 server/src/handlers/http/modal/ingest/ingester_logstream.rs create mode 100644 server/src/handlers/http/modal/ingest/ingester_rbac.rs create mode 100644 server/src/handlers/http/modal/ingest/ingester_role.rs create mode 100644 server/src/handlers/http/modal/ingest/mod.rs create mode 100644 server/src/handlers/http/modal/query/mod.rs create mode 100644 server/src/handlers/http/modal/query/querier_ingest.rs create mode 100644 server/src/handlers/http/modal/query/querier_logstream.rs create mode 100644 server/src/handlers/http/modal/query/querier_rbac.rs create mode 100644 server/src/handlers/http/modal/query/querier_role.rs create mode 100644 server/src/handlers/http/modal/utils/ingest_utils.rs create mode 100644 server/src/handlers/http/modal/utils/logstream_utils.rs create mode 100644 server/src/handlers/http/modal/utils/mod.rs create mode 100644 server/src/handlers/http/modal/utils/rbac_utils.rs diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 732fae020..4e7d3219f 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -59,46 +59,6 @@ pub const INTERNAL_STREAM_NAME: &str = "pmeta"; const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1); -pub async fn sync_cache_with_ingestors( - url: &str, - ingestor: IngestorMetadata, - body: bool, -) -> Result<(), StreamError> { - if !utils::check_liveness(&ingestor.domain_name).await { - return Ok(()); - } - let request_body: Bytes = Bytes::from(body.to_string()); - let client = reqwest::Client::new(); - let resp = client - .put(url) - .header(header::CONTENT_TYPE, "application/json") - .header(header::AUTHORIZATION, ingestor.token) - .body(request_body) - .send() - .await - .map_err(|err| { - // log the error and return a custom error - log::error!( - "Fatal: failed to set cache: {}\n Error: {:?}", - 
ingestor.domain_name, - err - ); - StreamError::Network(err) - })?; - - // if the response is not successful, log the error and return a custom error - // this could be a bit too much, but we need to be sure it covers all cases - if !resp.status().is_success() { - log::error!( - "failed to set cache: {}\nResponse Returned: {:?}", - ingestor.domain_name, - resp.text().await - ); - } - - Ok(()) -} - // forward the create/update stream request to all ingestors to keep them in sync pub async fn sync_streams_with_ingestors( headers: HeaderMap, @@ -122,7 +82,7 @@ pub async fn sync_streams_with_ingestors( continue; } let url = format!( - "{}{}/logstream/{}", + "{}{}/logstream/{}/sync", ingestor.domain_name, base_path_without_preceding_slash(), stream_name @@ -176,7 +136,7 @@ pub async fn sync_users_with_roles_with_ingestors( continue; } let url = format!( - "{}{}/user/{}/role", + "{}{}/user/{}/role/sync", ingestor.domain_name, base_path_without_preceding_slash(), username @@ -224,7 +184,7 @@ pub async fn sync_user_deletion_with_ingestors(username: &String) -> Result<(), continue; } let url = format!( - "{}{}/user/{}", + "{}{}/user/{}/sync", ingestor.domain_name, base_path_without_preceding_slash(), username @@ -285,7 +245,7 @@ pub async fn sync_user_creation_with_ingestors( continue; } let url = format!( - "{}{}/user/{}", + "{}{}/user/{}/sync", ingestor.domain_name, base_path_without_preceding_slash(), username @@ -333,7 +293,7 @@ pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(), continue; } let url = format!( - "{}{}/user/{}/generate-new-password", + "{}{}/user/{}/generate-new-password/sync", ingestor.domain_name, base_path_without_preceding_slash(), username @@ -389,7 +349,7 @@ pub async fn sync_role_update_with_ingestors( continue; } let url = format!( - "{}{}/role/{}", + "{}{}/role/{}/sync", ingestor.domain_name, base_path_without_preceding_slash(), name diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 9e6203018..7e36a4d8a 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -17,33 +17,30 @@ */ use super::logstream::error::{CreateStreamError, StreamError}; +use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs}; +use super::otel; use super::users::dashboards::DashboardError; use super::users::filters::FiltersError; -use super::{kinesis, otel}; use crate::event::{ self, error::EventError, format::{self, EventFormat}, }; -use crate::handlers::{ - LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR, - STREAM_NAME_HEADER_KEY, -}; +use crate::handlers::{LOG_SOURCE_KEY, LOG_SOURCE_OTEL, STREAM_NAME_HEADER_KEY}; use crate::localcache::CacheError; use crate::metadata::error::stream_info::MetadataError; use crate::metadata::{self, STREAM_INFO}; use crate::option::{Mode, CONFIG}; use crate::storage::{LogStream, ObjectStorageError, StreamType}; -use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; -use crate::utils::json::convert_array_to_object; +use crate::utils::header_parsing::ParseHeaderError; use actix_web::{http::header::ContentType, HttpRequest, HttpResponse}; use arrow_array::RecordBatch; -use arrow_schema::{Field, Schema}; +use arrow_schema::Schema; use bytes::Bytes; -use chrono::{DateTime, NaiveDateTime, Utc}; +use chrono::Utc; use http::StatusCode; use serde_json::Value; -use std::collections::{BTreeMap, HashMap}; +use std::collections::HashMap; use std::sync::Arc; // Handler for POST /api/v1/ingest 
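The import change above swaps ingest.rs's local flattening helpers for the relocated versions in server/src/handlers/http/modal/utils/ingest_utils.rs, and the hunks below delete the old definitions. A minimal sketch of the relocated dispatch, reconstructed from the deleted code (the exact signature, the element type of the flattened records, and error conversions in the new module are assumptions):

pub async fn flatten_and_push_logs(
    req: HttpRequest,
    body: Bytes,
    stream_name: String,
) -> Result<(), PostError> {
    // Dispatch on the log-source header; fall back to a plain push when it is absent or unknown.
    if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) {
        let json: Vec<BTreeMap<String, Value>> = match log_source.to_str().unwrap_or_default() {
            LOG_SOURCE_KINESIS => kinesis::flatten_kinesis_logs(&body),
            LOG_SOURCE_OTEL => otel::flatten_otel_logs(&body),
            other => {
                log::warn!("Unknown log source: {}", other);
                return push_logs(stream_name, req, body).await;
            }
        };
        // Each flattened record is re-serialized and pushed individually, as in the removed code.
        for record in json {
            let body: Bytes = serde_json::to_vec(&record).unwrap().into();
            push_logs(stream_name.clone(), req.clone(), body).await?;
        }
        Ok(())
    } else {
        push_logs(stream_name, req, body).await
    }
}
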
@@ -142,35 +139,6 @@ pub async fn ingest_otel_logs(req: HttpRequest, body: Bytes) -> Result Result<(), PostError> { - //flatten logs - if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) { - let mut json: Vec> = Vec::new(); - let log_source: String = log_source.to_str().unwrap().to_owned(); - match log_source.as_str() { - LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body), - LOG_SOURCE_OTEL => { - json = otel::flatten_otel_logs(&body); - } - _ => { - log::warn!("Unknown log source: {}", log_source); - push_logs(stream_name.to_string(), req.clone(), body).await?; - } - } - for record in json.iter_mut() { - let body: Bytes = serde_json::to_vec(record).unwrap().into(); - push_logs(stream_name.to_string(), req.clone(), body).await?; - } - } else { - push_logs(stream_name.to_string(), req, body).await?; - } - Ok(()) -} - // Handler for POST /api/v1/logstream/{logstream} // only ingests events into the specified logstream // fails if the logstream does not exist @@ -187,11 +155,6 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result Result<(), PostError> { - let time_partition = STREAM_INFO.get_time_partition(&stream_name)?; - let time_partition_limit = STREAM_INFO.get_time_partition_limit(&stream_name)?; - let static_schema_flag = STREAM_INFO.get_static_schema_flag(&stream_name)?; - let custom_partition = STREAM_INFO.get_custom_partition(&stream_name)?; - let body_val: Value = serde_json::from_slice(&body)?; - let size: usize = body.len(); - let mut parsed_timestamp = Utc::now().naive_utc(); - if time_partition.is_none() { - if custom_partition.is_none() { - let size = size as u64; - create_process_record_batch( - stream_name.clone(), - req.clone(), - body_val.clone(), - static_schema_flag.clone(), - None, - parsed_timestamp, - HashMap::new(), - size, - ) - .await?; - } else { - let data = - convert_array_to_object(body_val.clone(), None, None, custom_partition.clone())?; - let custom_partition = custom_partition.unwrap(); - let custom_partition_list = custom_partition.split(',').collect::>(); - - for value in data { - let custom_partition_values = - get_custom_partition_values(&value, &custom_partition_list); - - let size = value.to_string().into_bytes().len() as u64; - create_process_record_batch( - stream_name.clone(), - req.clone(), - value.clone(), - static_schema_flag.clone(), - None, - parsed_timestamp, - custom_partition_values.clone(), - size, - ) - .await?; - } - } - } else if custom_partition.is_none() { - let data = convert_array_to_object( - body_val.clone(), - time_partition.clone(), - time_partition_limit, - None, - )?; - for value in data { - parsed_timestamp = get_parsed_timestamp(&value, &time_partition); - let size = value.to_string().into_bytes().len() as u64; - create_process_record_batch( - stream_name.clone(), - req.clone(), - value.clone(), - static_schema_flag.clone(), - time_partition.clone(), - parsed_timestamp, - HashMap::new(), - size, - ) - .await?; - } - } else { - let data = convert_array_to_object( - body_val.clone(), - time_partition.clone(), - time_partition_limit, - custom_partition.clone(), - )?; - let custom_partition = custom_partition.unwrap(); - let custom_partition_list = custom_partition.split(',').collect::>(); - - for value in data { - let custom_partition_values = - get_custom_partition_values(&value, &custom_partition_list); - - parsed_timestamp = get_parsed_timestamp(&value, &time_partition); - let size = value.to_string().into_bytes().len() as u64; - create_process_record_batch( 
- stream_name.clone(), - req.clone(), - value.clone(), - static_schema_flag.clone(), - time_partition.clone(), - parsed_timestamp, - custom_partition_values.clone(), - size, - ) - .await?; - } - } - - Ok(()) -} - -fn get_parsed_timestamp(body: &Value, time_partition: &Option) -> NaiveDateTime { - let body_timestamp = body.get(time_partition.clone().unwrap().to_string()); - let parsed_timestamp = body_timestamp - .unwrap() - .to_owned() - .as_str() - .unwrap() - .parse::>() - .unwrap() - .naive_utc(); - parsed_timestamp -} - -fn get_custom_partition_values( - body: &Value, - custom_partition_list: &[&str], -) -> HashMap { - let mut custom_partition_values: HashMap = HashMap::new(); - for custom_partition_field in custom_partition_list { - let custom_partition_value = body.get(custom_partition_field.trim()).unwrap().to_owned(); - let custom_partition_value = match custom_partition_value.clone() { - e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(), - Value::String(s) => s, - _ => "".to_string(), - }; - custom_partition_values.insert( - custom_partition_field.trim().to_string(), - custom_partition_value, - ); - } - custom_partition_values -} - -#[allow(clippy::too_many_arguments)] -async fn create_process_record_batch( - stream_name: String, - req: HttpRequest, - value: Value, - static_schema_flag: Option, - time_partition: Option, - parsed_timestamp: NaiveDateTime, - custom_partition_values: HashMap, - origin_size: u64, -) -> Result<(), PostError> { - let (rb, is_first_event) = get_stream_schema( - stream_name.clone(), - req.clone(), - value.clone(), - static_schema_flag.clone(), - time_partition.clone(), - )?; - event::Event { - rb, - stream_name: stream_name.clone(), - origin_format: "json", - origin_size, - is_first_event, - parsed_timestamp, - time_partition: time_partition.clone(), - custom_partition_values: custom_partition_values.clone(), - stream_type: StreamType::UserDefined, - } - .process() - .await?; - - Ok(()) -} - -fn get_stream_schema( - stream_name: String, - req: HttpRequest, - body: Value, - static_schema_flag: Option, - time_partition: Option, -) -> Result<(arrow_array::RecordBatch, bool), PostError> { - let hash_map = STREAM_INFO.read().unwrap(); - let schema = hash_map - .get(&stream_name) - .ok_or(PostError::StreamNotFound(stream_name))? 
- .schema - .clone(); - into_event_batch(req, body, schema, static_schema_flag, time_partition) -} - -fn into_event_batch( - req: HttpRequest, - body: Value, - schema: HashMap>, - static_schema_flag: Option, - time_partition: Option, -) -> Result<(arrow_array::RecordBatch, bool), PostError> { - let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?; - let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?; - let event = format::json::Event { - data: body, - tags, - metadata, - }; - let (rb, is_first) = event.into_recordbatch(schema, static_schema_flag, time_partition)?; - Ok((rb, is_first)) -} - // Check if the stream exists and create a new stream if doesn't exist pub async fn create_stream_if_not_exists( stream_name: &str, @@ -547,11 +307,9 @@ mod tests { use crate::{ event, - handlers::{PREFIX_META, PREFIX_TAGS}, + handlers::{http::modal::utils::ingest_utils::into_event_batch, PREFIX_META, PREFIX_TAGS}, }; - use super::into_event_batch; - trait TestExt { fn as_int64_arr(&self) -> &Int64Array; fn as_float64_arr(&self) -> &Float64Array; diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 05df31e28..b5f53b6c8 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -17,30 +17,20 @@ */ use self::error::{CreateStreamError, StreamError}; -use super::base_path_without_preceding_slash; use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}; -use super::cluster::{ - fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors, sync_streams_with_ingestors, - INTERNAL_STREAM_NAME, -}; +use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME}; use super::ingest::create_stream_if_not_exists; +use super::modal::utils::logstream_utils::create_update_stream; use crate::alerts::Alerts; -use crate::handlers::{ - CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TIME_PARTITION_KEY, - TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY, -}; -use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION}; +use crate::handlers::STREAM_TYPE_KEY; +use crate::hottier::HotTierManager; use crate::metadata::STREAM_INFO; use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE}; -use crate::option::{Mode, CONFIG}; -use crate::static_schema::{convert_static_schema_to_arrow_schema, StaticSchema}; +use crate::option::CONFIG; use crate::stats::{event_labels_date, storage_size_labels_date, Stats}; use crate::storage::StreamType; use crate::storage::{retention::Retention, LogStream, StorageDir, StreamInfo}; -use crate::{ - catalog::{self, remove_manifest_from_snapshot}, - event, stats, -}; +use crate::{catalog, event, stats}; use crate::{metadata, validator}; use actix_web::http::header::{self, HeaderMap}; @@ -50,11 +40,9 @@ use arrow_schema::{Field, Schema}; use bytes::Bytes; use chrono::Utc; use http::{HeaderName, HeaderValue}; -use itertools::Itertools; use serde_json::Value; use std::collections::HashMap; use std::fs; -use std::num::NonZeroU32; use std::str::FromStr; use std::sync::Arc; @@ -63,44 +51,23 @@ pub async fn delete(req: HttpRequest) -> Result { if !metadata::STREAM_INFO.stream_exists(&stream_name) { return Err(StreamError::StreamNotFound(stream_name)); } - match CONFIG.parseable.mode { - Mode::Query | Mode::All => { - let objectstore = CONFIG.storage().get_object_store(); - - objectstore.delete_stream(&stream_name).await?; - let stream_dir = StorageDir::new(&stream_name); - if 
fs::remove_dir_all(&stream_dir.data_path).is_err() { - log::warn!( - "failed to delete local data for stream {}. Clean {} manually", - stream_name, - stream_dir.data_path.to_string_lossy() - ) - } - if let Some(hot_tier_manager) = HotTierManager::global() { - if hot_tier_manager.check_stream_hot_tier_exists(&stream_name) { - hot_tier_manager.delete_hot_tier(&stream_name).await?; - } - } + let objectstore = CONFIG.storage().get_object_store(); - let ingestor_metadata = super::cluster::get_ingestor_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingestor info: {:?}", err); - StreamError::from(err) - })?; - - for ingestor in ingestor_metadata { - let url = format!( - "{}{}/logstream/{}", - ingestor.domain_name, - base_path_without_preceding_slash(), - stream_name - ); - - // delete the stream - super::cluster::send_stream_delete_request(&url, ingestor.clone()).await?; - } + objectstore.delete_stream(&stream_name).await?; + let stream_dir = StorageDir::new(&stream_name); + if fs::remove_dir_all(&stream_dir.data_path).is_err() { + log::warn!( + "failed to delete local data for stream {}. Clean {} manually", + stream_name, + stream_dir.data_path.to_string_lossy() + ) + } + + if let Some(hot_tier_manager) = HotTierManager::global() { + if hot_tier_manager.check_stream_hot_tier_exists(&stream_name) { + hot_tier_manager.delete_hot_tier(&stream_name).await?; } - _ => {} } metadata::STREAM_INFO.delete_stream(&stream_name); @@ -112,28 +79,6 @@ pub async fn delete(req: HttpRequest) -> Result { Ok((format!("log stream {stream_name} deleted"), StatusCode::OK)) } -pub async fn retention_cleanup( - req: HttpRequest, - body: Bytes, -) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - let storage = CONFIG.storage().get_object_store(); - if !metadata::STREAM_INFO.stream_exists(&stream_name) { - log::error!("Stream {} not found", stream_name.clone()); - return Err(StreamError::StreamNotFound(stream_name.clone())); - } - let date_list: Vec = serde_json::from_slice(&body).unwrap(); - let res = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await; - let mut first_event_at: Option = None; - if let Err(err) = res { - log::error!("Failed to update manifest list in the snapshot {err:?}") - } else { - first_event_at = res.unwrap(); - } - - Ok((first_event_at, StatusCode::OK)) -} - pub async fn list(_: HttpRequest) -> impl Responder { let res: Vec = STREAM_INFO .list_streams() @@ -186,224 +131,11 @@ pub async fn get_alert(req: HttpRequest) -> Result pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - if CONFIG.parseable.mode == Mode::Query { - let headers = create_update_stream(&req, &body, &stream_name).await?; - sync_streams_with_ingestors(headers, body, &stream_name).await?; - } else { - create_update_stream(&req, &body, &stream_name).await?; - } + create_update_stream(&req, &body, &stream_name).await?; Ok(("Log stream created", StatusCode::OK)) } -fn fetch_headers_from_put_stream_request( - req: &HttpRequest, -) -> (String, String, String, String, String, String) { - let mut time_partition = String::default(); - let mut time_partition_limit = String::default(); - let mut custom_partition = String::default(); - let mut static_schema_flag = String::default(); - let mut update_stream = String::default(); - let mut stream_type = StreamType::UserDefined.to_string(); - req.headers().iter().for_each(|(key, value)| { - if key 
== TIME_PARTITION_KEY { - time_partition = value.to_str().unwrap().to_string(); - } - if key == TIME_PARTITION_LIMIT_KEY { - time_partition_limit = value.to_str().unwrap().to_string(); - } - if key == CUSTOM_PARTITION_KEY { - custom_partition = value.to_str().unwrap().to_string(); - } - if key == STATIC_SCHEMA_FLAG { - static_schema_flag = value.to_str().unwrap().to_string(); - } - if key == UPDATE_STREAM_KEY { - update_stream = value.to_str().unwrap().to_string(); - } - if key == STREAM_TYPE_KEY { - stream_type = value.to_str().unwrap().to_string(); - } - }); - - ( - time_partition, - time_partition_limit, - custom_partition, - static_schema_flag, - update_stream, - stream_type, - ) -} - -fn validate_time_partition_limit(time_partition_limit: &str) -> Result<&str, CreateStreamError> { - if !time_partition_limit.ends_with('d') { - return Err(CreateStreamError::Custom { - msg: "Missing 'd' suffix for duration value".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } - let days = &time_partition_limit[0..time_partition_limit.len() - 1]; - if days.parse::().is_err() { - return Err(CreateStreamError::Custom { - msg: "Could not convert duration to an unsigned number".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } - - Ok(days) -} - -fn validate_custom_partition(custom_partition: &str) -> Result<(), CreateStreamError> { - let custom_partition_list = custom_partition.split(',').collect::>(); - if custom_partition_list.len() > 3 { - return Err(CreateStreamError::Custom { - msg: "Maximum 3 custom partition keys are supported".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } - Ok(()) -} - -fn validate_time_with_custom_partition( - time_partition: &str, - custom_partition: &str, -) -> Result<(), CreateStreamError> { - let custom_partition_list = custom_partition.split(',').collect::>(); - if custom_partition_list.contains(&time_partition) { - return Err(CreateStreamError::Custom { - msg: format!( - "time partition {} cannot be set as custom partition", - time_partition - ), - status: StatusCode::BAD_REQUEST, - }); - } - Ok(()) -} - -fn validate_static_schema( - body: &Bytes, - stream_name: &str, - time_partition: &str, - custom_partition: &str, - static_schema_flag: &str, -) -> Result, CreateStreamError> { - if static_schema_flag == "true" { - if body.is_empty() { - return Err(CreateStreamError::Custom { - msg: format!( - "Please provide schema in the request body for static schema logstream {stream_name}" - ), - status: StatusCode::BAD_REQUEST, - }); - } - - let static_schema: StaticSchema = serde_json::from_slice(body)?; - let parsed_schema = - convert_static_schema_to_arrow_schema(static_schema, time_partition, custom_partition) - .map_err(|_| CreateStreamError::Custom { - msg: format!( - "Unable to commit static schema, logstream {stream_name} not created" - ), - status: StatusCode::BAD_REQUEST, - })?; - - return Ok(parsed_schema); - } - - Ok(Arc::new(Schema::empty())) -} - -async fn create_update_stream( - req: &HttpRequest, - body: &Bytes, - stream_name: &str, -) -> Result { - let ( - time_partition, - time_partition_limit, - custom_partition, - static_schema_flag, - update_stream, - stream_type, - ) = fetch_headers_from_put_stream_request(req); - - if metadata::STREAM_INFO.stream_exists(stream_name) && update_stream != "true" { - return Err(StreamError::Custom { - msg: format!( - "Logstream {stream_name} already exists, please create a new log stream with unique name" - ), - status: StatusCode::BAD_REQUEST, - }); - } - - if update_stream == "true" { - if 
!STREAM_INFO.stream_exists(stream_name) { - return Err(StreamError::StreamNotFound(stream_name.to_string())); - } - if !time_partition.is_empty() { - return Err(StreamError::Custom { - msg: "Altering the time partition of an existing stream is restricted.".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } - - if !static_schema_flag.is_empty() { - return Err(StreamError::Custom { - msg: "Altering the schema of an existing stream is restricted.".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } - - if !time_partition_limit.is_empty() { - let time_partition_days = validate_time_partition_limit(&time_partition_limit)?; - update_time_partition_limit_in_stream(stream_name.to_string(), time_partition_days) - .await?; - return Ok(req.headers().clone()); - } - - if !custom_partition.is_empty() { - validate_custom_partition(&custom_partition)?; - update_custom_partition_in_stream(stream_name.to_string(), &custom_partition).await?; - } else { - update_custom_partition_in_stream(stream_name.to_string(), "").await?; - } - return Ok(req.headers().clone()); - } - let mut time_partition_in_days = ""; - if !time_partition_limit.is_empty() { - time_partition_in_days = validate_time_partition_limit(&time_partition_limit)?; - } - if !custom_partition.is_empty() { - validate_custom_partition(&custom_partition)?; - } - - if !time_partition.is_empty() && !custom_partition.is_empty() { - validate_time_with_custom_partition(&time_partition, &custom_partition)?; - } - - let schema = validate_static_schema( - body, - stream_name, - &time_partition, - &custom_partition, - &static_schema_flag, - )?; - - create_stream( - stream_name.to_string(), - &time_partition, - time_partition_in_days, - &custom_partition, - &static_schema_flag, - schema, - &stream_type, - ) - .await?; - - Ok(req.headers().clone()) -} pub async fn put_alert( req: HttpRequest, body: web::Json, @@ -511,13 +243,8 @@ pub async fn put_retention( pub async fn get_cache_enabled(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - match CONFIG.parseable.mode { - Mode::Ingest | Mode::All => { - if CONFIG.parseable.local_cache_path.is_none() { - return Err(StreamError::CacheNotEnabled(stream_name)); - } - } - _ => {} + if CONFIG.parseable.local_cache_path.is_none() { + return Err(StreamError::CacheNotEnabled(stream_name)); } let cache_enabled = STREAM_INFO.get_cache_enabled(&stream_name)?; @@ -531,61 +258,11 @@ pub async fn put_enable_cache( let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); let storage = CONFIG.storage().get_object_store(); - match CONFIG.parseable.mode { - Mode::Query => { - if !metadata::STREAM_INFO.stream_exists(&stream_name) { - return Err(StreamError::StreamNotFound(stream_name)); - } - let ingestor_metadata = super::cluster::get_ingestor_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingestor info: {:?}", err); - StreamError::from(err) - })?; - for ingestor in ingestor_metadata { - let url = format!( - "{}{}/logstream/{}/cache", - ingestor.domain_name, - base_path_without_preceding_slash(), - stream_name - ); - - super::cluster::sync_cache_with_ingestors(&url, ingestor.clone(), *body).await?; - } - } - Mode::Ingest => { - if CONFIG.parseable.local_cache_path.is_none() { - return Err(StreamError::CacheNotEnabled(stream_name)); - } - // here the ingest server has not found the stream - // so it should check if the stream exists in storage - let check = storage - .list_streams() - .await? 
- .iter() - .map(|stream| stream.name.clone()) - .contains(&stream_name); - - if !check { - log::error!("Stream {} not found", stream_name.clone()); - return Err(StreamError::StreamNotFound(stream_name.clone())); - } - metadata::STREAM_INFO - .upsert_stream_info( - &*storage, - LogStream { - name: stream_name.clone().to_owned(), - }, - ) - .await - .map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?; - } - Mode::All => { - if !metadata::STREAM_INFO.stream_exists(&stream_name) { - return Err(StreamError::StreamNotFound(stream_name)); - } - if CONFIG.parseable.local_cache_path.is_none() { - return Err(StreamError::CacheNotEnabled(stream_name)); - } - } + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + if CONFIG.parseable.local_cache_path.is_none() { + return Err(StreamError::CacheNotEnabled(stream_name)); } let enable_cache = body.into_inner(); let mut stream_metadata = storage.get_object_store_format(&stream_name).await?; @@ -600,6 +277,7 @@ pub async fn put_enable_cache( StatusCode::OK, )) } + pub async fn get_stats_date(stream_name: &str, date: &str) -> Result { let event_labels = event_labels_date(stream_name, "json", date); let storage_size_labels = storage_size_labels_date(stream_name, date); @@ -643,38 +321,16 @@ pub async fn get_stats(req: HttpRequest) -> Result } if !date_value.is_empty() { - if CONFIG.parseable.mode == Mode::Query { - let querier_stats = get_stats_date(&stream_name, date_value).await?; - let ingestor_stats = - fetch_daily_stats_from_ingestors(&stream_name, date_value).await?; - let total_stats = Stats { - events: querier_stats.events + ingestor_stats.events, - ingestion: querier_stats.ingestion + ingestor_stats.ingestion, - storage: querier_stats.storage + ingestor_stats.storage, - }; - let stats = serde_json::to_value(total_stats)?; - - return Ok((web::Json(stats), StatusCode::OK)); - } else { - let stats = get_stats_date(&stream_name, date_value).await?; - let stats = serde_json::to_value(stats)?; - - return Ok((web::Json(stats), StatusCode::OK)); - } + let stats = get_stats_date(&stream_name, date_value).await?; + let stats = serde_json::to_value(stats)?; + return Ok((web::Json(stats), StatusCode::OK)); } } let stats = stats::get_current_stats(&stream_name, "json") .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; - let ingestor_stats = if CONFIG.parseable.mode == Mode::Query - && STREAM_INFO.stream_type(&stream_name).unwrap() - == Some(StreamType::UserDefined.to_string()) - { - Some(fetch_stats_from_ingestors(&stream_name).await?) 
- } else { - None - }; + let ingestor_stats: Option> = None; let hash_map = STREAM_INFO.read().expect("Readable"); let stream_meta = &hash_map @@ -759,99 +415,6 @@ fn remove_id_from_alerts(value: &mut Value) { } } -pub async fn update_time_partition_limit_in_stream( - stream_name: String, - time_partition_limit: &str, -) -> Result<(), CreateStreamError> { - let storage = CONFIG.storage().get_object_store(); - if let Err(err) = storage - .update_time_partition_limit_in_stream(&stream_name, time_partition_limit) - .await - { - return Err(CreateStreamError::Storage { stream_name, err }); - } - - if metadata::STREAM_INFO - .update_time_partition_limit(&stream_name, time_partition_limit.to_string()) - .is_err() - { - return Err(CreateStreamError::Custom { - msg: "failed to update time partition limit in metadata".to_string(), - status: StatusCode::EXPECTATION_FAILED, - }); - } - - Ok(()) -} - -pub async fn update_custom_partition_in_stream( - stream_name: String, - custom_partition: &str, -) -> Result<(), CreateStreamError> { - let static_schema_flag = STREAM_INFO.get_static_schema_flag(&stream_name).unwrap(); - let time_partition = STREAM_INFO.get_time_partition(&stream_name).unwrap(); - if static_schema_flag.is_some() { - let schema = STREAM_INFO.schema(&stream_name).unwrap(); - - if !custom_partition.is_empty() { - let custom_partition_list = custom_partition.split(',').collect::>(); - let custom_partition_exists: HashMap<_, _> = custom_partition_list - .iter() - .map(|&partition| { - ( - partition.to_string(), - schema - .fields() - .iter() - .any(|field| field.name() == partition), - ) - }) - .collect(); - - for partition in &custom_partition_list { - if !custom_partition_exists[*partition] { - return Err(CreateStreamError::Custom { - msg: format!("custom partition field {} does not exist in the schema for the stream {}", partition, stream_name), - status: StatusCode::BAD_REQUEST, - }); - } - - if let Some(time_partition) = time_partition.clone() { - if time_partition == *partition { - return Err(CreateStreamError::Custom { - msg: format!( - "time partition {} cannot be set as custom partition", - partition - ), - status: StatusCode::BAD_REQUEST, - }); - } - } - } - } - } - - let storage = CONFIG.storage().get_object_store(); - if let Err(err) = storage - .update_custom_partition_in_stream(&stream_name, custom_partition) - .await - { - return Err(CreateStreamError::Storage { stream_name, err }); - } - - if metadata::STREAM_INFO - .update_custom_partition(&stream_name, custom_partition.to_string()) - .is_err() - { - return Err(CreateStreamError::Custom { - msg: "failed to update custom partition in metadata".to_string(), - status: StatusCode::EXPECTATION_FAILED, - }); - } - - Ok(()) -} - pub async fn create_stream( stream_name: String, time_partition: &str, @@ -949,130 +512,6 @@ pub async fn get_stream_info(req: HttpRequest) -> Result, -) -> Result { - if CONFIG.parseable.mode != Mode::Query { - return Err(StreamError::Custom { - msg: "Hot tier can only be enabled in query mode".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - if !metadata::STREAM_INFO.stream_exists(&stream_name) { - return Err(StreamError::StreamNotFound(stream_name)); - } - - if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) { - return Err(StreamError::Custom { - msg: "Hot tier can not be updated for internal stream".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } - if 
CONFIG.parseable.hot_tier_storage_path.is_none() { - return Err(StreamError::HotTierNotEnabled(stream_name)); - } - - let body = body.into_inner(); - let mut hottier: StreamHotTier = match serde_json::from_value(body) { - Ok(hottier) => hottier, - Err(err) => return Err(StreamError::InvalidHotTierConfig(err)), - }; - - validator::hot_tier(&hottier.size.to_string())?; - - STREAM_INFO.set_hot_tier(&stream_name, true)?; - if let Some(hot_tier_manager) = HotTierManager::global() { - let existing_hot_tier_used_size = hot_tier_manager - .validate_hot_tier_size(&stream_name, &hottier.size) - .await?; - hottier.used_size = Some(existing_hot_tier_used_size.to_string()); - hottier.available_size = Some(hottier.size.clone()); - hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string()); - hot_tier_manager - .put_hot_tier(&stream_name, &mut hottier) - .await?; - let storage = CONFIG.storage().get_object_store(); - let mut stream_metadata = storage.get_object_store_format(&stream_name).await?; - stream_metadata.hot_tier_enabled = Some(true); - storage - .put_stream_manifest(&stream_name, &stream_metadata) - .await?; - } - - Ok(( - format!("hot tier set for stream {stream_name}"), - StatusCode::OK, - )) -} - -pub async fn get_stream_hot_tier(req: HttpRequest) -> Result { - if CONFIG.parseable.mode != Mode::Query { - return Err(StreamError::Custom { - msg: "Hot tier can only be enabled in query mode".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } - - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - - if !metadata::STREAM_INFO.stream_exists(&stream_name) { - return Err(StreamError::StreamNotFound(stream_name)); - } - - if CONFIG.parseable.hot_tier_storage_path.is_none() { - return Err(StreamError::HotTierNotEnabled(stream_name)); - } - - if let Some(hot_tier_manager) = HotTierManager::global() { - let mut hot_tier = hot_tier_manager.get_hot_tier(&stream_name).await?; - hot_tier.size = format!("{} {}", hot_tier.size, "Bytes"); - hot_tier.used_size = Some(format!("{} {}", hot_tier.used_size.unwrap(), "Bytes")); - hot_tier.available_size = Some(format!("{} {}", hot_tier.available_size.unwrap(), "Bytes")); - Ok((web::Json(hot_tier), StatusCode::OK)) - } else { - Err(StreamError::Custom { - msg: format!("hot tier not initialised for stream {}", stream_name), - status: (StatusCode::BAD_REQUEST), - }) - } -} - -pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result { - if CONFIG.parseable.mode != Mode::Query { - return Err(StreamError::Custom { - msg: "Hot tier can only be enabled in query mode".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } - - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - - if !metadata::STREAM_INFO.stream_exists(&stream_name) { - return Err(StreamError::StreamNotFound(stream_name)); - } - - if CONFIG.parseable.hot_tier_storage_path.is_none() { - return Err(StreamError::HotTierNotEnabled(stream_name)); - } - - if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) { - return Err(StreamError::Custom { - msg: "Hot tier can not be deleted for internal stream".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } - - if let Some(hot_tier_manager) = HotTierManager::global() { - hot_tier_manager.delete_hot_tier(&stream_name).await?; - } - Ok(( - format!("hot tier deleted for stream {stream_name}"), - StatusCode::OK, - )) -} - pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> { if let Ok(stream_exists) = 
create_stream_if_not_exists(INTERNAL_STREAM_NAME, &StreamType::Internal.to_string()).await diff --git a/server/src/handlers/http/modal/ingest/ingester_ingest.rs b/server/src/handlers/http/modal/ingest/ingester_ingest.rs new file mode 100644 index 000000000..f7725254a --- /dev/null +++ b/server/src/handlers/http/modal/ingest/ingester_ingest.rs @@ -0,0 +1,25 @@ +use actix_web::{HttpRequest, HttpResponse}; +use bytes::Bytes; + +use crate::{handlers::http::{ingest::PostError, modal::utils::ingest_utils::flatten_and_push_logs}, metadata::STREAM_INFO}; + + +// Handler for POST /api/v1/logstream/{logstream} +// only ingests events into the specified logstream +// fails if the logstream does not exist +pub async fn post_event(req: HttpRequest, body: Bytes) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let internal_stream_names = STREAM_INFO.list_internal_streams(); + if internal_stream_names.contains(&stream_name) { + return Err(PostError::Invalid(anyhow::anyhow!( + "Stream {} is an internal stream and cannot be ingested into", + stream_name + ))); + } + if !STREAM_INFO.stream_exists(&stream_name) { + return Err(PostError::StreamNotFound(stream_name)); + } + + flatten_and_push_logs(req, body, stream_name).await?; + Ok(HttpResponse::Ok().finish()) +} \ No newline at end of file diff --git a/server/src/handlers/http/modal/ingest/ingester_logstream.rs b/server/src/handlers/http/modal/ingest/ingester_logstream.rs new file mode 100644 index 000000000..f5ece7487 --- /dev/null +++ b/server/src/handlers/http/modal/ingest/ingester_logstream.rs @@ -0,0 +1,119 @@ +use actix_web::{web, HttpRequest, Responder}; +use bytes::Bytes; +use http::StatusCode; +use itertools::Itertools; + +use crate::{ + catalog::remove_manifest_from_snapshot, + event, + handlers::http::{ + logstream::error::StreamError, modal::utils::logstream_utils::create_update_stream, + }, + metadata::{self, STREAM_INFO}, + option::CONFIG, + stats, + storage::LogStream, +}; + +pub async fn retention_cleanup( + req: HttpRequest, + body: Bytes, +) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let storage = CONFIG.storage().get_object_store(); + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + log::error!("Stream {} not found", stream_name.clone()); + return Err(StreamError::StreamNotFound(stream_name.clone())); + } + let date_list: Vec = serde_json::from_slice(&body).unwrap(); + let res = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await; + let mut first_event_at: Option = None; + if let Err(err) = res { + log::error!("Failed to update manifest list in the snapshot {err:?}") + } else { + first_event_at = res.unwrap(); + } + + Ok((first_event_at, StatusCode::OK)) +} + +pub async fn delete(req: HttpRequest) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + + metadata::STREAM_INFO.delete_stream(&stream_name); + event::STREAM_WRITERS.delete_stream(&stream_name); + stats::delete_stats(&stream_name, "json").unwrap_or_else(|e| { + log::warn!("failed to delete stats for stream {}: {:?}", stream_name, e) + }); + + Ok((format!("log stream {stream_name} deleted"), StatusCode::OK)) +} + +pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + + 
create_update_stream(&req, &body, &stream_name).await?; + + Ok(("Log stream created", StatusCode::OK)) +} + +pub async fn put_enable_cache( + req: HttpRequest, + body: web::Json, +) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let storage = CONFIG.storage().get_object_store(); + + if CONFIG.parseable.local_cache_path.is_none() { + return Err(StreamError::CacheNotEnabled(stream_name)); + } + // here the ingest server has not found the stream + // so it should check if the stream exists in storage + let check = storage + .list_streams() + .await? + .iter() + .map(|stream| stream.name.clone()) + .contains(&stream_name); + + if !check { + log::error!("Stream {} not found", stream_name.clone()); + return Err(StreamError::StreamNotFound(stream_name.clone())); + } + metadata::STREAM_INFO + .upsert_stream_info( + &*storage, + LogStream { + name: stream_name.clone().to_owned(), + }, + ) + .await + .map_err(|_| StreamError::StreamNotFound(stream_name.clone()))?; + + let enable_cache = body.into_inner(); + let mut stream_metadata = storage.get_object_store_format(&stream_name).await?; + stream_metadata.cache_enabled = enable_cache; + storage + .put_stream_manifest(&stream_name, &stream_metadata) + .await?; + + STREAM_INFO.set_cache_enabled(&stream_name, enable_cache)?; + Ok(( + format!("Cache set to {enable_cache} for log stream {stream_name}"), + StatusCode::OK, + )) +} + +pub async fn get_cache_enabled(req: HttpRequest) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + + if CONFIG.parseable.local_cache_path.is_none() { + return Err(StreamError::CacheNotEnabled(stream_name)); + } + + let cache_enabled = STREAM_INFO.get_cache_enabled(&stream_name)?; + Ok((web::Json(cache_enabled), StatusCode::OK)) +} diff --git a/server/src/handlers/http/modal/ingest/ingester_rbac.rs b/server/src/handlers/http/modal/ingest/ingester_rbac.rs new file mode 100644 index 000000000..157b52959 --- /dev/null +++ b/server/src/handlers/http/modal/ingest/ingester_rbac.rs @@ -0,0 +1,114 @@ +use std::collections::HashSet; + +use actix_web::{web, Responder}; +use tokio::sync::Mutex; + +use crate::{ + handlers::http::{modal::utils::rbac_utils::get_metadata, rbac::RBACError}, + rbac::{ + user::{self, User as ParseableUser}, + Users, + }, + storage, +}; + +// async aware lock for updating storage metadata and user map atomicically +static UPDATE_LOCK: Mutex<()> = Mutex::const_new(()); + +// Handler for POST /api/v1/user/{username} +// Creates a new user by username if it does not exists +pub async fn post_user( + username: web::Path, + body: Option>, +) -> Result { + let username = username.into_inner(); + + let generated_password = String::default(); + let metadata = get_metadata().await?; + if let Some(body) = body { + let user: ParseableUser = serde_json::from_value(body.into_inner())?; + let _ = storage::put_staging_metadata(&metadata); + let created_role = user.roles.clone(); + Users.put_user(user.clone()); + Users.put_role(&username, created_role.clone()); + } + + Ok(generated_password) +} + +// Handler for DELETE /api/v1/user/delete/{username} +pub async fn delete_user(username: web::Path) -> Result { + let username = username.into_inner(); + let _ = UPDATE_LOCK.lock().await; + // fail this request if the user does not exists + if !Users.contains(&username) { + return Err(RBACError::UserDoesNotExist); + }; + // delete from parseable.json first + let mut metadata = get_metadata().await?; + metadata.users.retain(|user| 
user.username() != username); + + let _ = storage::put_staging_metadata(&metadata); + + // update in mem table + Users.delete_user(&username); + Ok(format!("deleted user: {username}")) +} + +// Handler PUT /user/{username}/roles => Put roles for user +// Put roles for given user +pub async fn put_role( + username: web::Path, + role: web::Json>, +) -> Result { + let username = username.into_inner(); + let role = role.into_inner(); + + if !Users.contains(&username) { + return Err(RBACError::UserDoesNotExist); + }; + // update parseable.json first + let mut metadata = get_metadata().await?; + if let Some(user) = metadata + .users + .iter_mut() + .find(|user| user.username() == username) + { + user.roles.clone_from(&role); + } else { + // should be unreachable given state is always consistent + return Err(RBACError::UserDoesNotExist); + } + + let _ = storage::put_staging_metadata(&metadata); + // update in mem table + Users.put_role(&username.clone(), role.clone()); + + Ok(format!("Roles updated successfully for {username}")) +} + +// Handler for POST /api/v1/user/{username}/generate-new-password +// Resets password for the user to a newly generated one and returns it +pub async fn post_gen_password(username: web::Path) -> Result { + let username = username.into_inner(); + let mut new_hash = String::default(); + let mut metadata = get_metadata().await?; + + let _ = storage::put_staging_metadata(&metadata); + if let Some(user) = metadata + .users + .iter_mut() + .filter_map(|user| match user.ty { + user::UserType::Native(ref mut user) => Some(user), + _ => None, + }) + .find(|user| user.username == username) + { + new_hash.clone_from(&user.password_hash); + } else { + return Err(RBACError::UserDoesNotExist); + } + Users.change_password_hash(&username, &new_hash); + + Ok("Updated") +} diff --git a/server/src/handlers/http/modal/ingest/ingester_role.rs b/server/src/handlers/http/modal/ingest/ingester_role.rs new file mode 100644 index 000000000..0ad41e765 --- /dev/null +++ b/server/src/handlers/http/modal/ingest/ingester_role.rs @@ -0,0 +1,22 @@ +use actix_web::{web, HttpResponse, Responder}; +use bytes::Bytes; + +use crate::{ + handlers::http::{modal::utils::rbac_utils::get_metadata, role::RoleError}, + rbac::{map::mut_roles, role::model::DefaultPrivilege}, + storage, +}; + +// Handler for PUT /api/v1/role/{name} +// Creates a new role or update existing one +pub async fn put(name: web::Path, body: Bytes) -> Result { + let name = name.into_inner(); + let privileges = serde_json::from_slice::>(&body)?; + let mut metadata = get_metadata().await?; + metadata.roles.insert(name.clone(), privileges.clone()); + + let _ = storage::put_staging_metadata(&metadata); + mut_roles().insert(name.clone(), privileges.clone()); + + Ok(HttpResponse::Ok().finish()) +} diff --git a/server/src/handlers/http/modal/ingest/mod.rs b/server/src/handlers/http/modal/ingest/mod.rs new file mode 100644 index 000000000..26ed76438 --- /dev/null +++ b/server/src/handlers/http/modal/ingest/mod.rs @@ -0,0 +1,3 @@ +pub mod ingester_logstream; +pub mod ingester_rbac; +pub mod ingester_role; diff --git a/server/src/handlers/http/modal/ingest_server.rs b/server/src/handlers/http/modal/ingest_server.rs index 895fb8b8a..23a0dcc55 100644 --- a/server/src/handlers/http/modal/ingest_server.rs +++ b/server/src/handlers/http/modal/ingest_server.rs @@ -18,12 +18,12 @@ use crate::analytics; use crate::banner; use crate::handlers::airplane; -use crate::handlers::http; use crate::handlers::http::health_check; use crate::handlers::http::ingest; 
use crate::handlers::http::logstream; use crate::handlers::http::middleware::DisAllowRootUser; use crate::handlers::http::middleware::RouteExt; +use crate::handlers::http::role; use crate::localcache::LocalCacheManager; use crate::metrics; use crate::migration; @@ -40,6 +40,9 @@ use crate::sync; use std::sync::Arc; +use super::ingest::ingester_logstream; +use super::ingest::ingester_rbac; +use super::ingest::ingester_role; use super::server::Server; use super::ssl_acceptor::get_ssl_acceptor; use super::IngestorMetadata; @@ -51,6 +54,7 @@ use crate::{ option::CONFIG, }; use actix_web::body::MessageBody; +use actix_web::web::resource; use actix_web::Scope; use actix_web::{web, App, HttpServer}; use actix_web_prometheus::PrometheusMetrics; @@ -185,7 +189,7 @@ impl IngestServer { .service(Self::analytics_factory()) .service(Server::get_liveness_factory()) .service(Self::get_user_webscope()) - .service(Server::get_user_role_webscope()) + .service(Self::get_user_role_webscope()) .service(Server::get_metrics_webscope()) .service(Server::get_readiness_factory()), ) @@ -202,41 +206,64 @@ impl IngestServer { ), ) } + + // get the role webscope + fn get_user_role_webscope() -> Scope { + web::scope("/role") + // GET Role List + .service(resource("").route(web::get().to(role::list).authorize(Action::ListRole))) + .service( + // PUT and GET Default Role + resource("/default") + .route(web::put().to(role::put_default).authorize(Action::PutRole)) + .route(web::get().to(role::get_default).authorize(Action::GetRole)), + ) + .service( + // PUT, GET, DELETE Roles + resource("/{name}") + .route(web::delete().to(role::delete).authorize(Action::DeleteRole)) + .route(web::get().to(role::get).authorize(Action::GetRole)), + ) + .service( + resource("/{name}/sync") + .route(web::put().to(ingester_role::put).authorize(Action::PutRole)), + ) + } // get the user webscope fn get_user_webscope() -> Scope { web::scope("/user") .service( - web::resource("/{username}") - // PUT /user/{username} => Create a new user + web::resource("/{username}/sync") + // PUT /user/{username}/sync => Sync creation of a new user .route( web::post() - .to(http::rbac::post_user) + .to(ingester_rbac::post_user) .authorize(Action::PutUser), ) - // DELETE /user/{username} => Delete a user + // DELETE /user/{username} => Sync deletion of a user .route( web::delete() - .to(http::rbac::delete_user) + .to(ingester_rbac::delete_user) .authorize(Action::DeleteUser), ) .wrap(DisAllowRootUser), ) .service( - web::resource("/{username}/role") + web::resource("/{username}/role/sync") // PUT /user/{username}/roles => Put roles for user .route( web::put() - .to(http::rbac::put_role) + .to(ingester_rbac::put_role) .authorize(Action::PutUserRoles) .wrap(DisAllowRootUser), ), ) .service( - web::resource("/{username}/generate-new-password") + web::resource("/{username}/generate-new-password/sync") // POST /user/{username}/generate-new-password => reset password for this user .route( web::post() - .to(http::rbac::post_gen_password) + .to(ingester_rbac::post_gen_password) .authorize(Action::PutUser) .wrap(DisAllowRootUser), ), @@ -247,23 +274,26 @@ impl IngestServer { web::scope("/{logstream}") .service( web::resource("") - // DELETE "/logstream/{logstream}" ==> Delete a log stream + // POST "/logstream/{logstream}" ==> Post logs to given log stream + .route( + web::post() + .to(ingest::post_event) + .authorize_for_stream(Action::Ingest), + ), + ) + .service( + web::resource("/sync") + // DELETE "/logstream/{logstream}/sync" ==> Sync deletion of a log stream 
.route( web::delete() - .to(logstream::delete) - .authorize_for_stream(Action::DeleteStream), + .to(ingester_logstream::delete) + .authorize(Action::DeleteStream), ) - // PUT "/logstream/{logstream}" ==> Create a new log stream + // PUT "/logstream/{logstream}/sync" ==> Sync creation of a new log stream .route( web::put() - .to(logstream::put_stream) + .to(ingester_logstream::put_stream) .authorize_for_stream(Action::CreateStream), - ) - // POST "/logstream/{logstream}" ==> Post logs to given log stream - .route( - web::post() - .to(ingest::post_event) - .authorize_for_stream(Action::Ingest), ), ) .service( @@ -287,13 +317,13 @@ impl IngestServer { // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream .route( web::put() - .to(logstream::put_enable_cache) + .to(ingester_logstream::put_enable_cache) .authorize_for_stream(Action::PutCacheEnabled), ) // GET "/logstream/{logstream}/cache" ==> Get retention for given logstream .route( web::get() - .to(logstream::get_cache_enabled) + .to(ingester_logstream::get_cache_enabled) .authorize_for_stream(Action::GetCacheEnabled), ), ) @@ -301,7 +331,7 @@ impl IngestServer { web::scope("/retention").service( web::resource("/cleanup").route( web::post() - .to(logstream::retention_cleanup) + .to(ingester_logstream::retention_cleanup) .authorize_for_stream(Action::PutRetention), ), ), diff --git a/server/src/handlers/http/modal/mod.rs b/server/src/handlers/http/modal/mod.rs index 8af1119e3..6f6d2bfd7 100644 --- a/server/src/handlers/http/modal/mod.rs +++ b/server/src/handlers/http/modal/mod.rs @@ -16,10 +16,13 @@ * */ +pub mod ingest; pub mod ingest_server; +pub mod query; pub mod query_server; pub mod server; pub mod ssl_acceptor; +pub mod utils; use std::sync::Arc; diff --git a/server/src/handlers/http/modal/query/mod.rs b/server/src/handlers/http/modal/query/mod.rs new file mode 100644 index 000000000..704f9ca54 --- /dev/null +++ b/server/src/handlers/http/modal/query/mod.rs @@ -0,0 +1,4 @@ +pub mod querier_ingest; +pub mod querier_logstream; +pub mod querier_rbac; +pub mod querier_role; diff --git a/server/src/handlers/http/modal/query/querier_ingest.rs b/server/src/handlers/http/modal/query/querier_ingest.rs new file mode 100644 index 000000000..2e5e140c6 --- /dev/null +++ b/server/src/handlers/http/modal/query/querier_ingest.rs @@ -0,0 +1,13 @@ +use crate::handlers::http::ingest::PostError; +use actix_web::{HttpRequest, HttpResponse}; +use bytes::Bytes; + +// Handler for POST /api/v1/logstream/{logstream} +// only ingests events into the specified logstream +// fails if the logstream does not exist +#[allow(unused)] +pub async fn post_event(req: HttpRequest, body: Bytes) -> Result { + Err(PostError::Invalid(anyhow::anyhow!( + "Ingestion is not allowed in Query mode" + ))) +} diff --git a/server/src/handlers/http/modal/query/querier_logstream.rs b/server/src/handlers/http/modal/query/querier_logstream.rs new file mode 100644 index 000000000..bbac2c157 --- /dev/null +++ b/server/src/handlers/http/modal/query/querier_logstream.rs @@ -0,0 +1,324 @@ +use std::fs; + +use actix_web::{web, HttpRequest, Responder}; +use bytes::Bytes; +use chrono::Utc; +use http::StatusCode; + +use crate::{ + event, + handlers::http::{ + base_path_without_preceding_slash, + cluster::{ + self, fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors, + sync_streams_with_ingestors, + utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}, + }, + logstream::{error::StreamError, get_stats_date}, + 
modal::utils::logstream_utils::create_update_stream, + }, + hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION}, + metadata::{self, STREAM_INFO}, + option::CONFIG, + stats::{self, Stats}, + storage::{StorageDir, StreamType}, + validator, +}; + +pub async fn delete(req: HttpRequest) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + + let objectstore = CONFIG.storage().get_object_store(); + + objectstore.delete_stream(&stream_name).await?; + let stream_dir = StorageDir::new(&stream_name); + if fs::remove_dir_all(&stream_dir.data_path).is_err() { + log::warn!( + "failed to delete local data for stream {}. Clean {} manually", + stream_name, + stream_dir.data_path.to_string_lossy() + ) + } + + if let Some(hot_tier_manager) = HotTierManager::global() { + if hot_tier_manager.check_stream_hot_tier_exists(&stream_name) { + hot_tier_manager.delete_hot_tier(&stream_name).await?; + } + } + + let ingestor_metadata = cluster::get_ingestor_info().await.map_err(|err| { + log::error!("Fatal: failed to get ingestor info: {:?}", err); + StreamError::from(err) + })?; + + for ingestor in ingestor_metadata { + let url = format!( + "{}{}/logstream/{}/sync", + ingestor.domain_name, + base_path_without_preceding_slash(), + stream_name + ); + + // delete the stream + cluster::send_stream_delete_request(&url, ingestor.clone()).await?; + } + + metadata::STREAM_INFO.delete_stream(&stream_name); + event::STREAM_WRITERS.delete_stream(&stream_name); + stats::delete_stats(&stream_name, "json").unwrap_or_else(|e| { + log::warn!("failed to delete stats for stream {}: {:?}", stream_name, e) + }); + + Ok((format!("log stream {stream_name} deleted"), StatusCode::OK)) +} + +pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + + let headers = create_update_stream(&req, &body, &stream_name).await?; + sync_streams_with_ingestors(headers, body, &stream_name).await?; + + Ok(("Log stream created", StatusCode::OK)) +} + +pub async fn get_stats(req: HttpRequest) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + + let query_string = req.query_string(); + if !query_string.is_empty() { + let date_key = query_string.split('=').collect::>()[0]; + let date_value = query_string.split('=').collect::>()[1]; + if date_key != "date" { + return Err(StreamError::Custom { + msg: "Invalid query parameter".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + + if !date_value.is_empty() { + let querier_stats = get_stats_date(&stream_name, date_value).await?; + let ingestor_stats = fetch_daily_stats_from_ingestors(&stream_name, date_value).await?; + let total_stats = Stats { + events: querier_stats.events + ingestor_stats.events, + ingestion: querier_stats.ingestion + ingestor_stats.ingestion, + storage: querier_stats.storage + ingestor_stats.storage, + }; + let stats = serde_json::to_value(total_stats)?; + + return Ok((web::Json(stats), StatusCode::OK)); + } + } + + let stats = stats::get_current_stats(&stream_name, "json") + .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; + + let ingestor_stats = if STREAM_INFO.stream_type(&stream_name).unwrap() + == 
Some(StreamType::UserDefined.to_string()) + { + Some(fetch_stats_from_ingestors(&stream_name).await?) + } else { + None + }; + + let hash_map = STREAM_INFO.read().expect("Readable"); + let stream_meta = &hash_map + .get(&stream_name) + .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; + + let time = Utc::now(); + + let stats = match &stream_meta.first_event_at { + Some(_) => { + let ingestion_stats = IngestionStats::new( + stats.current_stats.events, + format!("{} {}", stats.current_stats.ingestion, "Bytes"), + stats.lifetime_stats.events, + format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), + stats.deleted_stats.events, + format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), + "json", + ); + let storage_stats = StorageStats::new( + format!("{} {}", stats.current_stats.storage, "Bytes"), + format!("{} {}", stats.lifetime_stats.storage, "Bytes"), + format!("{} {}", stats.deleted_stats.storage, "Bytes"), + "parquet", + ); + + QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats) + } + + None => { + let ingestion_stats = IngestionStats::new( + stats.current_stats.events, + format!("{} {}", stats.current_stats.ingestion, "Bytes"), + stats.lifetime_stats.events, + format!("{} {}", stats.lifetime_stats.ingestion, "Bytes"), + stats.deleted_stats.events, + format!("{} {}", stats.deleted_stats.ingestion, "Bytes"), + "json", + ); + let storage_stats = StorageStats::new( + format!("{} {}", stats.current_stats.storage, "Bytes"), + format!("{} {}", stats.lifetime_stats.storage, "Bytes"), + format!("{} {}", stats.deleted_stats.storage, "Bytes"), + "parquet", + ); + + QueriedStats::new(&stream_name, time, ingestion_stats, storage_stats) + } + }; + let stats = if let Some(mut ingestor_stats) = ingestor_stats { + ingestor_stats.push(stats); + merge_quried_stats(ingestor_stats) + } else { + stats + }; + + let stats = serde_json::to_value(stats)?; + + Ok((web::Json(stats), StatusCode::OK)) +} + +pub async fn put_enable_cache( + req: HttpRequest, + body: web::Json, +) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let storage = CONFIG.storage().get_object_store(); + + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + + let enable_cache = body.into_inner(); + let mut stream_metadata = storage.get_object_store_format(&stream_name).await?; + stream_metadata.cache_enabled = enable_cache; + storage + .put_stream_manifest(&stream_name, &stream_metadata) + .await?; + + STREAM_INFO.set_cache_enabled(&stream_name, enable_cache)?; + Ok(( + format!("Cache set to {enable_cache} for log stream {stream_name}"), + StatusCode::OK, + )) +} + +pub async fn get_cache_enabled(req: HttpRequest) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + + let cache_enabled = STREAM_INFO.get_cache_enabled(&stream_name)?; + Ok((web::Json(cache_enabled), StatusCode::OK)) +} + +pub async fn put_stream_hot_tier( + req: HttpRequest, + body: web::Json, +) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + + if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) { + return Err(StreamError::Custom { + msg: "Hot tier can not be updated for internal stream".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + if 
CONFIG.parseable.hot_tier_storage_path.is_none() { + return Err(StreamError::HotTierNotEnabled(stream_name)); + } + + let body = body.into_inner(); + let mut hottier: StreamHotTier = match serde_json::from_value(body) { + Ok(hottier) => hottier, + Err(err) => return Err(StreamError::InvalidHotTierConfig(err)), + }; + + validator::hot_tier(&hottier.size.to_string())?; + + STREAM_INFO.set_hot_tier(&stream_name, true)?; + if let Some(hot_tier_manager) = HotTierManager::global() { + let existing_hot_tier_used_size = hot_tier_manager + .validate_hot_tier_size(&stream_name, &hottier.size) + .await?; + hottier.used_size = Some(existing_hot_tier_used_size.to_string()); + hottier.available_size = Some(hottier.size.clone()); + hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string()); + hot_tier_manager + .put_hot_tier(&stream_name, &mut hottier) + .await?; + let storage = CONFIG.storage().get_object_store(); + let mut stream_metadata = storage.get_object_store_format(&stream_name).await?; + stream_metadata.hot_tier_enabled = Some(true); + storage + .put_stream_manifest(&stream_name, &stream_metadata) + .await?; + } + + Ok(( + format!("hot tier set for stream {stream_name}"), + StatusCode::OK, + )) +} + +pub async fn get_stream_hot_tier(req: HttpRequest) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + + if CONFIG.parseable.hot_tier_storage_path.is_none() { + return Err(StreamError::HotTierNotEnabled(stream_name)); + } + + if let Some(hot_tier_manager) = HotTierManager::global() { + let mut hot_tier = hot_tier_manager.get_hot_tier(&stream_name).await?; + hot_tier.size = format!("{} {}", hot_tier.size, "Bytes"); + hot_tier.used_size = Some(format!("{} {}", hot_tier.used_size.unwrap(), "Bytes")); + hot_tier.available_size = Some(format!("{} {}", hot_tier.available_size.unwrap(), "Bytes")); + Ok((web::Json(hot_tier), StatusCode::OK)) + } else { + Err(StreamError::Custom { + msg: format!("hot tier not initialised for stream {}", stream_name), + status: (StatusCode::BAD_REQUEST), + }) + } +} + +pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } + + if CONFIG.parseable.hot_tier_storage_path.is_none() { + return Err(StreamError::HotTierNotEnabled(stream_name)); + } + + if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) { + return Err(StreamError::Custom { + msg: "Hot tier can not be deleted for internal stream".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + + if let Some(hot_tier_manager) = HotTierManager::global() { + hot_tier_manager.delete_hot_tier(&stream_name).await?; + } + Ok(( + format!("hot tier deleted for stream {stream_name}"), + StatusCode::OK, + )) +} diff --git a/server/src/handlers/http/modal/query/querier_rbac.rs b/server/src/handlers/http/modal/query/querier_rbac.rs new file mode 100644 index 000000000..a5b88c33b --- /dev/null +++ b/server/src/handlers/http/modal/query/querier_rbac.rs @@ -0,0 +1,157 @@ +use std::collections::HashSet; + +use actix_web::{web, Responder}; +use tokio::sync::Mutex; + +use crate::{ + handlers::http::{ + cluster::{ + sync_password_reset_with_ingestors, sync_user_creation_with_ingestors, + 
sync_user_deletion_with_ingestors, sync_users_with_roles_with_ingestors, + }, + modal::utils::rbac_utils::{get_metadata, put_metadata}, + rbac::RBACError, + }, + rbac::{user, Users}, + validator, +}; + +// async aware lock for updating storage metadata and user map atomicically +static UPDATE_LOCK: Mutex<()> = Mutex::const_new(()); + +// Handler for POST /api/v1/user/{username} +// Creates a new user by username if it does not exists +pub async fn post_user( + username: web::Path, + body: Option>, +) -> Result { + let username = username.into_inner(); + + let mut metadata = get_metadata().await?; + + validator::user_name(&username)?; + let roles: HashSet = if let Some(body) = body { + serde_json::from_value(body.into_inner())? + } else { + return Err(RBACError::RoleValidationError); + }; + + if roles.is_empty() { + return Err(RBACError::RoleValidationError); + } + let _ = UPDATE_LOCK.lock().await; + if Users.contains(&username) + || metadata + .users + .iter() + .any(|user| user.username() == username) + { + return Err(RBACError::UserExists); + } + + let (user, password) = user::User::new_basic(username.clone()); + + metadata.users.push(user.clone()); + + put_metadata(&metadata).await?; + let created_role = roles.clone(); + Users.put_user(user.clone()); + + sync_user_creation_with_ingestors(user, &Some(roles)).await?; + + put_role( + web::Path::::from(username.clone()), + web::Json(created_role), + ) + .await?; + + Ok(password) +} + +// Handler for DELETE /api/v1/user/delete/{username} +pub async fn delete_user(username: web::Path) -> Result { + let username = username.into_inner(); + let _ = UPDATE_LOCK.lock().await; + // fail this request if the user does not exists + if !Users.contains(&username) { + return Err(RBACError::UserDoesNotExist); + }; + // delete from parseable.json first + let mut metadata = get_metadata().await?; + metadata.users.retain(|user| user.username() != username); + + put_metadata(&metadata).await?; + + sync_user_deletion_with_ingestors(&username).await?; + + // update in mem table + Users.delete_user(&username); + Ok(format!("deleted user: {username}")) +} + +// Handler PUT /user/{username}/roles => Put roles for user +// Put roles for given user +pub async fn put_role( + username: web::Path, + role: web::Json>, +) -> Result { + let username = username.into_inner(); + let role = role.into_inner(); + + if !Users.contains(&username) { + return Err(RBACError::UserDoesNotExist); + }; + // update parseable.json first + let mut metadata = get_metadata().await?; + if let Some(user) = metadata + .users + .iter_mut() + .find(|user| user.username() == username) + { + user.roles.clone_from(&role); + } else { + // should be unreachable given state is always consistent + return Err(RBACError::UserDoesNotExist); + } + + put_metadata(&metadata).await?; + // update in mem table + Users.put_role(&username.clone(), role.clone()); + + sync_users_with_roles_with_ingestors(&username, &role).await?; + + Ok(format!("Roles updated successfully for {username}")) +} + +// Handler for POST /api/v1/user/{username}/generate-new-password +// Resets password for the user to a newly generated one and returns it +pub async fn post_gen_password(username: web::Path) -> Result { + let username = username.into_inner(); + let mut new_password = String::default(); + let mut new_hash = String::default(); + let mut metadata = get_metadata().await?; + + let _ = UPDATE_LOCK.lock().await; + let user::PassCode { password, hash } = user::Basic::gen_new_password(); + new_password.clone_from(&password); + 
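The user-management handlers above serialize their read-modify-write of the storage metadata and the in-memory user map behind the shared `UPDATE_LOCK`. A minimal, self-contained sketch of that pattern follows; `apply_update` and its body are illustrative stand-ins, not functions from this module.

use tokio::sync::Mutex;

// Async-aware lock, as declared at the top of this module. `const_new` lets it
// live in a `static` without lazy initialization.
static UPDATE_LOCK: Mutex<()> = Mutex::const_new(());

async fn apply_update() {
    // Binding the guard to a named variable keeps the lock held until the end
    // of the scope; a plain `let _ = UPDATE_LOCK.lock().await;` would drop the
    // guard immediately and give no mutual exclusion.
    let _guard = UPDATE_LOCK.lock().await;
    // ... persist metadata to object storage, then update the in-memory map ...
}

#[tokio::main]
async fn main() {
    apply_update().await;
}
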
new_hash.clone_from(&hash); + if let Some(user) = metadata + .users + .iter_mut() + .filter_map(|user| match user.ty { + user::UserType::Native(ref mut user) => Some(user), + _ => None, + }) + .find(|user| user.username == username) + { + user.password_hash.clone_from(&hash); + } else { + return Err(RBACError::UserDoesNotExist); + } + put_metadata(&metadata).await?; + Users.change_password_hash(&username, &new_hash); + + sync_password_reset_with_ingestors(&username).await?; + + Ok(new_password) +} diff --git a/server/src/handlers/http/modal/query/querier_role.rs b/server/src/handlers/http/modal/query/querier_role.rs new file mode 100644 index 000000000..c17489273 --- /dev/null +++ b/server/src/handlers/http/modal/query/querier_role.rs @@ -0,0 +1,27 @@ +use actix_web::{web, HttpResponse, Responder}; +use bytes::Bytes; + +use crate::{ + handlers::http::{ + cluster::sync_role_update_with_ingestors, + modal::utils::rbac_utils::{get_metadata, put_metadata}, + role::RoleError, + }, + rbac::{map::mut_roles, role::model::DefaultPrivilege}, +}; + +// Handler for PUT /api/v1/role/{name} +// Creates a new role or update existing one +pub async fn put(name: web::Path, body: Bytes) -> Result { + let name = name.into_inner(); + let privileges = serde_json::from_slice::>(&body)?; + let mut metadata = get_metadata().await?; + metadata.roles.insert(name.clone(), privileges.clone()); + + put_metadata(&metadata).await?; + mut_roles().insert(name.clone(), privileges.clone()); + + sync_role_update_with_ingestors(name.clone(), privileges.clone()).await?; + + Ok(HttpResponse::Ok().finish()) +} diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 28c39a63e..618e9e070 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -18,18 +18,19 @@ use crate::handlers::airplane; use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; -use crate::handlers::http::health_check; use crate::handlers::http::logstream::create_internal_stream_if_not_exists; -use crate::handlers::http::middleware::RouteExt; +use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt}; +use crate::handlers::http::{self, role}; use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION}; +use crate::handlers::http::{health_check, logstream, MAX_EVENT_PAYLOAD_SIZE}; use crate::hottier::HotTierManager; use crate::rbac::role::Action; use crate::sync; use crate::users::dashboards::DASHBOARDS; use crate::users::filters::FILTERS; use crate::{analytics, banner, metrics, migration, rbac, storage}; -use actix_web::web; -use actix_web::web::ServiceConfig; +use actix_web::web::{resource, ServiceConfig}; +use actix_web::{web, Scope}; use actix_web::{App, HttpServer}; use async_trait::async_trait; use std::sync::Arc; @@ -37,6 +38,7 @@ use tokio::sync::{oneshot, Mutex}; use crate::option::CONFIG; +use super::query::{querier_ingest, querier_logstream, querier_rbac, querier_role}; use super::server::Server; use super::ssl_acceptor::get_ssl_acceptor; use super::{OpenIdClient, ParseableServer}; @@ -160,19 +162,217 @@ impl QueryServer { .service(Server::get_liveness_factory()) .service(Server::get_readiness_factory()) .service(Server::get_about_factory()) - .service(Server::get_logstream_webscope()) - .service(Server::get_user_webscope()) + .service(Self::get_logstream_webscope()) + .service(Self::get_user_webscope()) .service(Server::get_dashboards_webscope()) 
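The querier wires its own handler variants into the same route shapes the standalone server uses. Below is a minimal, self-contained sketch of that wiring; `list_roles` and `put_role_querier` are stand-ins for the real `role::list` and `querier_role::put` handlers, not their actual implementations.

use actix_web::{web, App, HttpResponse, HttpServer, Responder, Scope};

// Stand-in for the shared list handler used by every mode.
async fn list_roles() -> impl Responder {
    HttpResponse::Ok().body("roles")
}

// Stand-in for the querier-specific PUT handler, which also fans the update
// out to the ingestors.
async fn put_role_querier(name: web::Path<String>) -> impl Responder {
    HttpResponse::Ok().body(format!("updated role {}", name.into_inner()))
}

// Same shape as the role scope above: shared GET, mode-specific PUT.
fn role_scope() -> Scope {
    web::scope("/role")
        .service(web::resource("").route(web::get().to(list_roles)))
        .service(web::resource("/{name}").route(web::put().to(put_role_querier)))
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| App::new().service(role_scope()))
        .bind(("127.0.0.1", 8080))?
        .run()
        .await
}
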
.service(Server::get_filters_webscope()) .service(Server::get_llm_webscope()) .service(Server::get_oauth_webscope(oidc_client)) - .service(Server::get_user_role_webscope()) + .service(Self::get_user_role_webscope()) .service(Server::get_metrics_webscope()) .service(Self::get_cluster_web_scope()), ) .service(Server::get_generated()); } + // get the role webscope + fn get_user_role_webscope() -> Scope { + web::scope("/role") + // GET Role List + .service(resource("").route(web::get().to(role::list).authorize(Action::ListRole))) + .service( + // PUT and GET Default Role + resource("/default") + .route(web::put().to(role::put_default).authorize(Action::PutRole)) + .route(web::get().to(role::get_default).authorize(Action::GetRole)), + ) + .service( + // PUT, GET, DELETE Roles + resource("/{name}") + .route(web::put().to(querier_role::put).authorize(Action::PutRole)) + .route(web::delete().to(role::delete).authorize(Action::DeleteRole)) + .route(web::get().to(role::get).authorize(Action::GetRole)), + ) + } + + // get the user webscope + fn get_user_webscope() -> Scope { + web::scope("/user") + .service( + web::resource("") + // GET /user => List all users + .route( + web::get() + .to(http::rbac::list_users) + .authorize(Action::ListUser), + ), + ) + .service( + web::resource("/{username}") + // PUT /user/{username} => Create a new user + .route( + web::post() + .to(querier_rbac::post_user) + .authorize(Action::PutUser), + ) + // DELETE /user/{username} => Delete a user + .route( + web::delete() + .to(querier_rbac::delete_user) + .authorize(Action::DeleteUser), + ) + .wrap(DisAllowRootUser), + ) + .service( + web::resource("/{username}/role") + // PUT /user/{username}/roles => Put roles for user + .route( + web::put() + .to(querier_rbac::put_role) + .authorize(Action::PutUserRoles) + .wrap(DisAllowRootUser), + ) + .route( + web::get() + .to(http::rbac::get_role) + .authorize_for_user(Action::GetUserRoles), + ), + ) + .service( + web::resource("/{username}/generate-new-password") + // POST /user/{username}/generate-new-password => reset password for this user + .route( + web::post() + .to(querier_rbac::post_gen_password) + .authorize(Action::PutUser) + .wrap(DisAllowRootUser), + ), + ) + } + + // get the logstream web scope + fn get_logstream_webscope() -> Scope { + web::scope("/logstream") + .service( + // GET "/logstream" ==> Get list of all Log Streams on the server + web::resource("") + .route(web::get().to(logstream::list).authorize(Action::ListStream)), + ) + .service( + web::scope("/{logstream}") + .service( + web::resource("") + // PUT "/logstream/{logstream}" ==> Create log stream + .route( + web::put() + .to(querier_logstream::put_stream) + .authorize_for_stream(Action::CreateStream), + ) + // POST "/logstream/{logstream}" ==> Post logs to given log stream + .route( + web::post() + .to(querier_ingest::post_event) + .authorize_for_stream(Action::Ingest), + ) + // DELETE "/logstream/{logstream}" ==> Delete log stream + .route( + web::delete() + .to(querier_logstream::delete) + .authorize_for_stream(Action::DeleteStream), + ) + .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), + ) + .service( + // GET "/logstream/{logstream}/info" ==> Get info for given log stream + web::resource("/info").route( + web::get() + .to(logstream::get_stream_info) + .authorize_for_stream(Action::GetStreamInfo), + ), + ) + .service( + web::resource("/alert") + // PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream + .route( + web::put() + .to(logstream::put_alert) + 
.authorize_for_stream(Action::PutAlert), + ) + // GET "/logstream/{logstream}/alert" ==> Get alert for given log stream + .route( + web::get() + .to(logstream::get_alert) + .authorize_for_stream(Action::GetAlert), + ), + ) + .service( + // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream + web::resource("/schema").route( + web::get() + .to(logstream::schema) + .authorize_for_stream(Action::GetSchema), + ), + ) + .service( + // GET "/logstream/{logstream}/stats" ==> Get stats for given log stream + web::resource("/stats").route( + web::get() + .to(querier_logstream::get_stats) + .authorize_for_stream(Action::GetStats), + ), + ) + .service( + web::resource("/retention") + // PUT "/logstream/{logstream}/retention" ==> Set retention for given logstream + .route( + web::put() + .to(logstream::put_retention) + .authorize_for_stream(Action::PutRetention), + ) + // GET "/logstream/{logstream}/retention" ==> Get retention for given logstream + .route( + web::get() + .to(logstream::get_retention) + .authorize_for_stream(Action::GetRetention), + ), + ) + .service( + web::resource("/cache") + // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream + .route( + web::put() + .to(querier_logstream::put_enable_cache) + .authorize_for_stream(Action::PutCacheEnabled), + ) + // GET "/logstream/{logstream}/cache" ==> Get retention for given logstream + .route( + web::get() + .to(querier_logstream::get_cache_enabled) + .authorize_for_stream(Action::GetCacheEnabled), + ), + ) + .service( + web::resource("/hottier") + // PUT "/logstream/{logstream}/hottier" ==> Set hottier for given logstream + .route( + web::put() + .to(querier_logstream::put_stream_hot_tier) + .authorize_for_stream(Action::PutHotTierEnabled), + ) + .route( + web::get() + .to(querier_logstream::get_stream_hot_tier) + .authorize_for_stream(Action::GetHotTierEnabled), + ) + .route( + web::delete() + .to(querier_logstream::delete_stream_hot_tier) + .authorize_for_stream(Action::DeleteHotTierEnabled), + ), + ), + ) + } + fn get_cluster_web_scope() -> actix_web::Scope { web::scope("/cluster") .service( diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index d3d56eb90..cdbe544e7 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -383,25 +383,6 @@ impl Server { .to(logstream::get_cache_enabled) .authorize_for_stream(Action::GetCacheEnabled), ), - ) - .service( - web::resource("/hottier") - // PUT "/logstream/{logstream}/hottier" ==> Set hottier for given logstream - .route( - web::put() - .to(logstream::put_stream_hot_tier) - .authorize_for_stream(Action::PutHotTierEnabled), - ) - .route( - web::get() - .to(logstream::get_stream_hot_tier) - .authorize_for_stream(Action::GetHotTierEnabled), - ) - .route( - web::delete() - .to(logstream::delete_stream_hot_tier) - .authorize_for_stream(Action::DeleteHotTierEnabled), - ), ), ) } @@ -463,7 +444,7 @@ impl Server { } // get the user webscope - pub fn get_user_webscope() -> Scope { + fn get_user_webscope() -> Scope { web::scope("/user") .service( web::resource("") diff --git a/server/src/handlers/http/modal/utils/ingest_utils.rs b/server/src/handlers/http/modal/utils/ingest_utils.rs new file mode 100644 index 000000000..9d29d0a76 --- /dev/null +++ b/server/src/handlers/http/modal/utils/ingest_utils.rs @@ -0,0 +1,260 @@ +use std::{ + collections::{BTreeMap, HashMap}, + sync::Arc, +}; + +use actix_web::HttpRequest; +use arrow_schema::Field; +use bytes::Bytes; +use 
chrono::{DateTime, NaiveDateTime, Utc}; +use serde_json::Value; + +use crate::{ + event::{ + self, + format::{self, EventFormat}, + }, + handlers::{ + http::{ingest::PostError, kinesis, otel}, + LOG_SOURCE_KEY, LOG_SOURCE_KINESIS, LOG_SOURCE_OTEL, PREFIX_META, PREFIX_TAGS, SEPARATOR, + }, + metadata::STREAM_INFO, + storage::StreamType, + utils::{header_parsing::collect_labelled_headers, json::convert_array_to_object}, +}; + +pub async fn flatten_and_push_logs( + req: HttpRequest, + body: Bytes, + stream_name: String, +) -> Result<(), PostError> { + //flatten logs + if let Some((_, log_source)) = req.headers().iter().find(|&(key, _)| key == LOG_SOURCE_KEY) { + let mut json: Vec> = Vec::new(); + let log_source: String = log_source.to_str().unwrap().to_owned(); + match log_source.as_str() { + LOG_SOURCE_KINESIS => json = kinesis::flatten_kinesis_logs(&body), + LOG_SOURCE_OTEL => { + json = otel::flatten_otel_logs(&body); + } + _ => { + log::warn!("Unknown log source: {}", log_source); + push_logs(stream_name.to_string(), req.clone(), body).await?; + } + } + for record in json.iter_mut() { + let body: Bytes = serde_json::to_vec(record).unwrap().into(); + push_logs(stream_name.to_string(), req.clone(), body).await?; + } + } else { + push_logs(stream_name.to_string(), req, body).await?; + } + Ok(()) +} + +pub async fn push_logs( + stream_name: String, + req: HttpRequest, + body: Bytes, +) -> Result<(), PostError> { + let time_partition = STREAM_INFO.get_time_partition(&stream_name)?; + let time_partition_limit = STREAM_INFO.get_time_partition_limit(&stream_name)?; + let static_schema_flag = STREAM_INFO.get_static_schema_flag(&stream_name)?; + let custom_partition = STREAM_INFO.get_custom_partition(&stream_name)?; + let body_val: Value = serde_json::from_slice(&body)?; + let size: usize = body.len(); + let mut parsed_timestamp = Utc::now().naive_utc(); + if time_partition.is_none() { + if custom_partition.is_none() { + let size = size as u64; + create_process_record_batch( + stream_name.clone(), + req.clone(), + body_val.clone(), + static_schema_flag.clone(), + None, + parsed_timestamp, + HashMap::new(), + size, + ) + .await?; + } else { + let data = + convert_array_to_object(body_val.clone(), None, None, custom_partition.clone())?; + let custom_partition = custom_partition.unwrap(); + let custom_partition_list = custom_partition.split(',').collect::>(); + + for value in data { + let custom_partition_values = + get_custom_partition_values(&value, &custom_partition_list); + + let size = value.to_string().into_bytes().len() as u64; + create_process_record_batch( + stream_name.clone(), + req.clone(), + value.clone(), + static_schema_flag.clone(), + None, + parsed_timestamp, + custom_partition_values.clone(), + size, + ) + .await?; + } + } + } else if custom_partition.is_none() { + let data = convert_array_to_object( + body_val.clone(), + time_partition.clone(), + time_partition_limit, + None, + )?; + for value in data { + parsed_timestamp = get_parsed_timestamp(&value, &time_partition); + let size = value.to_string().into_bytes().len() as u64; + create_process_record_batch( + stream_name.clone(), + req.clone(), + value.clone(), + static_schema_flag.clone(), + time_partition.clone(), + parsed_timestamp, + HashMap::new(), + size, + ) + .await?; + } + } else { + let data = convert_array_to_object( + body_val.clone(), + time_partition.clone(), + time_partition_limit, + custom_partition.clone(), + )?; + let custom_partition = custom_partition.unwrap(); + let custom_partition_list = 
custom_partition.split(',').collect::>(); + + for value in data { + let custom_partition_values = + get_custom_partition_values(&value, &custom_partition_list); + + parsed_timestamp = get_parsed_timestamp(&value, &time_partition); + let size = value.to_string().into_bytes().len() as u64; + create_process_record_batch( + stream_name.clone(), + req.clone(), + value.clone(), + static_schema_flag.clone(), + time_partition.clone(), + parsed_timestamp, + custom_partition_values.clone(), + size, + ) + .await?; + } + } + + Ok(()) +} + +#[allow(clippy::too_many_arguments)] +pub async fn create_process_record_batch( + stream_name: String, + req: HttpRequest, + value: Value, + static_schema_flag: Option, + time_partition: Option, + parsed_timestamp: NaiveDateTime, + custom_partition_values: HashMap, + origin_size: u64, +) -> Result<(), PostError> { + let (rb, is_first_event) = get_stream_schema( + stream_name.clone(), + req.clone(), + value.clone(), + static_schema_flag.clone(), + time_partition.clone(), + )?; + event::Event { + rb, + stream_name: stream_name.clone(), + origin_format: "json", + origin_size, + is_first_event, + parsed_timestamp, + time_partition: time_partition.clone(), + custom_partition_values: custom_partition_values.clone(), + stream_type: StreamType::UserDefined, + } + .process() + .await?; + + Ok(()) +} + +pub fn get_stream_schema( + stream_name: String, + req: HttpRequest, + body: Value, + static_schema_flag: Option, + time_partition: Option, +) -> Result<(arrow_array::RecordBatch, bool), PostError> { + let hash_map = STREAM_INFO.read().unwrap(); + let schema = hash_map + .get(&stream_name) + .ok_or(PostError::StreamNotFound(stream_name))? + .schema + .clone(); + into_event_batch(req, body, schema, static_schema_flag, time_partition) +} + +pub fn into_event_batch( + req: HttpRequest, + body: Value, + schema: HashMap>, + static_schema_flag: Option, + time_partition: Option, +) -> Result<(arrow_array::RecordBatch, bool), PostError> { + let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?; + let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?; + let event = format::json::Event { + data: body, + tags, + metadata, + }; + let (rb, is_first) = event.into_recordbatch(schema, static_schema_flag, time_partition)?; + Ok((rb, is_first)) +} + +pub fn get_custom_partition_values( + body: &Value, + custom_partition_list: &[&str], +) -> HashMap { + let mut custom_partition_values: HashMap = HashMap::new(); + for custom_partition_field in custom_partition_list { + let custom_partition_value = body.get(custom_partition_field.trim()).unwrap().to_owned(); + let custom_partition_value = match custom_partition_value.clone() { + e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(), + Value::String(s) => s, + _ => "".to_string(), + }; + custom_partition_values.insert( + custom_partition_field.trim().to_string(), + custom_partition_value, + ); + } + custom_partition_values +} + +pub fn get_parsed_timestamp(body: &Value, time_partition: &Option) -> NaiveDateTime { + let body_timestamp = body.get(time_partition.clone().unwrap().to_string()); + let parsed_timestamp = body_timestamp + .unwrap() + .to_owned() + .as_str() + .unwrap() + .parse::>() + .unwrap() + .naive_utc(); + parsed_timestamp +} diff --git a/server/src/handlers/http/modal/utils/logstream_utils.rs b/server/src/handlers/http/modal/utils/logstream_utils.rs new file mode 100644 index 000000000..65628eca5 --- /dev/null +++ b/server/src/handlers/http/modal/utils/logstream_utils.rs @@ -0,0 +1,380 @@ +use 
std::{collections::HashMap, num::NonZeroU32, sync::Arc}; + +use actix_web::{http::header::HeaderMap, HttpRequest}; +use arrow_schema::{Field, Schema}; +use bytes::Bytes; +use http::StatusCode; + +use crate::{ + handlers::{ + http::logstream::error::{CreateStreamError, StreamError}, + CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TIME_PARTITION_KEY, + TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY, + }, + metadata::{self, STREAM_INFO}, + option::CONFIG, + static_schema::{convert_static_schema_to_arrow_schema, StaticSchema}, + storage::StreamType, + validator, +}; + +pub async fn create_update_stream( + req: &HttpRequest, + body: &Bytes, + stream_name: &str, +) -> Result { + let ( + time_partition, + time_partition_limit, + custom_partition, + static_schema_flag, + update_stream, + stream_type, + ) = fetch_headers_from_put_stream_request(req); + + if metadata::STREAM_INFO.stream_exists(stream_name) && update_stream != "true" { + return Err(StreamError::Custom { + msg: format!( + "Logstream {stream_name} already exists, please create a new log stream with unique name" + ), + status: StatusCode::BAD_REQUEST, + }); + } + + if update_stream == "true" { + if !STREAM_INFO.stream_exists(stream_name) { + return Err(StreamError::StreamNotFound(stream_name.to_string())); + } + if !time_partition.is_empty() { + return Err(StreamError::Custom { + msg: "Altering the time partition of an existing stream is restricted.".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + + if !static_schema_flag.is_empty() { + return Err(StreamError::Custom { + msg: "Altering the schema of an existing stream is restricted.".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + + if !time_partition_limit.is_empty() { + let time_partition_days = validate_time_partition_limit(&time_partition_limit)?; + update_time_partition_limit_in_stream(stream_name.to_string(), time_partition_days) + .await?; + return Ok(req.headers().clone()); + } + + if !custom_partition.is_empty() { + validate_custom_partition(&custom_partition)?; + update_custom_partition_in_stream(stream_name.to_string(), &custom_partition).await?; + } else { + update_custom_partition_in_stream(stream_name.to_string(), "").await?; + } + return Ok(req.headers().clone()); + } + let mut time_partition_in_days = ""; + if !time_partition_limit.is_empty() { + time_partition_in_days = validate_time_partition_limit(&time_partition_limit)?; + } + if !custom_partition.is_empty() { + validate_custom_partition(&custom_partition)?; + } + + if !time_partition.is_empty() && !custom_partition.is_empty() { + validate_time_with_custom_partition(&time_partition, &custom_partition)?; + } + + let schema = validate_static_schema( + body, + stream_name, + &time_partition, + &custom_partition, + &static_schema_flag, + )?; + + create_stream( + stream_name.to_string(), + &time_partition, + time_partition_in_days, + &custom_partition, + &static_schema_flag, + schema, + &stream_type, + ) + .await?; + + Ok(req.headers().clone()) +} + +pub fn fetch_headers_from_put_stream_request( + req: &HttpRequest, +) -> (String, String, String, String, String, String) { + let mut time_partition = String::default(); + let mut time_partition_limit = String::default(); + let mut custom_partition = String::default(); + let mut static_schema_flag = String::default(); + let mut update_stream = String::default(); + let mut stream_type = StreamType::UserDefined.to_string(); + req.headers().iter().for_each(|(key, value)| { + if key == TIME_PARTITION_KEY { + time_partition = 
value.to_str().unwrap().to_string(); + } + if key == TIME_PARTITION_LIMIT_KEY { + time_partition_limit = value.to_str().unwrap().to_string(); + } + if key == CUSTOM_PARTITION_KEY { + custom_partition = value.to_str().unwrap().to_string(); + } + if key == STATIC_SCHEMA_FLAG { + static_schema_flag = value.to_str().unwrap().to_string(); + } + if key == UPDATE_STREAM_KEY { + update_stream = value.to_str().unwrap().to_string(); + } + if key == STREAM_TYPE_KEY { + stream_type = value.to_str().unwrap().to_string(); + } + }); + + ( + time_partition, + time_partition_limit, + custom_partition, + static_schema_flag, + update_stream, + stream_type, + ) +} + +pub fn validate_time_partition_limit( + time_partition_limit: &str, +) -> Result<&str, CreateStreamError> { + if !time_partition_limit.ends_with('d') { + return Err(CreateStreamError::Custom { + msg: "Missing 'd' suffix for duration value".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + let days = &time_partition_limit[0..time_partition_limit.len() - 1]; + if days.parse::().is_err() { + return Err(CreateStreamError::Custom { + msg: "Could not convert duration to an unsigned number".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + + Ok(days) +} + +pub fn validate_custom_partition(custom_partition: &str) -> Result<(), CreateStreamError> { + let custom_partition_list = custom_partition.split(',').collect::>(); + if custom_partition_list.len() > 3 { + return Err(CreateStreamError::Custom { + msg: "Maximum 3 custom partition keys are supported".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + Ok(()) +} + +pub fn validate_time_with_custom_partition( + time_partition: &str, + custom_partition: &str, +) -> Result<(), CreateStreamError> { + let custom_partition_list = custom_partition.split(',').collect::>(); + if custom_partition_list.contains(&time_partition) { + return Err(CreateStreamError::Custom { + msg: format!( + "time partition {} cannot be set as custom partition", + time_partition + ), + status: StatusCode::BAD_REQUEST, + }); + } + Ok(()) +} + +pub fn validate_static_schema( + body: &Bytes, + stream_name: &str, + time_partition: &str, + custom_partition: &str, + static_schema_flag: &str, +) -> Result, CreateStreamError> { + if static_schema_flag == "true" { + if body.is_empty() { + return Err(CreateStreamError::Custom { + msg: format!( + "Please provide schema in the request body for static schema logstream {stream_name}" + ), + status: StatusCode::BAD_REQUEST, + }); + } + + let static_schema: StaticSchema = serde_json::from_slice(body)?; + let parsed_schema = + convert_static_schema_to_arrow_schema(static_schema, time_partition, custom_partition) + .map_err(|_| CreateStreamError::Custom { + msg: format!( + "Unable to commit static schema, logstream {stream_name} not created" + ), + status: StatusCode::BAD_REQUEST, + })?; + + return Ok(parsed_schema); + } + + Ok(Arc::new(Schema::empty())) +} + +pub async fn update_time_partition_limit_in_stream( + stream_name: String, + time_partition_limit: &str, +) -> Result<(), CreateStreamError> { + let storage = CONFIG.storage().get_object_store(); + if let Err(err) = storage + .update_time_partition_limit_in_stream(&stream_name, time_partition_limit) + .await + { + return Err(CreateStreamError::Storage { stream_name, err }); + } + + if metadata::STREAM_INFO + .update_time_partition_limit(&stream_name, time_partition_limit.to_string()) + .is_err() + { + return Err(CreateStreamError::Custom { + msg: "failed to update time partition limit in metadata".to_string(), + status: 
StatusCode::EXPECTATION_FAILED, + }); + } + + Ok(()) +} + +pub async fn update_custom_partition_in_stream( + stream_name: String, + custom_partition: &str, +) -> Result<(), CreateStreamError> { + let static_schema_flag = STREAM_INFO.get_static_schema_flag(&stream_name).unwrap(); + let time_partition = STREAM_INFO.get_time_partition(&stream_name).unwrap(); + if static_schema_flag.is_some() { + let schema = STREAM_INFO.schema(&stream_name).unwrap(); + + if !custom_partition.is_empty() { + let custom_partition_list = custom_partition.split(',').collect::>(); + let custom_partition_exists: HashMap<_, _> = custom_partition_list + .iter() + .map(|&partition| { + ( + partition.to_string(), + schema + .fields() + .iter() + .any(|field| field.name() == partition), + ) + }) + .collect(); + + for partition in &custom_partition_list { + if !custom_partition_exists[*partition] { + return Err(CreateStreamError::Custom { + msg: format!("custom partition field {} does not exist in the schema for the stream {}", partition, stream_name), + status: StatusCode::BAD_REQUEST, + }); + } + + if let Some(time_partition) = time_partition.clone() { + if time_partition == *partition { + return Err(CreateStreamError::Custom { + msg: format!( + "time partition {} cannot be set as custom partition", + partition + ), + status: StatusCode::BAD_REQUEST, + }); + } + } + } + } + } + + let storage = CONFIG.storage().get_object_store(); + if let Err(err) = storage + .update_custom_partition_in_stream(&stream_name, custom_partition) + .await + { + return Err(CreateStreamError::Storage { stream_name, err }); + } + + if metadata::STREAM_INFO + .update_custom_partition(&stream_name, custom_partition.to_string()) + .is_err() + { + return Err(CreateStreamError::Custom { + msg: "failed to update custom partition in metadata".to_string(), + status: StatusCode::EXPECTATION_FAILED, + }); + } + + Ok(()) +} + +pub async fn create_stream( + stream_name: String, + time_partition: &str, + time_partition_limit: &str, + custom_partition: &str, + static_schema_flag: &str, + schema: Arc, + stream_type: &str, +) -> Result<(), CreateStreamError> { + // fail to proceed if invalid stream name + if stream_type != StreamType::Internal.to_string() { + validator::stream_name(&stream_name, stream_type)?; + } + // Proceed to create log stream if it doesn't exist + let storage = CONFIG.storage().get_object_store(); + + match storage + .create_stream( + &stream_name, + time_partition, + time_partition_limit, + custom_partition, + static_schema_flag, + schema.clone(), + stream_type, + ) + .await + { + Ok(created_at) => { + let mut static_schema: HashMap> = HashMap::new(); + + for (field_name, field) in schema + .fields() + .iter() + .map(|field| (field.name().to_string(), field.clone())) + { + static_schema.insert(field_name, field); + } + + metadata::STREAM_INFO.add_stream( + stream_name.to_string(), + created_at, + time_partition.to_string(), + time_partition_limit.to_string(), + custom_partition.to_string(), + static_schema_flag.to_string(), + static_schema, + stream_type, + ); + } + Err(err) => { + return Err(CreateStreamError::Storage { stream_name, err }); + } + } + Ok(()) +} diff --git a/server/src/handlers/http/modal/utils/mod.rs b/server/src/handlers/http/modal/utils/mod.rs new file mode 100644 index 000000000..7ec7e1cbd --- /dev/null +++ b/server/src/handlers/http/modal/utils/mod.rs @@ -0,0 +1,3 @@ +pub mod ingest_utils; +pub mod logstream_utils; +pub mod rbac_utils; diff --git a/server/src/handlers/http/modal/utils/rbac_utils.rs 
b/server/src/handlers/http/modal/utils/rbac_utils.rs new file mode 100644 index 000000000..195a69a69 --- /dev/null +++ b/server/src/handlers/http/modal/utils/rbac_utils.rs @@ -0,0 +1,20 @@ +use crate::{ + option::CONFIG, + storage::{self, ObjectStorageError, StorageMetadata}, +}; + +pub async fn get_metadata() -> Result { + let metadata = CONFIG + .storage() + .get_object_store() + .get_metadata() + .await? + .expect("metadata is initialized"); + Ok(metadata) +} + +pub async fn put_metadata(metadata: &StorageMetadata) -> Result<(), ObjectStorageError> { + storage::put_remote_metadata(metadata).await?; + storage::put_staging_metadata(metadata)?; + Ok(()) +} diff --git a/server/src/handlers/http/rbac.rs b/server/src/handlers/http/rbac.rs index 3c73b42c1..38fc8f84e 100644 --- a/server/src/handlers/http/rbac.rs +++ b/server/src/handlers/http/rbac.rs @@ -19,25 +19,15 @@ use std::collections::{HashMap, HashSet}; use crate::{ - handlers::http::cluster::sync_users_with_roles_with_ingestors, - option::{Mode, CONFIG}, - rbac::{ - map::roles, - role::model::DefaultPrivilege, - user::{self, User as ParseableUser}, - Users, - }, - storage::{self, ObjectStorageError, StorageMetadata}, + rbac::{map::roles, role::model::DefaultPrivilege, user, Users}, + storage::ObjectStorageError, validator::{self, error::UsernameValidationError}, }; use actix_web::{http::header::ContentType, web, Responder}; use http::StatusCode; use tokio::sync::Mutex; -use super::cluster::{ - sync_password_reset_with_ingestors, sync_user_creation_with_ingestors, - sync_user_deletion_with_ingestors, -}; +use super::modal::utils::rbac_utils::{get_metadata, put_metadata}; // async aware lock for updating storage metadata and user map atomicically static UPDATE_LOCK: Mutex<()> = Mutex::const_new(()); @@ -76,57 +66,43 @@ pub async fn post_user( ) -> Result { let username = username.into_inner(); - let mut generated_password = String::default(); let mut metadata = get_metadata().await?; - if CONFIG.parseable.mode == Mode::Ingest { - if let Some(body) = body { - let user: ParseableUser = serde_json::from_value(body.into_inner())?; - let _ = storage::put_staging_metadata(&metadata); - let created_role = user.roles.clone(); - Users.put_user(user.clone()); - Users.put_role(&username, created_role.clone()); - } + + validator::user_name(&username)?; + let roles: HashSet = if let Some(body) = body { + serde_json::from_value(body.into_inner())? } else { - validator::user_name(&username)?; - let roles: HashSet = if let Some(body) = body { - serde_json::from_value(body.into_inner())? 
- } else { - return Err(RBACError::RoleValidationError); - }; + return Err(RBACError::RoleValidationError); + }; - if roles.is_empty() { - return Err(RBACError::RoleValidationError); - } - let _ = UPDATE_LOCK.lock().await; - if Users.contains(&username) - || metadata - .users - .iter() - .any(|user| user.username() == username) - { - return Err(RBACError::UserExists); - } + if roles.is_empty() { + return Err(RBACError::RoleValidationError); + } + let _ = UPDATE_LOCK.lock().await; + if Users.contains(&username) + || metadata + .users + .iter() + .any(|user| user.username() == username) + { + return Err(RBACError::UserExists); + } - let (user, password) = user::User::new_basic(username.clone()); + let (user, password) = user::User::new_basic(username.clone()); - generated_password = password; - metadata.users.push(user.clone()); + metadata.users.push(user.clone()); - put_metadata(&metadata).await?; - let created_role = roles.clone(); - Users.put_user(user.clone()); + put_metadata(&metadata).await?; + let created_role = roles.clone(); + Users.put_user(user.clone()); - if CONFIG.parseable.mode == Mode::Query { - sync_user_creation_with_ingestors(user, &Some(roles)).await?; - } - put_role( - web::Path::::from(username.clone()), - web::Json(created_role), - ) - .await?; - } + put_role( + web::Path::::from(username.clone()), + web::Json(created_role), + ) + .await?; - Ok(generated_password) + Ok(password) } // Handler for POST /api/v1/user/{username}/generate-new-password @@ -136,46 +112,26 @@ pub async fn post_gen_password(username: web::Path) -> Result Some(user), - _ => None, - }) - .find(|user| user.username == username) - { - new_hash.clone_from(&user.password_hash); - } else { - return Err(RBACError::UserDoesNotExist); - } - Users.change_password_hash(&username, &new_hash); + + let _ = UPDATE_LOCK.lock().await; + let user::PassCode { password, hash } = user::Basic::gen_new_password(); + new_password.clone_from(&password); + new_hash.clone_from(&hash); + if let Some(user) = metadata + .users + .iter_mut() + .filter_map(|user| match user.ty { + user::UserType::Native(ref mut user) => Some(user), + _ => None, + }) + .find(|user| user.username == username) + { + user.password_hash.clone_from(&hash); } else { - let _ = UPDATE_LOCK.lock().await; - let user::PassCode { password, hash } = user::Basic::gen_new_password(); - new_password.clone_from(&password); - new_hash.clone_from(&hash); - if let Some(user) = metadata - .users - .iter_mut() - .filter_map(|user| match user.ty { - user::UserType::Native(ref mut user) => Some(user), - _ => None, - }) - .find(|user| user.username == username) - { - user.password_hash.clone_from(&hash); - } else { - return Err(RBACError::UserDoesNotExist); - } - put_metadata(&metadata).await?; - Users.change_password_hash(&username, &new_hash); - if CONFIG.parseable.mode == Mode::Query { - sync_password_reset_with_ingestors(&username).await?; - } + return Err(RBACError::UserDoesNotExist); } + put_metadata(&metadata).await?; + Users.change_password_hash(&username, &new_hash); Ok(new_password) } @@ -211,14 +167,7 @@ pub async fn delete_user(username: web::Path) -> Result Result { - let metadata = CONFIG - .storage() - .get_object_store() - .get_metadata() - .await? 
- .expect("metadata is initialized"); - Ok(metadata) -} - -async fn put_metadata(metadata: &StorageMetadata) -> Result<(), ObjectStorageError> { - storage::put_remote_metadata(metadata).await?; - storage::put_staging_metadata(metadata)?; - Ok(()) -} - #[derive(Debug, thiserror::Error)] pub enum RBACError { #[error("User exists already")] diff --git a/server/src/handlers/http/role.rs b/server/src/handlers/http/role.rs index 22f61ba05..757f4f170 100644 --- a/server/src/handlers/http/role.rs +++ b/server/src/handlers/http/role.rs @@ -21,7 +21,7 @@ use bytes::Bytes; use http::StatusCode; use crate::{ - option::{Mode, CONFIG}, + option::CONFIG, rbac::{ map::{mut_roles, DEFAULT_ROLE}, role::model::DefaultPrivilege, @@ -29,8 +29,6 @@ use crate::{ storage::{self, ObjectStorageError, StorageMetadata}, }; -use super::cluster::sync_role_update_with_ingestors; - // Handler for PUT /api/v1/role/{name} // Creates a new role or update existing one pub async fn put(name: web::Path, body: Bytes) -> Result { @@ -38,16 +36,9 @@ pub async fn put(name: web::Path, body: Bytes) -> Result>(&body)?; let mut metadata = get_metadata().await?; metadata.roles.insert(name.clone(), privileges.clone()); - if CONFIG.parseable.mode == Mode::Ingest { - let _ = storage::put_staging_metadata(&metadata); - mut_roles().insert(name.clone(), privileges.clone()); - } else { - put_metadata(&metadata).await?; - mut_roles().insert(name.clone(), privileges.clone()); - if CONFIG.parseable.mode == Mode::Query { - sync_role_update_with_ingestors(name.clone(), privileges.clone()).await?; - } - } + + put_metadata(&metadata).await?; + mut_roles().insert(name.clone(), privileges.clone()); Ok(HttpResponse::Ok().finish()) } diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 3a244f355..59c4d6651 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -238,6 +238,7 @@ pub fn convert_disk_files_to_parquet( .set(0); } + // log::warn!("staging files-\n{staging_files:?}\n"); for (parquet_path, files) in staging_files { metrics::STAGING_FILES .with_label_values(&[stream]) @@ -289,6 +290,7 @@ pub fn convert_disk_files_to_parquet( fs::remove_file(parquet_path).unwrap(); } else { for file in files { + // log::warn!("file-\n{file:?}\n"); let file_size = file.metadata().unwrap().len(); let file_type = file.extension().unwrap().to_str().unwrap(); if fs::remove_file(file.clone()).is_err() {
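For reference, a minimal, self-contained sketch of the fan-out pattern the querier handlers above rely on when propagating changes to ingestors (as in `querier_logstream::delete`, which calls each ingestor's `/logstream/{stream}/sync` route). The `Ingestor` struct and `send_stream_delete` below are illustrative stand-ins for `IngestorMetadata` and `cluster::send_stream_delete_request`; the real helper lives in the `cluster` module.

use reqwest::header;

// Illustrative stand-in for `IngestorMetadata`.
struct Ingestor {
    domain_name: String,
    token: String,
}

// Build the per-ingestor sync URL and issue the DELETE.
async fn send_stream_delete(
    ingestor: &Ingestor,
    base_path: &str,
    stream: &str,
) -> Result<(), reqwest::Error> {
    let url = format!(
        "{}{}/logstream/{}/sync",
        ingestor.domain_name, base_path, stream
    );
    let resp = reqwest::Client::new()
        .delete(url)
        .header(header::AUTHORIZATION, ingestor.token.as_str())
        .send()
        .await?;
    if !resp.status().is_success() {
        eprintln!(
            "stream delete sync failed for {}: {}",
            ingestor.domain_name,
            resp.status()
        );
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Hypothetical ingestor address and token, for illustration only.
    let ingestor = Ingestor {
        domain_name: "http://127.0.0.1:8000".to_string(),
        token: "Basic ...".to_string(),
    };
    send_stream_delete(&ingestor, "/api/v1", "demo").await
}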