Skip to content

Commit

Permalink
feat: add update stream API (#810)
Browse files Browse the repository at this point in the history
user can update time partition limit and custom partition
in the same API call

* enhancement for update stream
user can make below changes to the existing stream -
1. time partition limit
2. custom partition

send extra header X-P-Update-Header=true to make the update
  • Loading branch information
nikhilsinhaparseable authored Jun 7, 2024
1 parent b92c77b commit 6129992
Show file tree
Hide file tree
Showing 4 changed files with 278 additions and 75 deletions.
2 changes: 1 addition & 1 deletion server/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition";
const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
const AUTHORIZATION_KEY: &str = "authorization";
const SEPARATOR: char = '^';

const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
const OIDC_SCOPE: &str = "openid profile email";
const COOKIE_AGE_DAYS: usize = 7;
const SESSION_COOKIE_NAME: &str = "session";
Expand Down
288 changes: 214 additions & 74 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use super::cluster::{fetch_stats_from_ingestors, INTERNAL_STREAM_NAME};
use crate::alerts::Alerts;
use crate::handlers::{
CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
UPDATE_STREAM_KEY,
};
use crate::metadata::STREAM_INFO;
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
Expand Down Expand Up @@ -187,77 +188,166 @@ pub async fn get_alert(req: HttpRequest) -> Result<impl Responder, StreamError>
}

pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
let time_partition = if let Some((_, time_partition_name)) = req
.headers()
.iter()
.find(|&(key, _)| key == TIME_PARTITION_KEY)
{
time_partition_name.to_str().unwrap()
} else {
""
};
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
let (time_partition, time_partition_limit, custom_partition, static_schema_flag, update_stream) =
fetch_headers_from_put_stream_request(&req);

if metadata::STREAM_INFO.stream_exists(&stream_name) && update_stream != "true" {
// Error if the log stream already exists
return Err(StreamError::Custom {
msg: format!(
"Logstream {stream_name} already exists, please create a new log stream with unique name"
),
status: StatusCode::BAD_REQUEST,
});
}

