Skip to content

Commit

Permalink
Merge branch 'main' into perf
Browse files Browse the repository at this point in the history
  • Loading branch information
de-sh authored Oct 23, 2024
2 parents 37df961 + 6e897bf commit 3b2a43a
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 4 deletions.
29 changes: 29 additions & 0 deletions server/src/event/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ use std::{collections::HashMap, sync::Arc};
use anyhow::{anyhow, Error as AnyError};
use arrow_array::{RecordBatch, StringArray};
use arrow_schema::{DataType, Field, Schema, TimeUnit};
use chrono::DateTime;
use serde_json::Value;

use crate::utils::{self, arrow::get_field};

Expand Down Expand Up @@ -171,3 +173,30 @@ pub fn update_field_type_in_schema(
.collect();
Arc::new(Schema::new(new_schema))
}

/// Rebuilds `schema`, promoting string fields that hold RFC 3339 timestamps in `value`.
///
/// A field is promoted to `Timestamp(Millisecond, None)` only when all of the
/// following hold:
/// - its current data type is `Utf8`,
/// - `value` is a JSON object containing an entry under the field's name,
/// - that entry is a JSON string that `DateTime::parse_from_rfc3339` accepts.
///
/// Every other field keeps its data type. All fields in the returned schema are
/// created via `Field::new` with `nullable = true`, and the schema itself is
/// created via `Schema::new`.
pub fn update_data_type_to_datetime(schema: Schema, value: Value) -> Schema {
    // Only a JSON object has named entries that can be matched against fields.
    let record = match &value {
        Value::Object(entries) => Some(entries),
        _ => None,
    };

    let mut fields: Vec<Field> = Vec::with_capacity(schema.fields().len());
    for field in schema.fields().iter() {
        let holds_rfc3339_string = field.data_type() == &DataType::Utf8
            && matches!(
                record.and_then(|entries| entries.get(field.name())),
                Some(Value::String(text)) if DateTime::parse_from_rfc3339(text).is_ok()
            );

        let data_type = if holds_rfc3339_string {
            DataType::Timestamp(TimeUnit::Millisecond, None)
        } else {
            field.data_type().clone()
        };

        fields.push(Field::new(field.name().clone(), data_type, true));
    }

    Schema::new(fields)
}
22 changes: 22 additions & 0 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME};
use super::ingest::create_stream_if_not_exists;
use super::modal::utils::logstream_utils::create_update_stream;
use crate::alerts::Alerts;
use crate::event::format::update_data_type_to_datetime;
use crate::handlers::STREAM_TYPE_KEY;
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
use crate::metadata::STREAM_INFO;
Expand All @@ -36,6 +37,7 @@ use crate::{metadata, validator};
use actix_web::http::header::{self, HeaderMap};
use actix_web::http::StatusCode;
use actix_web::{web, HttpRequest, Responder};
use arrow_json::reader::infer_json_schema_from_iterator;
use arrow_schema::{Field, Schema};
use bytes::Bytes;
use chrono::Utc;
Expand Down Expand Up @@ -89,6 +91,26 @@ pub async fn list(_: HttpRequest) -> impl Responder {
web::Json(res)
}

pub async fn detect_schema(body: Bytes) -> Result<impl Responder, StreamError> {
let body_val: Value = serde_json::from_slice(&body)?;
let value_arr: Vec<Value> = match body_val {
Value::Array(arr) => arr,
value @ Value::Object(_) => vec![value],
_ => {
return Err(StreamError::Custom {
msg: "please send json events as part of the request".to_string(),
status: StatusCode::BAD_REQUEST,
})
}
};

let mut schema = infer_json_schema_from_iterator(value_arr.iter().map(Ok)).unwrap();
for value in value_arr {
schema = update_data_type_to_datetime(schema, value);
}
Ok((web::Json(schema), StatusCode::OK))
}

