From 7720fb19ce648add8677afefa78b5c67a1dc6959 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 20 Dec 2024 22:15:14 +0530 Subject: [PATCH] refactor: clean up parts of the codebase (#981) --- src/analytics.rs | 4 +- src/cli.rs | 20 +- src/event/format/json.rs | 12 +- src/event/format/mod.rs | 31 +- src/handlers/http/ingest.rs | 34 +- src/handlers/http/kinesis.rs | 4 +- src/handlers/http/logstream.rs | 8 +- src/handlers/http/middleware.rs | 2 +- src/handlers/http/modal/ingest_server.rs | 9 +- src/handlers/http/modal/mod.rs | 6 +- src/handlers/http/modal/utils/ingest_utils.rs | 146 +++-- src/hottier.rs | 228 ++++---- src/migration/metadata_migration.rs | 33 +- src/migration/mod.rs | 15 +- src/option.rs | 61 +- src/query/stream_schema_provider.rs | 521 +++++++++--------- src/storage/azure_blob.rs | 2 +- src/storage/localfs.rs | 2 +- src/storage/object_storage.rs | 8 +- src/storage/s3.rs | 51 +- src/storage/staging.rs | 2 +- src/storage/store_metadata.rs | 25 +- src/utils/json/flatten.rs | 29 +- src/utils/json/mod.rs | 37 +- 24 files changed, 587 insertions(+), 703 deletions(-) diff --git a/src/analytics.rs b/src/analytics.rs index fad0dca74..3c4d05c3c 100644 --- a/src/analytics.rs +++ b/src/analytics.rs @@ -64,7 +64,7 @@ pub struct Report { memory_total_bytes: u64, platform: String, storage_mode: String, - server_mode: String, + server_mode: Mode, version: String, commit_hash: String, active_ingestors: u64, @@ -112,7 +112,7 @@ impl Report { memory_total_bytes: mem_total, platform: platform().to_string(), storage_mode: CONFIG.get_storage_mode_string().to_string(), - server_mode: CONFIG.parseable.mode.to_string(), + server_mode: CONFIG.parseable.mode, version: current().released_version.to_string(), commit_hash: current().commit_hash, active_ingestors: ingestor_metrics.0, diff --git a/src/cli.rs b/src/cli.rs index 193da9632..3dd0db859 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -527,20 +527,12 @@ impl FromArgMatches for Cli { .get_one::(Self::ROW_GROUP_SIZE) .cloned() .expect("default for row_group size"); - self.parquet_compression = match m - .get_one::(Self::PARQUET_COMPRESSION_ALGO) - .expect("default for compression algo") - .as_str() - { - "uncompressed" => Compression::UNCOMPRESSED, - "snappy" => Compression::SNAPPY, - "gzip" => Compression::GZIP, - "lzo" => Compression::LZO, - "brotli" => Compression::BROTLI, - "lz4" => Compression::LZ4, - "zstd" => Compression::ZSTD, - _ => unreachable!(), - }; + self.parquet_compression = serde_json::from_str(&format!( + "{:?}", + m.get_one::(Self::PARQUET_COMPRESSION_ALGO) + .expect("default for compression algo") + )) + .expect("unexpected compression algo"); let openid_client_id = m.get_one::(Self::OPENID_CLIENT_ID).cloned(); let openid_client_secret = m.get_one::(Self::OPENID_CLIENT_SECRET).cloned(); diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 487cb58a6..7e910805e 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -45,11 +45,11 @@ impl EventFormat for Event { // also extract the arrow schema, tags and metadata from the incoming json fn to_data( self, - schema: HashMap>, - static_schema_flag: Option, - time_partition: Option, + schema: &HashMap>, + static_schema_flag: Option<&String>, + time_partition: Option<&String>, ) -> Result<(Self::Data, Vec>, bool, Tags, Metadata), anyhow::Error> { - let data = flatten_json_body(self.data, None, None, None, false)?; + let data = flatten_json_body(&self.data, None, None, None, false)?; let stream_schema = schema; // incoming event may be a single json or a json array @@ -66,13 +66,13 @@ impl EventFormat for Event { collect_keys(value_arr.iter()).expect("fields can be collected from array of objects"); let mut is_first = false; - let schema = match derive_arrow_schema(&stream_schema, fields) { + let schema = match derive_arrow_schema(stream_schema, fields) { Ok(schema) => schema, Err(_) => match infer_json_schema_from_iterator(value_arr.iter().map(Ok)) { Ok(mut infer_schema) => { let new_infer_schema = super::super::format::update_field_type_in_schema( Arc::new(infer_schema), - Some(&stream_schema), + Some(stream_schema), time_partition, Some(&value_arr), ); diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index e637eb4e6..029d218ea 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -42,22 +42,21 @@ pub trait EventFormat: Sized { fn to_data( self, - schema: HashMap>, - static_schema_flag: Option, - time_partition: Option, + schema: &HashMap>, + static_schema_flag: Option<&String>, + time_partition: Option<&String>, ) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>; + fn decode(data: Self::Data, schema: Arc) -> Result; + fn into_recordbatch( self, - storage_schema: HashMap>, - static_schema_flag: Option, - time_partition: Option, + storage_schema: &HashMap>, + static_schema_flag: Option<&String>, + time_partition: Option<&String>, ) -> Result<(RecordBatch, bool), AnyError> { - let (data, mut schema, is_first, tags, metadata) = self.to_data( - storage_schema.clone(), - static_schema_flag.clone(), - time_partition.clone(), - )?; + let (data, mut schema, is_first, tags, metadata) = + self.to_data(storage_schema, static_schema_flag, time_partition)?; if get_field(&schema, DEFAULT_TAGS_KEY).is_some() { return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY)); @@ -120,8 +119,8 @@ pub trait EventFormat: Sized { fn is_schema_matching( new_schema: Arc, - storage_schema: HashMap>, - static_schema_flag: Option, + storage_schema: &HashMap>, + static_schema_flag: Option<&String>, ) -> bool { if static_schema_flag.is_none() { return true; @@ -207,7 +206,7 @@ pub fn override_timestamp_fields( pub fn update_field_type_in_schema( inferred_schema: Arc, existing_schema: Option<&HashMap>>, - time_partition: Option, + time_partition: Option<&String>, log_records: Option<&Vec>, ) -> Arc { let mut updated_schema = inferred_schema.clone(); @@ -236,12 +235,12 @@ pub fn update_field_type_in_schema( if time_partition.is_none() { return updated_schema; } - let time_partition_field_name = time_partition.unwrap(); + let new_schema: Vec = updated_schema .fields() .iter() .map(|field| { - if *field.name() == time_partition_field_name { + if field.name() == time_partition.unwrap() { if field.data_type() == &DataType::Utf8 { let new_data_type = DataType::Timestamp(TimeUnit::Millisecond, None); Field::new(field.name().clone(), new_data_type, true) diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 50c7b5079..2c70dca45 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -61,7 +61,7 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result Result< tags: String::default(), metadata: String::default(), }; - event.into_recordbatch(schema, None, None)? + event.into_recordbatch(&schema, None, None)? }; event::Event { rb, @@ -114,9 +114,9 @@ pub async fn handle_otel_ingestion( .iter() .find(|&(key, _)| key == STREAM_NAME_HEADER_KEY) { - let stream_name = stream_name.to_str().unwrap().to_owned(); - create_stream_if_not_exists(&stream_name, &StreamType::UserDefined.to_string()).await?; - push_logs(stream_name.to_string(), req.clone(), body).await?; + let stream_name = stream_name.to_str().unwrap(); + create_stream_if_not_exists(stream_name, &StreamType::UserDefined.to_string()).await?; + push_logs(stream_name, &req, &body).await?; } else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); } @@ -149,7 +149,7 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result, - #[serde(rename = "requestId")] request_id: String, timestamp: u64, } #[derive(Serialize, Deserialize, Debug)] struct Data { - #[serde(rename = "data")] data: String, } diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 483cc21f0..56becf001 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -502,8 +502,8 @@ pub async fn put_stream_hot_tier( let existing_hot_tier_used_size = hot_tier_manager .validate_hot_tier_size(&stream_name, &hottier.size) .await?; - hottier.used_size = Some(existing_hot_tier_used_size.to_string()); - hottier.available_size = Some(hottier.size.clone()); + hottier.used_size = existing_hot_tier_used_size.to_string(); + hottier.available_size = hottier.size.to_string(); hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string()); hot_tier_manager .put_hot_tier(&stream_name, &mut hottier) @@ -546,8 +546,8 @@ pub async fn get_stream_hot_tier(req: HttpRequest) -> Result Result<(), PostError> { let log_source = req .headers() @@ -52,43 +52,42 @@ pub async fn flatten_and_push_logs( let json = kinesis::flatten_kinesis_logs(&body); for record in json.iter() { let body: Bytes = serde_json::to_vec(record).unwrap().into(); - push_logs(stream_name.clone(), req.clone(), body.clone()).await?; + push_logs(stream_name, &req, &body).await?; } } else { - push_logs(stream_name, req, body).await?; + push_logs(stream_name, &req, &body).await?; } Ok(()) } pub async fn push_logs( - stream_name: String, - req: HttpRequest, - body: Bytes, + stream_name: &str, + req: &HttpRequest, + body: &Bytes, ) -> Result<(), PostError> { - let time_partition = STREAM_INFO.get_time_partition(&stream_name)?; - let time_partition_limit = STREAM_INFO.get_time_partition_limit(&stream_name)?; - let static_schema_flag = STREAM_INFO.get_static_schema_flag(&stream_name)?; - let custom_partition = STREAM_INFO.get_custom_partition(&stream_name)?; - let body_val: Value = serde_json::from_slice(&body)?; + let time_partition = STREAM_INFO.get_time_partition(stream_name)?; + let time_partition_limit = STREAM_INFO.get_time_partition_limit(stream_name)?; + let static_schema_flag = STREAM_INFO.get_static_schema_flag(stream_name)?; + let custom_partition = STREAM_INFO.get_custom_partition(stream_name)?; + let body_val: Value = serde_json::from_slice(body)?; let size: usize = body.len(); let mut parsed_timestamp = Utc::now().naive_utc(); if time_partition.is_none() { if custom_partition.is_none() { let size = size as u64; create_process_record_batch( - stream_name.clone(), - req.clone(), - body_val.clone(), - static_schema_flag.clone(), + stream_name, + req, + body_val, + static_schema_flag.as_ref(), None, parsed_timestamp, - HashMap::new(), + &HashMap::new(), size, ) .await?; } else { - let data = - convert_array_to_object(body_val.clone(), None, None, custom_partition.clone())?; + let data = convert_array_to_object(&body_val, None, None, custom_partition.as_ref())?; let custom_partition = custom_partition.unwrap(); let custom_partition_list = custom_partition.split(',').collect::>(); @@ -98,13 +97,13 @@ pub async fn push_logs( let size = value.to_string().into_bytes().len() as u64; create_process_record_batch( - stream_name.clone(), - req.clone(), - value.clone(), - static_schema_flag.clone(), + stream_name, + req, + value, + static_schema_flag.as_ref(), None, parsed_timestamp, - custom_partition_values.clone(), + &custom_partition_values, size, ) .await?; @@ -112,32 +111,32 @@ pub async fn push_logs( } } else if custom_partition.is_none() { let data = convert_array_to_object( - body_val.clone(), - time_partition.clone(), - time_partition_limit, + &body_val, + time_partition.as_ref(), + time_partition_limit.as_ref(), None, )?; for value in data { - parsed_timestamp = get_parsed_timestamp(&value, &time_partition); + parsed_timestamp = get_parsed_timestamp(&value, time_partition.as_ref()); let size = value.to_string().into_bytes().len() as u64; create_process_record_batch( - stream_name.clone(), - req.clone(), - value.clone(), - static_schema_flag.clone(), - time_partition.clone(), + stream_name, + req, + value, + static_schema_flag.as_ref(), + time_partition.as_ref(), parsed_timestamp, - HashMap::new(), + &HashMap::new(), size, ) .await?; } } else { let data = convert_array_to_object( - body_val.clone(), - time_partition.clone(), - time_partition_limit, - custom_partition.clone(), + &body_val, + time_partition.as_ref(), + time_partition_limit.as_ref(), + custom_partition.as_ref(), )?; let custom_partition = custom_partition.unwrap(); let custom_partition_list = custom_partition.split(',').collect::>(); @@ -146,16 +145,16 @@ pub async fn push_logs( let custom_partition_values = get_custom_partition_values(&value, &custom_partition_list); - parsed_timestamp = get_parsed_timestamp(&value, &time_partition); + parsed_timestamp = get_parsed_timestamp(&value, time_partition.as_ref()); let size = value.to_string().into_bytes().len() as u64; create_process_record_batch( - stream_name.clone(), - req.clone(), - value.clone(), - static_schema_flag.clone(), - time_partition.clone(), + stream_name, + req, + value, + static_schema_flag.as_ref(), + time_partition.as_ref(), parsed_timestamp, - custom_partition_values.clone(), + &custom_partition_values, size, ) .await?; @@ -167,30 +166,25 @@ pub async fn push_logs( #[allow(clippy::too_many_arguments)] pub async fn create_process_record_batch( - stream_name: String, - req: HttpRequest, + stream_name: &str, + req: &HttpRequest, value: Value, - static_schema_flag: Option, - time_partition: Option, + static_schema_flag: Option<&String>, + time_partition: Option<&String>, parsed_timestamp: NaiveDateTime, - custom_partition_values: HashMap, + custom_partition_values: &HashMap, origin_size: u64, ) -> Result<(), PostError> { - let (rb, is_first_event) = get_stream_schema( - stream_name.clone(), - req.clone(), - value.clone(), - static_schema_flag.clone(), - time_partition.clone(), - )?; + let (rb, is_first_event) = + get_stream_schema(stream_name, req, &value, static_schema_flag, time_partition)?; event::Event { rb, - stream_name: stream_name.clone(), + stream_name: stream_name.to_owned(), origin_format: "json", origin_size, is_first_event, parsed_timestamp, - time_partition: time_partition.clone(), + time_partition: time_partition.cloned(), custom_partition_values: custom_partition_values.clone(), stream_type: StreamType::UserDefined, } @@ -201,36 +195,36 @@ pub async fn create_process_record_batch( } pub fn get_stream_schema( - stream_name: String, - req: HttpRequest, - body: Value, - static_schema_flag: Option, - time_partition: Option, + stream_name: &str, + req: &HttpRequest, + body: &Value, + static_schema_flag: Option<&String>, + time_partition: Option<&String>, ) -> Result<(arrow_array::RecordBatch, bool), PostError> { let hash_map = STREAM_INFO.read().unwrap(); let schema = hash_map - .get(&stream_name) - .ok_or(PostError::StreamNotFound(stream_name))? + .get(stream_name) + .ok_or(PostError::StreamNotFound(stream_name.to_owned()))? .schema .clone(); into_event_batch(req, body, schema, static_schema_flag, time_partition) } pub fn into_event_batch( - req: HttpRequest, - body: Value, + req: &HttpRequest, + body: &Value, schema: HashMap>, - static_schema_flag: Option, - time_partition: Option, + static_schema_flag: Option<&String>, + time_partition: Option<&String>, ) -> Result<(arrow_array::RecordBatch, bool), PostError> { - let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?; - let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?; + let tags = collect_labelled_headers(req, PREFIX_TAGS, SEPARATOR)?; + let metadata = collect_labelled_headers(req, PREFIX_META, SEPARATOR)?; let event = format::json::Event { - data: body, + data: body.to_owned(), tags, metadata, }; - let (rb, is_first) = event.into_recordbatch(schema, static_schema_flag, time_partition)?; + let (rb, is_first) = event.into_recordbatch(&schema, static_schema_flag, time_partition)?; Ok((rb, is_first)) } @@ -241,7 +235,7 @@ pub fn get_custom_partition_values( let mut custom_partition_values: HashMap = HashMap::new(); for custom_partition_field in custom_partition_list { let custom_partition_value = body.get(custom_partition_field.trim()).unwrap().to_owned(); - let custom_partition_value = match custom_partition_value.clone() { + let custom_partition_value = match custom_partition_value { e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(), Value::String(s) => s, _ => "".to_string(), @@ -254,8 +248,8 @@ pub fn get_custom_partition_values( custom_partition_values } -pub fn get_parsed_timestamp(body: &Value, time_partition: &Option) -> NaiveDateTime { - let body_timestamp = body.get(time_partition.clone().unwrap().to_string()); +pub fn get_parsed_timestamp(body: &Value, time_partition: Option<&String>) -> NaiveDateTime { + let body_timestamp = body.get(time_partition.unwrap()); let parsed_timestamp = body_timestamp .unwrap() .to_owned() diff --git a/src/hottier.rs b/src/hottier.rs index cc4e4bbb1..32a7bd4c3 100644 --- a/src/hottier.rs +++ b/src/hottier.rs @@ -41,7 +41,7 @@ use once_cell::sync::OnceCell; use parquet::errors::ParquetError; use relative_path::RelativePathBuf; use std::time::Duration; -use sysinfo::{Disks, System}; +use sysinfo::Disks; use tokio::fs::{self, DirEntry}; use tokio::io::AsyncWriteExt; use tokio_stream::wrappers::ReadDirStream; @@ -52,15 +52,13 @@ pub const MIN_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10737418240; // 10 GiB const HOT_TIER_SYNC_DURATION: Interval = clokwerk::Interval::Minutes(1); pub const INTERNAL_STREAM_HOT_TIER_SIZE_BYTES: u64 = 10485760; //10 MiB pub const CURRENT_HOT_TIER_VERSION: &str = "v2"; + #[derive(Debug, serde::Deserialize, serde::Serialize)] pub struct StreamHotTier { pub version: Option, - #[serde(rename = "size")] pub size: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub used_size: Option, - #[serde(skip_serializing_if = "Option::is_none")] - pub available_size: Option, + pub used_size: String, + pub available_size: String, #[serde(skip_serializing_if = "Option::is_none")] pub oldest_date_time_entry: Option, } @@ -99,12 +97,7 @@ impl HotTierManager { if self.check_stream_hot_tier_exists(&stream) && stream != current_stream { let stream_hot_tier = self.get_hot_tier(&stream).await?; total_hot_tier_size += &stream_hot_tier.size.parse::().unwrap(); - total_hot_tier_used_size += &stream_hot_tier - .used_size - .clone() - .unwrap() - .parse::() - .unwrap(); + total_hot_tier_used_size += stream_hot_tier.used_size.parse::().unwrap(); } } Ok((total_hot_tier_size, total_hot_tier_used_size)) @@ -124,8 +117,7 @@ impl HotTierManager { if self.check_stream_hot_tier_exists(stream) { //delete existing hot tier if its size is less than the updated hot tier size else return error let existing_hot_tier = self.get_hot_tier(stream).await?; - existing_hot_tier_used_size = - existing_hot_tier.used_size.unwrap().parse::().unwrap(); + existing_hot_tier_used_size = existing_hot_tier.used_size.parse::().unwrap(); if stream_hot_tier_size < existing_hot_tier_used_size { return Err(HotTierError::ObjectStorageError(ObjectStorageError::Custom(format!( @@ -136,28 +128,31 @@ impl HotTierManager { } } - let (total_disk_space, available_disk_space, used_disk_space) = get_disk_usage(); - - if let (Some(total_disk_space), _, Some(used_disk_space)) = - (total_disk_space, available_disk_space, used_disk_space) - { - let (total_hot_tier_size, total_hot_tier_used_size) = - self.get_hot_tiers_size(stream).await?; - let disk_threshold = - (CONFIG.parseable.max_disk_usage * total_disk_space as f64) / 100.0; - let max_allowed_hot_tier_size = disk_threshold - - total_hot_tier_size as f64 - - (used_disk_space as f64 - - total_hot_tier_used_size as f64 - - existing_hot_tier_used_size as f64); - - if stream_hot_tier_size as f64 > max_allowed_hot_tier_size { - error!("disk_threshold: {}, used_disk_space: {}, total_hot_tier_used_size: {}, existing_hot_tier_used_size: {}, total_hot_tier_size: {}", - bytes_to_human_size(disk_threshold as u64), bytes_to_human_size(used_disk_space), bytes_to_human_size(total_hot_tier_used_size), bytes_to_human_size(existing_hot_tier_used_size), bytes_to_human_size(total_hot_tier_size)); - return Err(HotTierError::ObjectStorageError(ObjectStorageError::Custom(format!( - "{} is the total usable disk space for hot tier, cannot set a bigger value.", bytes_to_human_size(max_allowed_hot_tier_size as u64) - )))); - } + let DiskUtil { + total_space, + used_space, + .. + } = get_disk_usage().expect("Codepath should only be hit if hottier is enabled"); + + let (total_hot_tier_size, total_hot_tier_used_size) = + self.get_hot_tiers_size(stream).await?; + let disk_threshold = (CONFIG.parseable.max_disk_usage * total_space as f64) / 100.0; + let max_allowed_hot_tier_size = disk_threshold + - total_hot_tier_size as f64 + - (used_space as f64 + - total_hot_tier_used_size as f64 + - existing_hot_tier_used_size as f64); + + if stream_hot_tier_size as f64 > max_allowed_hot_tier_size { + error!("disk_threshold: {}, used_disk_space: {}, total_hot_tier_used_size: {}, existing_hot_tier_used_size: {}, total_hot_tier_size: {}", + bytes_to_human_size(disk_threshold as u64), bytes_to_human_size(used_space), bytes_to_human_size(total_hot_tier_used_size), bytes_to_human_size(existing_hot_tier_used_size), bytes_to_human_size(total_hot_tier_size)); + + return Err(HotTierError::ObjectStorageError( + ObjectStorageError::Custom(format!( + "{} is the total usable disk space for hot tier, cannot set a bigger value.", + bytes_to_human_size(max_allowed_hot_tier_size as u64) + )), + )); } Ok(existing_hot_tier_used_size) @@ -261,12 +256,7 @@ impl HotTierManager { /// delete the files from the hot tier directory if the available date range is outside the hot tier range async fn process_stream(&self, stream: String) -> Result<(), HotTierError> { let stream_hot_tier = self.get_hot_tier(&stream).await?; - let mut parquet_file_size = stream_hot_tier - .used_size - .as_ref() - .unwrap() - .parse::() - .unwrap(); + let mut parquet_file_size = stream_hot_tier.used_size.parse::().unwrap(); let object_store = CONFIG.storage().get_object_store(); let mut s3_manifest_file_list = object_store.list_manifest_files(&stream).await?; @@ -358,13 +348,7 @@ impl HotTierManager { let mut file_processed = false; let mut stream_hot_tier = self.get_hot_tier(stream).await?; if !self.is_disk_available(parquet_file.file_size).await? - || stream_hot_tier - .available_size - .as_ref() - .unwrap() - .parse::() - .unwrap() - <= parquet_file.file_size + || stream_hot_tier.available_size.parse::().unwrap() <= parquet_file.file_size { if !self .cleanup_hot_tier_old_data( @@ -377,12 +361,7 @@ impl HotTierManager { { return Ok(file_processed); } - *parquet_file_size = stream_hot_tier - .used_size - .as_ref() - .unwrap() - .parse::() - .unwrap(); + *parquet_file_size = stream_hot_tier.used_size.parse::().unwrap(); } let parquet_file_path = RelativePathBuf::from(parquet_file.file_path.clone()); fs::create_dir_all(parquet_path.parent().unwrap()).await?; @@ -394,18 +373,11 @@ impl HotTierManager { .await?; file.write_all(&parquet_data).await?; *parquet_file_size += parquet_file.file_size; - stream_hot_tier.used_size = Some(parquet_file_size.to_string()); - - stream_hot_tier.available_size = Some( - (stream_hot_tier - .available_size - .as_ref() - .unwrap() - .parse::() - .unwrap() - - parquet_file.file_size) - .to_string(), - ); + stream_hot_tier.used_size = parquet_file_size.to_string(); + + stream_hot_tier.available_size = (stream_hot_tier.available_size.parse::().unwrap() + - parquet_file.file_size) + .to_string(); self.put_hot_tier(stream, &mut stream_hot_tier).await?; file_processed = true; let mut hot_tier_manifest = self @@ -495,31 +467,36 @@ impl HotTierManager { Ok(hot_tier_manifest) } - ///get the list of files from all the manifests present in hot tier directory for the stream + /// Returns the list of manifest files present in hot tier directory for the stream pub async fn get_hot_tier_manifest_files( &self, stream: &str, - manifest_files: Vec, - ) -> Result<(Vec, Vec), HotTierError> { + manifest_files: &mut Vec, + ) -> Result, HotTierError> { + // Fetch the list of hot tier parquet files for the given stream. let mut hot_tier_files = self.get_hot_tier_parquet_files(stream).await?; + + // Retain only the files in `hot_tier_files` that also exist in `manifest_files`. hot_tier_files.retain(|file| { manifest_files .iter() .any(|manifest_file| manifest_file.file_path.eq(&file.file_path)) }); + + // Sort `hot_tier_files` in descending order by file path. hot_tier_files.sort_unstable_by(|a, b| b.file_path.cmp(&a.file_path)); - let mut remaining_files: Vec = manifest_files - .into_iter() - .filter(|manifest_file| { - hot_tier_files - .iter() - .all(|file| !file.file_path.eq(&manifest_file.file_path)) - }) - .collect(); - remaining_files.sort_unstable_by(|a, b| b.file_path.cmp(&a.file_path)); - - Ok((hot_tier_files, remaining_files)) + // Update `manifest_files` to exclude files that are present in the filtered `hot_tier_files`. + manifest_files.retain(|manifest_file| { + hot_tier_files + .iter() + .all(|file| !file.file_path.eq(&manifest_file.file_path)) + }); + + // Sort `manifest_files` in descending order by file path. + manifest_files.sort_unstable_by(|a, b| b.file_path.cmp(&a.file_path)); + + Ok(hot_tier_files) } ///get the list of parquet files from the hot tier directory for the stream @@ -615,35 +592,16 @@ impl HotTierManager { fs::remove_dir_all(path_to_delete.parent().unwrap()).await?; delete_empty_directory_hot_tier(path_to_delete.parent().unwrap()).await?; - stream_hot_tier.used_size = Some( - (stream_hot_tier - .used_size - .as_ref() - .unwrap() - .parse::() - .unwrap() - - file_size) - .to_string(), - ); - stream_hot_tier.available_size = Some( - (stream_hot_tier - .available_size - .as_ref() - .unwrap() - .parse::() - .unwrap() - + file_size) - .to_string(), - ); + stream_hot_tier.used_size = + (stream_hot_tier.used_size.parse::().unwrap() - file_size) + .to_string(); + stream_hot_tier.available_size = + (stream_hot_tier.available_size.parse::().unwrap() + file_size) + .to_string(); self.put_hot_tier(stream, stream_hot_tier).await?; delete_successful = true; - if stream_hot_tier - .available_size - .as_ref() - .unwrap() - .parse::() - .unwrap() + if stream_hot_tier.available_size.parse::().unwrap() <= parquet_file_size { continue 'loop_files; @@ -663,16 +621,17 @@ impl HotTierManager { ///check if the disk is available to download the parquet file /// check if the disk usage is above the threshold pub async fn is_disk_available(&self, size_to_download: u64) -> Result { - let (total_disk_space, available_disk_space, used_disk_space) = get_disk_usage(); - - if let (Some(total_disk_space), Some(available_disk_space), Some(used_disk_space)) = - (total_disk_space, available_disk_space, used_disk_space) + if let Some(DiskUtil { + total_space, + available_space, + used_space, + }) = get_disk_usage() { - if available_disk_space < size_to_download { + if available_space < size_to_download { return Ok(false); } - if ((used_disk_space + size_to_download) as f64 * 100.0 / total_disk_space as f64) + if ((used_space + size_to_download) as f64 * 100.0 / total_space as f64) > CONFIG.parseable.max_disk_usage { return Ok(false); @@ -741,8 +700,8 @@ impl HotTierManager { let mut stream_hot_tier = StreamHotTier { version: Some(CURRENT_HOT_TIER_VERSION.to_string()), size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string(), - used_size: Some("0".to_string()), - available_size: Some(INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string()), + used_size: "0".to_string(), + available_size: INTERNAL_STREAM_HOT_TIER_SIZE_BYTES.to_string(), oldest_date_time_entry: None, }; self.put_hot_tier(INTERNAL_STREAM_NAME, &mut stream_hot_tier) @@ -761,30 +720,39 @@ pub fn hot_tier_file_path( object_store::path::Path::from_absolute_path(path) } -///get the disk usage for the hot tier storage path -pub fn get_disk_usage() -> (Option, Option, Option) { - let mut sys = System::new_all(); - sys.refresh_all(); - let path = CONFIG.parseable.hot_tier_storage_path.as_ref().unwrap(); +struct DiskUtil { + total_space: u64, + available_space: u64, + used_space: u64, +} +/// Get the disk usage for the hot tier storage path. If we have a three disk paritions +/// mounted as follows: +/// 1. / +/// 2. /home/parseable +/// 3. /home/example/ignore +/// +/// And parseable is running with `P_HOT_TIER_DIR` pointing to a directory in +/// `/home/parseable`, we should return the usage stats of the disk mounted there. +fn get_disk_usage() -> Option { + let path = CONFIG.parseable.hot_tier_storage_path.as_ref()?; let mut disks = Disks::new_with_refreshed_list(); + // Order the disk partitions by decreasing length of mount path disks.sort_by_key(|disk| disk.mount_point().to_str().unwrap().len()); disks.reverse(); for disk in disks.iter() { - if path.starts_with(disk.mount_point().to_str().unwrap()) { - let total_disk_space = disk.total_space(); - let available_disk_space = disk.available_space(); - let used_disk_space = total_disk_space - available_disk_space; - return ( - Some(total_disk_space), - Some(available_disk_space), - Some(used_disk_space), - ); + // Returns disk utilisation of first matching mount point + if path.starts_with(disk.mount_point()) { + return Some(DiskUtil { + total_space: disk.total_space(), + available_space: disk.available_space(), + used_space: disk.total_space() - disk.available_space(), + }); } } - (None, None, None) + None } async fn delete_empty_directory_hot_tier(path: &Path) -> io::Result<()> { diff --git a/src/migration/metadata_migration.rs b/src/migration/metadata_migration.rs index f6a194356..298bad2b0 100644 --- a/src/migration/metadata_migration.rs +++ b/src/migration/metadata_migration.rs @@ -16,15 +16,15 @@ * */ +use bytes::Bytes; use rand::distributions::DistString; -use serde_json::{Map, Value as JsonValue}; +use serde_json::{json, Map, Value as JsonValue}; use crate::{ handlers::http::modal::IngestorMetadata, option::CONFIG, storage::{object_storage::ingestor_metadata_path, staging}, }; -use actix_web::body::MessageBody; /* v1 @@ -47,10 +47,7 @@ pub fn v1_v3(mut storage_metadata: JsonValue) -> JsonValue { metadata.insert("users".to_string(), JsonValue::Array(vec![])); metadata.insert("streams".to_string(), JsonValue::Array(vec![])); metadata.insert("roles".to_string(), JsonValue::Array(vec![])); - metadata.insert( - "server_mode".to_string(), - JsonValue::String(CONFIG.parseable.mode.to_string()), - ); + metadata.insert("server_mode".to_string(), json!(CONFIG.parseable.mode)); storage_metadata } @@ -111,10 +108,7 @@ pub fn v2_v3(mut storage_metadata: JsonValue) -> JsonValue { "roles".to_string(), JsonValue::Object(Map::from_iter(privileges_map)), ); - metadata.insert( - "server_mode".to_string(), - JsonValue::String(CONFIG.parseable.mode.to_string()), - ); + metadata.insert("server_mode".to_string(), json!(CONFIG.parseable.mode)); storage_metadata } @@ -125,10 +119,7 @@ pub fn v3_v4(mut storage_metadata: JsonValue) -> JsonValue { let sm = metadata.get("server_mode"); if sm.is_none() || sm.unwrap().as_str().unwrap() == "All" { - metadata.insert( - "server_mode".to_string(), - JsonValue::String(CONFIG.parseable.mode.to_string()), - ); + metadata.insert("server_mode".to_string(), json!(CONFIG.parseable.mode)); } let roles = metadata.get_mut("roles").unwrap().as_object_mut().unwrap(); @@ -155,10 +146,7 @@ pub fn v4_v5(mut storage_metadata: JsonValue) -> JsonValue { match metadata.get("server_mode") { None => { - metadata.insert( - "server_mode".to_string(), - JsonValue::String(CONFIG.parseable.mode.to_string()), - ); + metadata.insert("server_mode".to_string(), json!(CONFIG.parseable.mode)); } Some(JsonValue::String(mode)) => match mode.as_str() { "Query" => { @@ -168,10 +156,7 @@ pub fn v4_v5(mut storage_metadata: JsonValue) -> JsonValue { ); } "All" => { - metadata.insert( - "server_mode".to_string(), - JsonValue::String(CONFIG.parseable.mode.to_string()), - ); + metadata.insert("server_mode".to_string(), json!(CONFIG.parseable.mode)); } _ => (), }, @@ -209,9 +194,7 @@ pub async fn migrate_ingester_metadata() -> anyhow::Result anyhow::Result<()> { stream_hot_tier.size = human_size_to_bytes(&stream_hot_tier.size) .unwrap() .to_string(); - stream_hot_tier.available_size = Some( - human_size_to_bytes(&stream_hot_tier.available_size.unwrap()) + stream_hot_tier.available_size = + human_size_to_bytes(&stream_hot_tier.available_size) .unwrap() - .to_string(), - ); - stream_hot_tier.used_size = Some( - human_size_to_bytes(&stream_hot_tier.used_size.unwrap()) - .unwrap() - .to_string(), - ); + .to_string(); + stream_hot_tier.used_size = human_size_to_bytes(&stream_hot_tier.used_size) + .unwrap() + .to_string(); hot_tier_manager .put_hot_tier(stream, &mut stream_hot_tier) .await?; diff --git a/src/option.rs b/src/option.rs index 2e112ecf9..9770972da 100644 --- a/src/option.rs +++ b/src/option.rs @@ -24,9 +24,9 @@ use crate::storage::{ use bytes::Bytes; use clap::error::ErrorKind; use clap::{command, Args, Command, FromArgMatches}; -use core::fmt; use once_cell::sync::Lazy; use parquet::basic::{BrotliLevel, GzipLevel, ZstdLevel}; +use serde::{Deserialize, Serialize}; use std::env; use std::path::PathBuf; use std::sync::Arc; @@ -236,7 +236,7 @@ Join the community at https://logg.ing/community. .subcommands([local, s3, azureblob]) } -#[derive(Debug, Default, Eq, PartialEq)] +#[derive(Debug, Default, Eq, PartialEq, Clone, Copy, Serialize, Deserialize)] pub enum Mode { Query, Ingest, @@ -244,54 +244,29 @@ pub enum Mode { All, } -impl Mode { - pub fn to_str(&self) -> &str { - match self { - Mode::Query => "Query", - Mode::Ingest => "Ingest", - Mode::All => "All", - } - } - - pub fn from_string(mode: &str) -> Result { - match mode { - "Query" => Ok(Mode::Query), - "Ingest" => Ok(Mode::Ingest), - "All" => Ok(Mode::All), - x => Err(format!("Trying to Parse Invalid mode: {}", x)), - } - } -} - -impl fmt::Display for Mode { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{}", self.to_str()) - } -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] -#[allow(non_camel_case_types, clippy::upper_case_acronyms)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Deserialize)] +#[serde(rename_all = "lowercase")] pub enum Compression { - UNCOMPRESSED, - SNAPPY, - GZIP, - LZO, - BROTLI, + Uncompressed, + Snappy, + Gzip, + Lzo, + Brotli, #[default] - LZ4, - ZSTD, + Lz4, + Zstd, } impl From for parquet::basic::Compression { fn from(value: Compression) -> Self { match value { - Compression::UNCOMPRESSED => parquet::basic::Compression::UNCOMPRESSED, - Compression::SNAPPY => parquet::basic::Compression::SNAPPY, - Compression::GZIP => parquet::basic::Compression::GZIP(GzipLevel::default()), - Compression::LZO => parquet::basic::Compression::LZO, - Compression::BROTLI => parquet::basic::Compression::BROTLI(BrotliLevel::default()), - Compression::LZ4 => parquet::basic::Compression::LZ4, - Compression::ZSTD => parquet::basic::Compression::ZSTD(ZstdLevel::default()), + Compression::Uncompressed => parquet::basic::Compression::UNCOMPRESSED, + Compression::Snappy => parquet::basic::Compression::SNAPPY, + Compression::Gzip => parquet::basic::Compression::GZIP(GzipLevel::default()), + Compression::Lzo => parquet::basic::Compression::LZO, + Compression::Brotli => parquet::basic::Compression::BROTLI(BrotliLevel::default()), + Compression::Lz4 => parquet::basic::Compression::LZ4, + Compression::Zstd => parquet::basic::Compression::ZSTD(ZstdLevel::default()), } } } diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs index d4fe7e037..662ad0b99 100644 --- a/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -113,57 +113,240 @@ struct StandardTableProvider { url: Url, } -#[allow(clippy::too_many_arguments)] -async fn create_parquet_physical_plan( - object_store_url: ObjectStoreUrl, - partitions: Vec>, - statistics: Statistics, - schema: Arc, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - state: &dyn Session, - time_partition: Option, -) -> Result, DataFusionError> { - let filters = if let Some(expr) = conjunction(filters.to_vec()) { - let table_df_schema = schema.as_ref().clone().to_dfschema()?; - let filters = create_physical_expr(&expr, &table_df_schema, state.execution_props())?; - Some(filters) - } else { - None - }; - - let sort_expr = PhysicalSortExpr { - expr: if let Some(time_partition) = time_partition { - physical_plan::expressions::col(&time_partition, &schema)? +impl StandardTableProvider { + #[allow(clippy::too_many_arguments)] + async fn create_parquet_physical_plan( + &self, + execution_plans: &mut Vec>, + object_store_url: ObjectStoreUrl, + partitions: Vec>, + statistics: Statistics, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + state: &dyn Session, + time_partition: Option, + ) -> Result<(), DataFusionError> { + let filters = if let Some(expr) = conjunction(filters.to_vec()) { + let table_df_schema = self.schema.as_ref().clone().to_dfschema()?; + let filters = create_physical_expr(&expr, &table_df_schema, state.execution_props())?; + Some(filters) } else { - physical_plan::expressions::col(DEFAULT_TIMESTAMP_KEY, &schema)? - }, - options: SortOptions { - descending: true, - nulls_first: true, - }, - }; - let file_format = ParquetFormat::default().with_enable_pruning(true); - - // create the execution plan - let plan = file_format - .create_physical_plan( - state.as_any().downcast_ref::().unwrap(), // Remove this when ParquetFormat catches up - FileScanConfig { - object_store_url, - file_schema: schema.clone(), - file_groups: partitions, - statistics, - projection: projection.cloned(), - limit, - output_ordering: vec![vec![sort_expr]], - table_partition_cols: Vec::new(), + None + }; + + let sort_expr = PhysicalSortExpr { + expr: if let Some(time_partition) = time_partition { + physical_plan::expressions::col(&time_partition, &self.schema)? + } else { + physical_plan::expressions::col(DEFAULT_TIMESTAMP_KEY, &self.schema)? + }, + options: SortOptions { + descending: true, + nulls_first: true, }, - filters.as_ref(), + }; + let file_format = ParquetFormat::default().with_enable_pruning(true); + + // create the execution plan + let plan = file_format + .create_physical_plan( + state.as_any().downcast_ref::().unwrap(), // Remove this when ParquetFormat catches up + FileScanConfig { + object_store_url, + file_schema: self.schema.clone(), + file_groups: partitions, + statistics, + projection: projection.cloned(), + limit, + output_ordering: vec![vec![sort_expr]], + table_partition_cols: Vec::new(), + }, + filters.as_ref(), + ) + .await?; + execution_plans.push(plan); + + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + async fn get_hottier_exectuion_plan( + &self, + execution_plans: &mut Vec>, + hot_tier_manager: &HotTierManager, + manifest_files: &mut Vec, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + state: &dyn Session, + time_partition: Option, + ) -> Result<(), DataFusionError> { + let hot_tier_files = hot_tier_manager + .get_hot_tier_manifest_files(&self.stream, manifest_files) + .await + .map_err(|err| DataFusionError::External(Box::new(err)))?; + + let hot_tier_files = hot_tier_files + .into_iter() + .map(|mut file| { + let path = CONFIG + .parseable + .hot_tier_storage_path + .as_ref() + .unwrap() + .join(&file.file_path); + file.file_path = path.to_str().unwrap().to_string(); + file + }) + .collect(); + + let (partitioned_files, statistics) = self.partitioned_files(hot_tier_files); + self.create_parquet_physical_plan( + execution_plans, + ObjectStoreUrl::parse("file:///").unwrap(), + partitioned_files, + statistics, + projection, + filters, + limit, + state, + time_partition.clone(), ) .await?; - Ok(plan) + + Ok(()) + } + + #[allow(clippy::too_many_arguments)] + async fn legacy_listing_table( + &self, + execution_plans: &mut Vec>, + glob_storage: Arc, + object_store: Arc, + time_filters: &[PartialTimeFilter], + state: &dyn Session, + projection: Option<&Vec>, + filters: &[Expr], + limit: Option, + time_partition: Option, + ) -> Result<(), DataFusionError> { + ListingTableBuilder::new(self.stream.to_owned()) + .populate_via_listing(glob_storage.clone(), object_store, time_filters) + .and_then(|builder| async { + let table = builder.build( + self.schema.clone(), + |x| glob_storage.query_prefixes(x), + time_partition, + )?; + if let Some(table) = table { + let plan = table.scan(state, projection, filters, limit).await?; + execution_plans.push(plan); + } + + Ok(()) + }) + .await?; + + Ok(()) + } + + fn final_plan( + &self, + mut execution_plans: Vec>, + projection: Option<&Vec>, + ) -> Result, DataFusionError> { + let exec: Arc = if execution_plans.is_empty() { + let schema = match projection { + Some(projection) => Arc::new(self.schema.project(projection)?), + None => self.schema.to_owned(), + }; + Arc::new(EmptyExec::new(schema)) + } else if execution_plans.len() == 1 { + execution_plans.pop().unwrap() + } else { + Arc::new(UnionExec::new(execution_plans)) + }; + Ok(exec) + } + + fn partitioned_files( + &self, + manifest_files: Vec, + ) -> (Vec>, datafusion::common::Statistics) { + let target_partition = num_cpus::get(); + let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new())); + let mut column_statistics = + HashMap::>::new(); + let mut count = 0; + for (index, file) in manifest_files + .into_iter() + .enumerate() + .map(|(x, y)| (x % target_partition, y)) + { + #[allow(unused_mut)] + let catalog::manifest::File { + mut file_path, + num_rows, + columns, + .. + } = file; + + // object_store::path::Path doesn't automatically deal with Windows path separators + // to do that, we are using from_absolute_path() which takes into consideration the underlying filesystem + // before sending the file path to PartitionedFile + // the github issue- https://github.com/parseablehq/parseable/issues/824 + // For some reason, the `from_absolute_path()` doesn't work for macos, hence the ugly solution + // TODO: figure out an elegant solution to this + #[cfg(windows)] + { + if CONFIG.storage_name.eq("drive") { + file_path = object_store::path::Path::from_absolute_path(file_path).unwrap(); + } + } + let pf = PartitionedFile::new(file_path, file.file_size); + partitioned_files[index].push(pf); + + columns.into_iter().for_each(|col| { + column_statistics + .entry(col.name) + .and_modify(|x| { + if let Some((stats, col_stats)) = x.as_ref().cloned().zip(col.stats.clone()) + { + *x = Some(stats.update(col_stats)); + } + }) + .or_insert_with(|| col.stats.as_ref().cloned()); + }); + count += num_rows; + } + let statistics = self + .schema + .fields() + .iter() + .map(|field| { + column_statistics + .get(field.name()) + .and_then(|stats| stats.as_ref()) + .and_then(|stats| stats.clone().min_max_as_scalar(field.data_type())) + .map(|(min, max)| datafusion::common::ColumnStatistics { + null_count: Precision::Absent, + max_value: Precision::Exact(max), + min_value: Precision::Exact(min), + distinct_count: Precision::Absent, + }) + .unwrap_or_default() + }) + .collect(); + + let statistics = datafusion::common::Statistics { + num_rows: Precision::Exact(count as usize), + total_byte_size: Precision::Absent, + column_statistics: statistics, + }; + + (partitioned_files, statistics) + } } async fn collect_from_snapshot( @@ -213,88 +396,6 @@ async fn collect_from_snapshot( Ok(manifest_files) } -fn partitioned_files( - manifest_files: Vec, - table_schema: &Schema, -) -> (Vec>, datafusion::common::Statistics) { - let target_partition = num_cpus::get(); - let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new())); - let mut column_statistics = HashMap::>::new(); - let mut count = 0; - for (index, file) in manifest_files - .into_iter() - .enumerate() - .map(|(x, y)| (x % target_partition, y)) - { - let catalog::manifest::File { - file_path, - num_rows, - columns, - .. - } = file; - - // object_store::path::Path doesn't automatically deal with Windows path separators - // to do that, we are using from_absolute_path() which takes into consideration the underlying filesystem - // before sending the file path to PartitionedFile - // the github issue- https://github.com/parseablehq/parseable/issues/824 - // For some reason, the `from_absolute_path()` doesn't work for macos, hence the ugly solution - // TODO: figure out an elegant solution to this - let pf; - - #[cfg(unix)] - { - pf = PartitionedFile::new(file_path, file.file_size); - } - #[cfg(windows)] - { - pf = if CONFIG.storage_name.eq("drive") { - let file_path = object_store::path::Path::from_absolute_path(file_path).unwrap(); - PartitionedFile::new(file_path, file.file_size) - } else { - PartitionedFile::new(file_path, file.file_size) - }; - } - - partitioned_files[index].push(pf); - columns.into_iter().for_each(|col| { - column_statistics - .entry(col.name) - .and_modify(|x| { - if let Some((stats, col_stats)) = x.as_ref().cloned().zip(col.stats.clone()) { - *x = Some(stats.update(col_stats)); - } - }) - .or_insert_with(|| col.stats.as_ref().cloned()); - }); - count += num_rows; - } - let statistics = table_schema - .fields() - .iter() - .map(|field| { - column_statistics - .get(field.name()) - .and_then(|stats| stats.as_ref()) - .and_then(|stats| stats.clone().min_max_as_scalar(field.data_type())) - .map(|(min, max)| datafusion::common::ColumnStatistics { - null_count: Precision::Absent, - max_value: Precision::Exact(max), - min_value: Precision::Exact(min), - distinct_count: Precision::Absent, - }) - .unwrap_or_default() - }) - .collect(); - - let statistics = datafusion::common::Statistics { - num_rows: Precision::Exact(count as usize), - total_byte_size: Precision::Absent, - column_statistics: statistics, - }; - - (partitioned_files, statistics) -} - #[async_trait::async_trait] impl TableProvider for StandardTableProvider { fn as_any(&self) -> &dyn std::any::Any { @@ -316,9 +417,7 @@ impl TableProvider for StandardTableProvider { filters: &[Expr], limit: Option, ) -> Result, DataFusionError> { - let mut memory_exec = None; - let mut hot_tier_exec = None; - let mut listing_exec = None; + let mut execution_plans = vec![]; let object_store = state .runtime_env() .object_store_registry @@ -341,11 +440,11 @@ impl TableProvider for StandardTableProvider { event::STREAM_WRITERS.recordbatches_cloned(&self.stream, &self.schema) { let reversed_mem_table = reversed_mem_table(records, self.schema.clone())?; - memory_exec = Some( - reversed_mem_table - .scan(state, projection, filters, limit) - .await?, - ); + + let memory_exec = reversed_mem_table + .scan(state, projection, filters, limit) + .await?; + execution_plans.push(memory_exec); } }; let mut merged_snapshot: snapshot::Snapshot = Snapshot::default(); @@ -379,23 +478,20 @@ impl TableProvider for StandardTableProvider { let listing_time_fiters = return_listing_time_filters(&merged_snapshot.manifest_list, &mut time_filters); - listing_exec = if let Some(listing_time_filter) = listing_time_fiters { - legacy_listing_table( - self.stream.clone(), + if let Some(listing_time_filter) = listing_time_fiters { + self.legacy_listing_table( + &mut execution_plans, glob_storage.clone(), object_store.clone(), &listing_time_filter, - self.schema.clone(), state, projection, filters, limit, time_partition.clone(), ) - .await? - } else { - None - }; + .await?; + } } let mut manifest_files = collect_from_snapshot( @@ -408,21 +504,16 @@ impl TableProvider for StandardTableProvider { .await?; if manifest_files.is_empty() { - return final_plan( - vec![listing_exec, memory_exec], - projection, - self.schema.clone(), - ); + return self.final_plan(execution_plans, projection); } // Hot tier data fetch if let Some(hot_tier_manager) = HotTierManager::global() { if hot_tier_manager.check_stream_hot_tier_exists(&self.stream) { - hot_tier_exec = get_hottier_exectuion_plan( + self.get_hottier_exectuion_plan( + &mut execution_plans, hot_tier_manager, - &self.stream, &mut manifest_files, - self.schema.clone(), projection, filters, limit, @@ -434,19 +525,15 @@ impl TableProvider for StandardTableProvider { } if manifest_files.is_empty() { QUERY_CACHE_HIT.with_label_values(&[&self.stream]).inc(); - return final_plan( - vec![listing_exec, memory_exec, hot_tier_exec], - projection, - self.schema.clone(), - ); + return self.final_plan(execution_plans, projection); } - let (partitioned_files, statistics) = partitioned_files(manifest_files, &self.schema); - let remote_exec = create_parquet_physical_plan( + let (partitioned_files, statistics) = self.partitioned_files(manifest_files); + self.create_parquet_physical_plan( + &mut execution_plans, ObjectStoreUrl::parse(glob_storage.store_url()).unwrap(), partitioned_files, statistics, - self.schema.clone(), projection, filters, limit, @@ -455,11 +542,7 @@ impl TableProvider for StandardTableProvider { ) .await?; - Ok(final_plan( - vec![listing_exec, memory_exec, hot_tier_exec, Some(remote_exec)], - projection, - self.schema.clone(), - )?) + Ok(self.final_plan(execution_plans, projection)?) } /* @@ -487,110 +570,6 @@ impl TableProvider for StandardTableProvider { } } -#[allow(clippy::too_many_arguments)] -async fn get_hottier_exectuion_plan( - hot_tier_manager: &HotTierManager, - stream: &str, - manifest_files: &mut Vec, - schema: Arc, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - state: &dyn Session, - time_partition: Option, -) -> Result>, DataFusionError> { - let (hot_tier_files, remainder) = hot_tier_manager - .get_hot_tier_manifest_files(stream, manifest_files.clone()) - .await - .map_err(|err| DataFusionError::External(Box::new(err)))?; - // Assign remaining entries back to manifest list - // This is to be used for remote query - *manifest_files = remainder; - - let hot_tier_files = hot_tier_files - .into_iter() - .map(|mut file| { - let path = CONFIG - .parseable - .hot_tier_storage_path - .as_ref() - .unwrap() - .join(&file.file_path); - file.file_path = path.to_str().unwrap().to_string(); - file - }) - .collect(); - - let (partitioned_files, statistics) = partitioned_files(hot_tier_files, &schema); - let plan = create_parquet_physical_plan( - ObjectStoreUrl::parse("file:///").unwrap(), - partitioned_files, - statistics, - schema.clone(), - projection, - filters, - limit, - state, - time_partition.clone(), - ) - .await?; - - Ok(Some(plan)) -} - -#[allow(clippy::too_many_arguments)] -async fn legacy_listing_table( - stream: String, - glob_storage: Arc, - object_store: Arc, - time_filters: &[PartialTimeFilter], - schema: Arc, - state: &dyn Session, - projection: Option<&Vec>, - filters: &[Expr], - limit: Option, - time_partition: Option, -) -> Result>, DataFusionError> { - let remote_table = ListingTableBuilder::new(stream) - .populate_via_listing(glob_storage.clone(), object_store, time_filters) - .and_then(|builder| async { - let table = builder.build( - schema.clone(), - |x| glob_storage.query_prefixes(x), - time_partition, - )?; - let res = match table { - Some(table) => Some(table.scan(state, projection, filters, limit).await?), - _ => None, - }; - Ok(res) - }) - .await?; - - Ok(remote_table) -} - -fn final_plan( - execution_plans: Vec>>, - projection: Option<&Vec>, - schema: Arc, -) -> Result, DataFusionError> { - let mut execution_plans = execution_plans.into_iter().flatten().collect_vec(); - - let exec: Arc = if execution_plans.is_empty() { - let schema = match projection { - Some(projection) => Arc::new(schema.project(projection)?), - None => schema, - }; - Arc::new(EmptyExec::new(schema)) - } else if execution_plans.len() == 1 { - execution_plans.pop().unwrap() - } else { - Arc::new(UnionExec::new(execution_plans)) - }; - Ok(exec) -} - fn reversed_mem_table( mut records: Vec, schema: Arc, @@ -992,7 +971,7 @@ mod tests { fn datetime_max(year: i32, month: u32, day: u32) -> DateTime { NaiveDate::from_ymd_opt(year, month, day) .unwrap() - .and_hms_milli_opt(23, 59, 59, 99) + .and_hms_milli_opt(23, 59, 59, 999) .unwrap() .and_utc() } diff --git a/src/storage/azure_blob.rs b/src/storage/azure_blob.rs index 0f3099585..0cbd89f21 100644 --- a/src/storage/azure_blob.rs +++ b/src/storage/azure_blob.rs @@ -164,7 +164,7 @@ impl ObjectStorageProvider for AzureBlobConfig { RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry)) } - fn get_object_store(&self) -> Arc { + fn construct_client(&self) -> Arc { let azure = self.get_default_builder().build().unwrap(); // limit objectstore to a concurrent request limit let azure = LimitStore::new(azure, super::MAX_OBJECT_STORE_REQUESTS); diff --git a/src/storage/localfs.rs b/src/storage/localfs.rs index b3d3e09cd..78b8114c0 100644 --- a/src/storage/localfs.rs +++ b/src/storage/localfs.rs @@ -67,7 +67,7 @@ impl ObjectStorageProvider for FSConfig { RuntimeConfig::new() } - fn get_object_store(&self) -> Arc { + fn construct_client(&self) -> Arc { Arc::new(LocalFS::new(self.root.clone())) } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 9ace537d9..d89a0dfce 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -45,6 +45,7 @@ use async_trait::async_trait; use bytes::Bytes; use chrono::Local; use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeConfig}; +use once_cell::sync::OnceCell; use relative_path::RelativePath; use relative_path::RelativePathBuf; use tracing::error; @@ -60,7 +61,12 @@ use std::{ pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync { fn get_datafusion_runtime(&self) -> RuntimeConfig; - fn get_object_store(&self) -> Arc; + fn construct_client(&self) -> Arc; + fn get_object_store(&self) -> Arc { + static STORE: OnceCell> = OnceCell::new(); + + STORE.get_or_init(|| self.construct_client()).clone() + } fn get_endpoint(&self) -> String; fn register_store_metrics(&self, handler: &PrometheusMetrics); } diff --git a/src/storage/s3.rs b/src/storage/s3.rs index a501c6250..13c394128 100644 --- a/src/storage/s3.rs +++ b/src/storage/s3.rs @@ -157,31 +157,40 @@ pub enum SSECEncryptionKey { }, } +#[derive(Debug, thiserror::Error)] +pub enum SSEError { + #[error("Expected SSE-C:AES256:")] + UnexpectedKey, + #[error("Only SSE-C is supported for object encryption for now")] + UnexpectedProtocol, + #[error("Invalid SSE algorithm. Following are supported: AES256")] + InvalidAlgorithm, +} + impl FromStr for SSECEncryptionKey { - type Err = String; + type Err = SSEError; fn from_str(s: &str) -> Result { let parts = s.split(':').collect::>(); - if parts.len() == 3 { - let sse_type = parts[0]; - if sse_type != "SSE-C" { - return Err("Only SSE-C is supported for object encryption for now".into()); - } + if parts.len() != 3 { + return Err(SSEError::UnexpectedKey); + } + let sse_type = parts[0]; + if sse_type != "SSE-C" { + return Err(SSEError::UnexpectedProtocol); + } - let algorithm = parts[1]; - let encryption_key = parts[2]; + let algorithm = parts[1]; + let encryption_key = parts[2]; - let alg = ObjectEncryptionAlgorithm::from_str(algorithm)?; + let alg = ObjectEncryptionAlgorithm::from_str(algorithm)?; - Ok(match alg { - ObjectEncryptionAlgorithm::Aes256 => SSECEncryptionKey::SseC { - _algorithm: alg, - base64_encryption_key: encryption_key.to_owned(), - }, - }) - } else { - Err("Expected SSE-C:AES256:".into()) - } + Ok(match alg { + ObjectEncryptionAlgorithm::Aes256 => SSECEncryptionKey::SseC { + _algorithm: alg, + base64_encryption_key: encryption_key.to_owned(), + }, + }) } } @@ -191,12 +200,12 @@ pub enum ObjectEncryptionAlgorithm { } impl FromStr for ObjectEncryptionAlgorithm { - type Err = String; + type Err = SSEError; fn from_str(s: &str) -> Result { match s { "AES256" => Ok(ObjectEncryptionAlgorithm::Aes256), - _ => Err("Invalid SSE algorithm. Following are supported: AES256".into()), + _ => Err(SSEError::InvalidAlgorithm), } } } @@ -290,7 +299,7 @@ impl ObjectStorageProvider for S3Config { RuntimeConfig::new().with_object_store_registry(Arc::new(object_store_registry)) } - fn get_object_store(&self) -> Arc { + fn construct_client(&self) -> Arc { let s3 = self.get_default_builder().build().unwrap(); // limit objectstore to a concurrent request limit diff --git a/src/storage/staging.rs b/src/storage/staging.rs index 85df86996..e93aa4bfe 100644 --- a/src/storage/staging.rs +++ b/src/storage/staging.rs @@ -459,7 +459,7 @@ pub fn put_ingestor_info(info: IngestorMetadata) -> anyhow::Result<()> { let file_name = format!("ingestor.{}.json", info.ingestor_id); let file_path = path.join(file_name); - std::fs::write(file_path, serde_json::to_string(&info)?)?; + std::fs::write(file_path, serde_json::to_vec(&info)?)?; Ok(()) } diff --git a/src/storage/store_metadata.rs b/src/storage/store_metadata.rs index dac0d26be..465f8f740 100644 --- a/src/storage/store_metadata.rs +++ b/src/storage/store_metadata.rs @@ -58,7 +58,7 @@ pub struct StorageMetadata { pub deployment_id: uid::Uid, pub users: Vec, pub streams: Vec, - pub server_mode: String, + pub server_mode: Mode, #[serde(default)] pub roles: HashMap>, #[serde(default)] @@ -73,7 +73,7 @@ impl Default for StorageMetadata { staging: CONFIG.staging_dir().to_path_buf(), storage: CONFIG.storage().get_endpoint(), deployment_id: uid::gen(), - server_mode: CONFIG.parseable.mode.to_string(), + server_mode: CONFIG.parseable.mode, users: Vec::new(), streams: Vec::new(), roles: HashMap::default(), @@ -121,8 +121,7 @@ pub async fn resolve_parseable_metadata( // overwrite staging anyways so that it matches remote in case of any divergence overwrite_staging = true; if CONFIG.parseable.mode == Mode::All { - standalone_after_distributed(Mode::from_string(&metadata.server_mode).expect("mode should be valid here")) - ?; + standalone_after_distributed(metadata.server_mode)?; } Ok(metadata) }, @@ -132,10 +131,7 @@ pub async fn resolve_parseable_metadata( EnvChange::NewStaging(mut metadata) => { // if server is started in ingest mode,we need to make sure that query mode has been started // i.e the metadata is updated to reflect the server mode = Query - if Mode::from_string(&metadata.server_mode) - .map_err(ObjectStorageError::Custom) - ? - == Mode::All && CONFIG.parseable.mode == Mode::Ingest { + if metadata.server_mode== Mode::All && CONFIG.parseable.mode == Mode::Ingest { Err("Starting Ingest Mode is not allowed, Since Query Server has not been started yet") } else { create_dir_all(CONFIG.staging_dir())?; @@ -146,7 +142,7 @@ pub async fn resolve_parseable_metadata( // because staging dir has changed. match CONFIG.parseable.mode { Mode::All => { - standalone_after_distributed(Mode::from_string(&metadata.server_mode).expect("mode should be valid at here")) + standalone_after_distributed(metadata.server_mode) .map_err(|err| { ObjectStorageError::Custom(err.to_string()) })?; @@ -154,13 +150,13 @@ pub async fn resolve_parseable_metadata( }, Mode::Query => { overwrite_remote = true; - metadata.server_mode = CONFIG.parseable.mode.to_string(); + metadata.server_mode = CONFIG.parseable.mode; metadata.staging = CONFIG.staging_dir().to_path_buf(); }, Mode::Ingest => { // if ingest server is started fetch the metadata from remote // update the server mode for local metadata - metadata.server_mode = CONFIG.parseable.mode.to_string(); + metadata.server_mode = CONFIG.parseable.mode; metadata.staging = CONFIG.staging_dir().to_path_buf(); }, } @@ -188,7 +184,7 @@ pub async fn resolve_parseable_metadata( ObjectStorageError::UnhandledError(err) })?; - metadata.server_mode = CONFIG.parseable.mode.to_string(); + metadata.server_mode = CONFIG.parseable.mode; if overwrite_remote { put_remote_metadata(&metadata).await?; } @@ -208,8 +204,7 @@ fn determine_environment( (Some(staging), Some(remote)) => { // if both staging and remote have same deployment id but different server modes if staging.deployment_id == remote.deployment_id - && Mode::from_string(&remote.server_mode).expect("server mode is valid here") - == Mode::All + && remote.server_mode == Mode::All && (CONFIG.parseable.mode == Mode::Query || CONFIG.parseable.mode == Mode::Ingest) { EnvChange::NewStaging(remote) @@ -273,7 +268,7 @@ pub async fn put_remote_metadata(metadata: &StorageMetadata) -> Result<(), Objec pub fn put_staging_metadata(meta: &StorageMetadata) -> io::Result<()> { let mut staging_metadata = meta.clone(); - staging_metadata.server_mode = CONFIG.parseable.mode.to_string(); + staging_metadata.server_mode = CONFIG.parseable.mode; staging_metadata.staging = CONFIG.staging_dir().to_path_buf(); let path = CONFIG.staging_dir().join(PARSEABLE_METADATA_FILE_NAME); let mut file = OpenOptions::new() diff --git a/src/utils/json/flatten.rs b/src/utils/json/flatten.rs index d66327a47..a9ae36658 100644 --- a/src/utils/json/flatten.rs +++ b/src/utils/json/flatten.rs @@ -25,9 +25,9 @@ use serde_json::value::Value; pub fn flatten( nested_value: Value, separator: &str, - time_partition: Option, - time_partition_limit: Option, - custom_partition: Option, + time_partition: Option<&String>, + time_partition_limit: Option<&String>, + custom_partition: Option<&String>, validation_required: bool, ) -> Result { match nested_value { @@ -35,13 +35,13 @@ pub fn flatten( if validation_required { let validate_time_partition_result = validate_time_partition( &Value::Object(nested_dict.clone()), - time_partition.clone(), - time_partition_limit.clone(), + time_partition, + time_partition_limit, ); let validate_custom_partition_result = validate_custom_partition( &Value::Object(nested_dict.clone()), - custom_partition.clone(), + custom_partition, ); if validate_time_partition_result.is_ok() { if validate_custom_partition_result.is_ok() { @@ -64,13 +64,10 @@ pub fn flatten( for _value in &mut arr { let value: Value = _value.clone(); if validation_required { - let validate_time_partition_result = validate_time_partition( - &value, - time_partition.clone(), - time_partition_limit.clone(), - ); + let validate_time_partition_result = + validate_time_partition(&value, time_partition, time_partition_limit); let validate_custom_partition_result = - validate_custom_partition(&value, custom_partition.clone()); + validate_custom_partition(&value, custom_partition); if validate_time_partition_result.is_ok() { if validate_custom_partition_result.is_ok() { let value = std::mem::replace(_value, Value::Null); @@ -104,7 +101,7 @@ pub fn flatten( pub fn validate_custom_partition( value: &Value, - custom_partition: Option, + custom_partition: Option<&String>, ) -> Result { if custom_partition.is_none() { return Ok(true); @@ -145,8 +142,8 @@ pub fn validate_custom_partition( pub fn validate_time_partition( value: &Value, - time_partition: Option, - time_partition_limit: Option, + time_partition: Option<&String>, + time_partition_limit: Option<&String>, ) -> Result { if time_partition.is_none() { Ok(true) @@ -156,7 +153,7 @@ pub fn validate_time_partition( } else { 30 }; - let body_timestamp = value.get(time_partition.clone().unwrap().to_string()); + let body_timestamp = value.get(time_partition.unwrap().to_string()); if body_timestamp.is_some() && body_timestamp.unwrap().to_owned().as_str().is_some() { if body_timestamp .unwrap() diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs index 003b3b55e..ba8f6faf5 100644 --- a/src/utils/json/mod.rs +++ b/src/utils/json/mod.rs @@ -22,30 +22,29 @@ use serde_json::Value; pub mod flatten; pub fn flatten_json_body( - body: serde_json::Value, - time_partition: Option, - time_partition_limit: Option, - custom_partition: Option, + body: &Value, + time_partition: Option<&String>, + time_partition_limit: Option<&String>, + custom_partition: Option<&String>, validation_required: bool, ) -> Result { - match flatten::convert_to_array(flatten::flatten_json(&body)) { - Ok(nested_value) => flatten::flatten( - nested_value, - "_", - time_partition, - time_partition_limit, - custom_partition, - validation_required, - ), - Err(err) => Err(err), - } + let nested_value = flatten::convert_to_array(flatten::flatten_json(body))?; + + flatten::flatten( + nested_value, + "_", + time_partition, + time_partition_limit, + custom_partition, + validation_required, + ) } pub fn convert_array_to_object( - body: Value, - time_partition: Option, - time_partition_limit: Option, - custom_partition: Option, + body: &Value, + time_partition: Option<&String>, + time_partition_limit: Option<&String>, + custom_partition: Option<&String>, ) -> Result, anyhow::Error> { let data = flatten_json_body( body,