diff --git a/server/src/event/format.rs b/server/src/event/format.rs
index 8f5971a13..35724bbd7 100644
--- a/server/src/event/format.rs
+++ b/server/src/event/format.rs
@@ -22,6 +22,8 @@ use std::{collections::HashMap, sync::Arc};
 use anyhow::{anyhow, Error as AnyError};
 use arrow_array::{RecordBatch, StringArray};
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
+use chrono::DateTime;
+use serde_json::Value;

 use crate::utils::{self, arrow::get_field};

@@ -171,3 +173,30 @@ pub fn update_field_type_in_schema(
         .collect();
     Arc::new(Schema::new(new_schema))
 }
+
+pub fn update_data_type_to_datetime(schema: Schema, value: Value) -> Schema {
+    let new_schema: Vec<Field> = schema
+        .fields()
+        .iter()
+        .map(|field| {
+            if field.data_type() == &DataType::Utf8 {
+                if let Value::Object(map) = &value {
+                    if let Some(Value::String(s)) = map.get(field.name()) {
+                        if DateTime::parse_from_rfc3339(s).is_ok() {
+                            // Update the field's data type to Timestamp
+                            return Field::new(
+                                field.name().clone(),
+                                DataType::Timestamp(TimeUnit::Millisecond, None),
+                                true,
+                            );
+                        }
+                    }
+                }
+            }
+            // Return the original field if no update is needed
+            Field::new(field.name(), field.data_type().clone(), true)
+        })
+        .collect();
+
+    Schema::new(new_schema)
+}
diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs
index 0597738a5..fd2280470 100644
--- a/server/src/handlers/http/logstream.rs
+++ b/server/src/handlers/http/logstream.rs
@@ -22,6 +22,7 @@ use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME};
 use super::ingest::create_stream_if_not_exists;
 use super::modal::utils::logstream_utils::create_update_stream;
 use crate::alerts::Alerts;
+use crate::event::format::update_data_type_to_datetime;
 use crate::handlers::STREAM_TYPE_KEY;
 use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
 use crate::metadata::STREAM_INFO;
@@ -36,6 +37,7 @@ use crate::{metadata, validator};
 use actix_web::http::header::{self, HeaderMap};
 use actix_web::http::StatusCode;
 use actix_web::{web, HttpRequest, Responder};
+use arrow_json::reader::infer_json_schema_from_iterator;
 use arrow_schema::{Field, Schema};
 use bytes::Bytes;
 use chrono::Utc;
@@ -89,6 +91,26 @@ pub async fn list(_: HttpRequest) -> impl Responder {
     web::Json(res)
 }

+pub async fn detect_schema(body: Bytes) -> Result<impl Responder, StreamError> {
+    let body_val: Value = serde_json::from_slice(&body)?;
+    let value_arr: Vec<Value> = match body_val {
+        Value::Array(arr) => arr,
+        value @ Value::Object(_) => vec![value],
+        _ => {
+            return Err(StreamError::Custom {
+                msg: "please send json events as part of the request".to_string(),
+                status: StatusCode::BAD_REQUEST,
+            })
+        }
+    };
+
+    let mut schema = infer_json_schema_from_iterator(value_arr.iter().map(Ok)).unwrap();
+    for value in value_arr {
+        schema = update_data_type_to_datetime(schema, value);
+    }
+    Ok((web::Json(schema), StatusCode::OK))
+}
+
 pub async fn schema(req: HttpRequest) -> Result<impl Responder, StreamError> {
     let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
     let schema = STREAM_INFO.schema(&stream_name)?;
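The `detect_schema` handler first lets arrow-json infer a baseline schema (where every JSON string comes back as `Utf8`), then runs `update_data_type_to_datetime` over each posted event so that string fields holding RFC 3339 timestamps are reported as `Timestamp(Millisecond)` instead. Below is a minimal sketch of that promotion, assuming the `update_data_type_to_datetime` function from the hunk above is in scope; the sample event and field names are made up for illustration.

```rust
use arrow_json::reader::infer_json_schema_from_iterator;
use arrow_schema::{DataType, TimeUnit};
use serde_json::json;

fn main() {
    // A made-up event: "time" is an RFC 3339 string, "level" is a plain string.
    let events = vec![json!({ "level": "info", "time": "2024-05-30T12:34:56Z" })];

    // Step 1: arrow-json infers both fields as Utf8.
    let mut schema = infer_json_schema_from_iterator(events.iter().map(Ok)).unwrap();

    // Step 2: promote Utf8 fields whose value parses as RFC 3339 to Timestamp.
    for event in events {
        schema = update_data_type_to_datetime(schema, event);
    }

    assert_eq!(
        schema.field_with_name("time").unwrap().data_type(),
        &DataType::Timestamp(TimeUnit::Millisecond, None)
    );
    assert_eq!(
        schema.field_with_name("level").unwrap().data_type(),
        &DataType::Utf8
    );
}
```

The resulting schema is what the new `schema/detect` route (registered under the logstream scope below) returns as JSON, guarded by the new `DetectSchema` RBAC action.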
"/logstream/{logstream}" ==> Create log stream + .route( + web::post() + .to(logstream::detect_schema) + .authorize(Action::DetectSchema), + ), + ), + ) .service( web::scope("/{logstream}") .service( diff --git a/server/src/handlers/http/modal/server.rs b/server/src/handlers/http/modal/server.rs index d1e4b9aad..ef8467f67 100644 --- a/server/src/handlers/http/modal/server.rs +++ b/server/src/handlers/http/modal/server.rs @@ -292,6 +292,17 @@ impl Server { web::resource("") .route(web::get().to(logstream::list).authorize(Action::ListStream)), ) + .service( + web::scope("/schema/detect").service( + web::resource("") + // PUT "/logstream/{logstream}" ==> Create log stream + .route( + web::post() + .to(logstream::detect_schema) + .authorize(Action::DetectSchema), + ), + ), + ) .service( web::scope("/{logstream}") .service( diff --git a/server/src/rbac/role.rs b/server/src/rbac/role.rs index 522c5a895..0e8f1ab24 100644 --- a/server/src/rbac/role.rs +++ b/server/src/rbac/role.rs @@ -25,6 +25,7 @@ pub enum Action { CreateStream, ListStream, GetStreamInfo, + DetectSchema, GetSchema, GetStats, DeleteStream, @@ -140,6 +141,7 @@ impl RoleBuilder { | Action::GetAnalytics => Permission::Unit(action), Action::Ingest | Action::GetSchema + | Action::DetectSchema | Action::GetStats | Action::GetRetention | Action::PutRetention @@ -214,6 +216,7 @@ pub mod model { Action::DeleteStream, Action::ListStream, Action::GetStreamInfo, + Action::DetectSchema, Action::GetSchema, Action::GetStats, Action::GetRetention, diff --git a/server/src/storage/azure_blob.rs b/server/src/storage/azure_blob.rs index 1c3a27883..6475b8fdc 100644 --- a/server/src/storage/azure_blob.rs +++ b/server/src/storage/azure_blob.rs @@ -30,7 +30,7 @@ use datafusion::datasource::object_store::{ }; use datafusion::execution::runtime_env::RuntimeConfig; use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder}; -use object_store::{ClientOptions, ObjectStore, PutPayload}; +use object_store::{BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig}; use relative_path::{RelativePath, RelativePathBuf}; use std::path::Path as StdPath; use url::Url; @@ -120,10 +120,17 @@ impl AzureBlobConfig { .with_connect_timeout(Duration::from_secs(CONNECT_TIMEOUT_SECS)) .with_timeout(Duration::from_secs(REQUEST_TIMEOUT_SECS)); + let retry_config = RetryConfig { + max_retries: 5, + retry_timeout: Duration::from_secs(120), + backoff: BackoffConfig::default(), + }; + let mut builder = MicrosoftAzureBuilder::new() .with_endpoint(self.endpoint_url.clone()) .with_account(&self.account) - .with_container_name(&self.container); + .with_container_name(&self.container) + .with_retry(retry_config); if let Some(access_key) = self.access_key.clone() { builder = builder.with_access_key(access_key) diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs index 0c9eb982e..0d6513437 100644 --- a/server/src/storage/s3.rs +++ b/server/src/storage/s3.rs @@ -28,7 +28,7 @@ use futures::{StreamExt, TryStreamExt}; use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum}; use object_store::limit::LimitStore; use object_store::path::Path as StorePath; -use object_store::{ClientOptions, ObjectStore, PutPayload}; +use object_store::{BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig}; use relative_path::{RelativePath, RelativePathBuf}; use std::collections::BTreeMap; @@ -218,13 +218,19 @@ impl S3Config { if self.skip_tls { client_options = client_options.with_allow_invalid_certificates(true) } + let retry_config = RetryConfig { 
diff --git a/server/src/storage/s3.rs b/server/src/storage/s3.rs
index 0c9eb982e..0d6513437 100644
--- a/server/src/storage/s3.rs
+++ b/server/src/storage/s3.rs
@@ -28,7 +28,7 @@ use futures::{StreamExt, TryStreamExt};
 use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum};
 use object_store::limit::LimitStore;
 use object_store::path::Path as StorePath;
-use object_store::{ClientOptions, ObjectStore, PutPayload};
+use object_store::{BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig};
 use relative_path::{RelativePath, RelativePathBuf};

 use std::collections::BTreeMap;
@@ -218,13 +218,19 @@ impl S3Config {
         if self.skip_tls {
             client_options = client_options.with_allow_invalid_certificates(true)
         }

+        let retry_config = RetryConfig {
+            max_retries: 5,
+            retry_timeout: Duration::from_secs(30),
+            backoff: BackoffConfig::default(),
+        };
         let mut builder = AmazonS3Builder::new()
             .with_region(&self.region)
             .with_endpoint(&self.endpoint_url)
             .with_bucket_name(&self.bucket_name)
             .with_virtual_hosted_style_request(!self.use_path_style)
-            .with_allow_http(true);
+            .with_allow_http(true)
+            .with_retry(retry_config);

         if self.set_checksum {
             builder = builder.with_checksum_algorithm(Checksum::SHA256)