if !time_partition.is_empty() && update_stream == "true" {
return Err(StreamError::Custom {
msg: "Altering the time partition of an existing stream is restricted.".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
let mut time_partition_in_days: &str = "";
if let Some((_, time_partition_limit_name)) = req
.headers()
.iter()
.find(|&(key, _)| key == TIME_PARTITION_LIMIT_KEY)
{
let time_partition_limit = time_partition_limit_name.to_str().unwrap();
if !time_partition_limit.ends_with('d') {
return Err(StreamError::Custom {
msg: "missing 'd' suffix for duration value".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
let days = &time_partition_limit[0..time_partition_limit.len() - 1];
if days.parse::<NonZeroU32>().is_err() {
return Err(StreamError::Custom {
msg: "could not convert duration to an unsigned number".to_string(),
status: StatusCode::BAD_REQUEST,
});
if !time_partition_limit.is_empty() {
let time_partition_days = validate_time_partition_limit(&time_partition_limit);
if let Err(err) = time_partition_days {
return Err(StreamError::CreateStream(err));
} else {
time_partition_in_days = days;
time_partition_in_days = time_partition_days.unwrap();
if update_stream == "true" {
if let Err(err) = update_time_partition_limit_in_stream(
stream_name.clone(),
time_partition_in_days,
)
.await
{
return Err(StreamError::CreateStream(err));
}
return Ok(("Log stream updated", StatusCode::OK));
}
}
}
let static_schema_flag = if let Some((_, static_schema_flag)) = req
.headers()
.iter()
.find(|&(key, _)| key == STATIC_SCHEMA_FLAG)
{
static_schema_flag.to_str().unwrap()
} else {
""
};
let mut custom_partition: &str = "";
if let Some((_, custom_partition_key)) = req
.headers()
.iter()
.find(|&(key, _)| key == CUSTOM_PARTITION_KEY)
{
custom_partition = custom_partition_key.to_str().unwrap();
let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
if custom_partition_list.len() > 3 {
return Err(StreamError::Custom {
msg: "maximum 3 custom partition keys are supported".to_string(),
status: StatusCode::BAD_REQUEST,
});

if !static_schema_flag.is_empty() && update_stream == "true" {
return Err(StreamError::Custom {
msg: "Altering the schema of an existing stream is restricted.".to_string(),
status: StatusCode::BAD_REQUEST,
});
}

if !custom_partition.is_empty() {
if let Err(err) = validate_custom_partition(&custom_partition) {
return Err(StreamError::CreateStream(err));
}
if update_stream == "true" {
if let Err(err) =
update_custom_partition_in_stream(stream_name.clone(), &custom_partition).await
{
return Err(StreamError::CreateStream(err));
}
return Ok(("Log stream updated", StatusCode::OK));
}
}

let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
let mut schema = Arc::new(Schema::empty());
if metadata::STREAM_INFO.stream_exists(&stream_name) {
// Error if the log stream already exists
return Err(StreamError::Custom {
msg: format!(
"logstream {stream_name} already exists, please create a new log stream with unique name"
),
let schema = validate_static_schema(
&body,
&stream_name,
&time_partition,
&custom_partition,
&static_schema_flag,
);
if let Err(err) = schema {
return Err(StreamError::CreateStream(err));
}

create_stream(
stream_name,
&time_partition,
time_partition_in_days,
&custom_partition,
&static_schema_flag,
schema.unwrap(),
)
.await?;

Ok(("Log stream created", StatusCode::OK))
}

fn fetch_headers_from_put_stream_request(
req: &HttpRequest,
) -> (String, String, String, String, String) {
let mut time_partition = String::default();
let mut time_partition_limit = String::default();
let mut custom_partition = String::default();
let mut static_schema_flag = String::default();
let mut update_stream = String::default();
req.headers().iter().for_each(|(key, value)| {
if key == TIME_PARTITION_KEY {
time_partition = value.to_str().unwrap().to_string();
}
if key == TIME_PARTITION_LIMIT_KEY {
time_partition_limit = value.to_str().unwrap().to_string();
}
if key == CUSTOM_PARTITION_KEY {
custom_partition = value.to_str().unwrap().to_string();
}
if key == STATIC_SCHEMA_FLAG {
static_schema_flag = value.to_str().unwrap().to_string();
}
if key == UPDATE_STREAM_KEY {
update_stream = value.to_str().unwrap().to_string();
}
});

(
time_partition,
time_partition_limit,
custom_partition,
static_schema_flag,
update_stream,
)
}

fn validate_time_partition_limit(time_partition_limit: &str) -> Result<&str, CreateStreamError> {
if !time_partition_limit.ends_with('d') {
return Err(CreateStreamError::Custom {
msg: "Missing 'd' suffix for duration value".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
let days = &time_partition_limit[0..time_partition_limit.len() - 1];
if days.parse::<NonZeroU32>().is_err() {
return Err(CreateStreamError::Custom {
msg: "Could not convert duration to an unsigned number".to_string(),
status: StatusCode::BAD_REQUEST,
});
}

Ok(days)
}

fn validate_custom_partition(custom_partition: &str) -> Result<(), CreateStreamError> {
let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
if custom_partition_list.len() > 3 {
return Err(CreateStreamError::Custom {
msg: "Maximum 3 custom partition keys are supported".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
Ok(())
}

fn validate_static_schema(
body: &Bytes,
stream_name: &str,
time_partition: &str,
custom_partition: &str,
static_schema_flag: &str,
) -> Result<Arc<Schema>, CreateStreamError> {
let mut schema = Arc::new(Schema::empty());
if !body.is_empty() && static_schema_flag == "true" {
let static_schema: StaticSchema = serde_json::from_slice(&body)?;
let static_schema: StaticSchema = serde_json::from_slice(body)?;

let parsed_schema = convert_static_schema_to_arrow_schema(
static_schema.clone(),
Expand All @@ -267,31 +357,21 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
if let Ok(parsed_schema) = parsed_schema {
schema = parsed_schema;
} else {
return Err(StreamError::Custom {
msg: format!("unable to commit static schema, logstream {stream_name} not created"),
return Err(CreateStreamError::Custom {
msg: format!("Unable to commit static schema, logstream {stream_name} not created"),
status: StatusCode::BAD_REQUEST,
});
}
} else if body.is_empty() && static_schema_flag == "true" {
return Err(StreamError::Custom {
return Err(CreateStreamError::Custom {
msg: format!(
"please provide schema in the request body for static schema logstream {stream_name}"
"Please provide schema in the request body for static schema logstream {stream_name}"
),
status: StatusCode::BAD_REQUEST,
});
}

create_stream(
stream_name,
time_partition,
time_partition_in_days,
custom_partition,
static_schema_flag,
schema,
)
.await?;

Ok(("log stream created", StatusCode::OK))
Ok(schema)
}

pub async fn put_alert(
Expand Down Expand Up @@ -626,6 +706,56 @@ fn remove_id_from_alerts(value: &mut Value) {
}
}

pub async fn update_time_partition_limit_in_stream(
stream_name: String,
time_partition_limit: &str,
) -> Result<(), CreateStreamError> {
let storage = CONFIG.storage().get_object_store();
if let Err(err) = storage
.update_time_partition_limit_in_stream(&stream_name, time_partition_limit)
.await
{
return Err(CreateStreamError::Storage { stream_name, err });
}

if metadata::STREAM_INFO
.update_time_partition_limit(&stream_name, time_partition_limit.to_string())
.is_err()
{
return Err(CreateStreamError::Custom {
msg: "failed to update time partition limit in metadata".to_string(),
status: StatusCode::EXPECTATION_FAILED,
});
}

Ok(())
}

pub async fn update_custom_partition_in_stream(
stream_name: String,
custom_partition: &str,
) -> Result<(), CreateStreamError> {
let storage = CONFIG.storage().get_object_store();
if let Err(err) = storage
.update_custom_partition_in_stream(&stream_name, custom_partition)
.await
{
return Err(CreateStreamError::Storage { stream_name, err });
}

if metadata::STREAM_INFO
.update_custom_partition(&stream_name, custom_partition.to_string())
.is_err()
{
return Err(CreateStreamError::Custom {
msg: "failed to update custom partition in metadata".to_string(),
status: StatusCode::EXPECTATION_FAILED,
});
}

Ok(())
}

pub async fn create_stream(
stream_name: String,
time_partition: &str,
Expand Down Expand Up @@ -757,6 +887,10 @@ pub mod error {
stream_name: String,
err: ObjectStorageError,
},
#[error("{msg}")]
Custom { msg: String, status: StatusCode },
#[error("Could not deserialize into JSON object, {0}")]
SerdeError(#[from] serde_json::Error),
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -809,6 +943,12 @@ pub mod error {
StreamError::CreateStream(CreateStreamError::Storage { .. }) => {
StatusCode::INTERNAL_SERVER_ERROR
}
StreamError::CreateStream(CreateStreamError::Custom { .. }) => {
StatusCode::INTERNAL_SERVER_ERROR
}
StreamError::CreateStream(CreateStreamError::SerdeError(_)) => {
StatusCode::BAD_REQUEST
}
StreamError::CacheNotEnabled(_) => StatusCode::BAD_REQUEST,
StreamError::StreamNotFound(_) => StatusCode::NOT_FOUND,
StreamError::Custom { status, .. } => *status,
Expand Down
27 changes: 27 additions & 0 deletions server/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,33 @@ impl StreamInfo {
metadata.first_event_at = first_event_at;
})
}

pub fn update_time_partition_limit(
&self,
stream_name: &str,
time_partition_limit: String,
) -> Result<(), MetadataError> {
let mut map = self.write().expect(LOCK_EXPECT);
map.get_mut(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
.map(|metadata| {
metadata.time_partition_limit = Some(time_partition_limit);
})
}

pub fn update_custom_partition(
&self,
stream_name: &str,
custom_partition: String,
) -> Result<(), MetadataError> {
let mut map = self.write().expect(LOCK_EXPECT);
map.get_mut(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
.map(|metadata| {
metadata.custom_partition = Some(custom_partition);
})
}

#[allow(clippy::too_many_arguments)]
pub fn add_stream(
&self,
Expand Down
Loading

0 comments on commit 6129992

Please sign in to comment.