pub async fn schema(req: HttpRequest) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
let schema = STREAM_INFO.schema(&stream_name)?;
Expand Down
11 changes: 11 additions & 0 deletions server/src/handlers/http/modal/query_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,17 @@ impl QueryServer {
web::resource("")
.route(web::get().to(logstream::list).authorize(Action::ListStream)),
)
.service(
web::scope("/schema/detect").service(
web::resource("")
// POST ".../schema/detect" ==> Detect schema from the JSON events in the request body
.route(
web::post()
.to(logstream::detect_schema)
.authorize(Action::DetectSchema),
),
),
)
.service(
web::scope("/{logstream}")
.service(
Expand Down
11 changes: 11 additions & 0 deletions server/src/handlers/http/modal/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,17 @@ impl Server {
web::resource("")
.route(web::get().to(logstream::list).authorize(Action::ListStream)),
)
.service(
web::scope("/schema/detect").service(
web::resource("")
// POST ".../schema/detect" ==> Detect schema from the JSON events in the request body
.route(
web::post()
.to(logstream::detect_schema)
.authorize(Action::DetectSchema),
),
),
)
.service(
web::scope("/{logstream}")
.service(
Expand Down
3 changes: 3 additions & 0 deletions server/src/rbac/role.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ pub enum Action {
CreateStream,
ListStream,
GetStreamInfo,
DetectSchema,
GetSchema,
GetStats,
DeleteStream,
Expand Down Expand Up @@ -140,6 +141,7 @@ impl RoleBuilder {
| Action::GetAnalytics => Permission::Unit(action),
Action::Ingest
| Action::GetSchema
| Action::DetectSchema
| Action::GetStats
| Action::GetRetention
| Action::PutRetention
Expand Down Expand Up @@ -214,6 +216,7 @@ pub mod model {
Action::DeleteStream,
Action::ListStream,
Action::GetStreamInfo,
Action::DetectSchema,
Action::GetSchema,
Action::GetStats,
Action::GetRetention,
Expand Down
11 changes: 9 additions & 2 deletions server/src/storage/azure_blob.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ use datafusion::datasource::object_store::{
};
use datafusion::execution::runtime_env::RuntimeConfig;
use object_store::azure::{MicrosoftAzure, MicrosoftAzureBuilder};
use object_store::{ClientOptions, ObjectStore, PutPayload};
use object_store::{BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig};
use relative_path::{RelativePath, RelativePathBuf};
use std::path::Path as StdPath;
use url::Url;
Expand Down Expand Up @@ -120,10 +120,17 @@ impl AzureBlobConfig {
.with_connect_timeout(Duration::from_secs(CONNECT_TIMEOUT_SECS))
.with_timeout(Duration::from_secs(REQUEST_TIMEOUT_SECS));

let retry_config = RetryConfig {
max_retries: 5,
retry_timeout: Duration::from_secs(120),
backoff: BackoffConfig::default(),
};

let mut builder = MicrosoftAzureBuilder::new()
.with_endpoint(self.endpoint_url.clone())
.with_account(&self.account)
.with_container_name(&self.container);
.with_container_name(&self.container)
.with_retry(retry_config);

if let Some(access_key) = self.access_key.clone() {
builder = builder.with_access_key(access_key)
Expand Down
10 changes: 8 additions & 2 deletions server/src/storage/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use futures::{StreamExt, TryStreamExt};
use object_store::aws::{AmazonS3, AmazonS3Builder, AmazonS3ConfigKey, Checksum};
use object_store::limit::LimitStore;
use object_store::path::Path as StorePath;
use object_store::{ClientOptions, ObjectStore, PutPayload};
use object_store::{BackoffConfig, ClientOptions, ObjectStore, PutPayload, RetryConfig};
use relative_path::{RelativePath, RelativePathBuf};

use std::collections::BTreeMap;
Expand Down Expand Up @@ -218,13 +218,19 @@ impl S3Config {
if self.skip_tls {
client_options = client_options.with_allow_invalid_certificates(true)
}
let retry_config = RetryConfig {
max_retries: 5,
retry_timeout: Duration::from_secs(30),
backoff: BackoffConfig::default(),
};

let mut builder = AmazonS3Builder::new()
.with_region(&self.region)
.with_endpoint(&self.endpoint_url)
.with_bucket_name(&self.bucket_name)
.with_virtual_hosted_style_request(!self.use_path_style)
.with_allow_http(true);
.with_allow_http(true)
.with_retry(retry_config);

if self.set_checksum {
builder = builder.with_checksum_algorithm(Checksum::SHA256)
Expand Down

0 comments on commit 3b2a43a

Please sign in to comment.