Skip to content

Commit

Permalink
refactor: parts of hottier (#1022)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Nikhil Sinha <[email protected]>
  • Loading branch information
de-sh and nikhilsinhaparseable authored Jan 28, 2025
1 parent 12c507c commit a0a0ec3
Show file tree
Hide file tree
Showing 9 changed files with 364 additions and 346 deletions.
77 changes: 28 additions & 49 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,7 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder

pub async fn put_stream_hot_tier(
stream_name: Path<String>,
Json(json): Json<Value>,
Json(mut hottier): Json<StreamHotTier>,
) -> Result<impl Responder, StreamError> {
let stream_name = stream_name.into_inner();
if !STREAM_INFO.stream_exists(&stream_name) {
Expand All @@ -609,35 +609,28 @@ pub async fn put_stream_hot_tier(
status: StatusCode::BAD_REQUEST,
});
}
if CONFIG.options.hot_tier_storage_path.is_none() {
return Err(StreamError::HotTierNotEnabled(stream_name));
}

let mut hottier: StreamHotTier = match serde_json::from_value(json) {
Ok(hottier) => hottier,
Err(err) => return Err(StreamError::InvalidHotTierConfig(err)),
};

validator::hot_tier(&hottier.size.to_string())?;

STREAM_INFO.set_hot_tier(&stream_name, true)?;
if let Some(hot_tier_manager) = HotTierManager::global() {
let existing_hot_tier_used_size = hot_tier_manager
.validate_hot_tier_size(&stream_name, &hottier.size)
.await?;
hottier.used_size = existing_hot_tier_used_size.to_string();
hottier.available_size = hottier.size.to_string();
hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string());
hot_tier_manager
.put_hot_tier(&stream_name, &mut hottier)
.await?;
let storage = CONFIG.storage().get_object_store();
let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;
stream_metadata.hot_tier_enabled = Some(true);
storage
.put_stream_manifest(&stream_name, &stream_metadata)
.await?;
}
let Some(hot_tier_manager) = HotTierManager::global() else {
return Err(StreamError::HotTierNotEnabled(stream_name));
};
let existing_hot_tier_used_size = hot_tier_manager
.validate_hot_tier_size(&stream_name, hottier.size)
.await?;
hottier.used_size = existing_hot_tier_used_size;
hottier.available_size = hottier.size;
hottier.version = Some(CURRENT_HOT_TIER_VERSION.to_string());
hot_tier_manager
.put_hot_tier(&stream_name, &mut hottier)
.await?;
let storage = CONFIG.storage().get_object_store();
let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;
stream_metadata.hot_tier_enabled = true;
storage
.put_stream_manifest(&stream_name, &stream_metadata)
.await?;

Ok((
format!("hot tier set for stream {stream_name}"),
Expand All @@ -662,22 +655,12 @@ pub async fn get_stream_hot_tier(stream_name: Path<String>) -> Result<impl Respo
}
}

if CONFIG.options.hot_tier_storage_path.is_none() {
let Some(hot_tier_manager) = HotTierManager::global() else {
return Err(StreamError::HotTierNotEnabled(stream_name));
}
};
let meta = hot_tier_manager.get_hot_tier(&stream_name).await?;

if let Some(hot_tier_manager) = HotTierManager::global() {
let mut hot_tier = hot_tier_manager.get_hot_tier(&stream_name).await?;
hot_tier.size = format!("{} {}", hot_tier.size, "Bytes");
hot_tier.used_size = format!("{} Bytes", hot_tier.used_size);
hot_tier.available_size = format!("{} Bytes", hot_tier.available_size);
Ok((web::Json(hot_tier), StatusCode::OK))
} else {
Err(StreamError::Custom {
msg: format!("hot tier not initialised for stream {}", stream_name),
status: (StatusCode::BAD_REQUEST),
})
}
Ok((web::Json(meta), StatusCode::OK))
}

pub async fn delete_stream_hot_tier(
Expand All @@ -699,9 +682,9 @@ pub async fn delete_stream_hot_tier(
}
}

if CONFIG.options.hot_tier_storage_path.is_none() {
let Some(hot_tier_manager) = HotTierManager::global() else {
return Err(StreamError::HotTierNotEnabled(stream_name));
}
};

if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) {
return Err(StreamError::Custom {
Expand All @@ -710,9 +693,8 @@ pub async fn delete_stream_hot_tier(
});
}

if let Some(hot_tier_manager) = HotTierManager::global() {
hot_tier_manager.delete_hot_tier(&stream_name).await?;
}
hot_tier_manager.delete_hot_tier(&stream_name).await?;

Ok((
format!("hot tier deleted for stream {stream_name}"),
StatusCode::OK,
Expand Down Expand Up @@ -821,8 +803,6 @@ pub mod error {
"Hot tier is not enabled at the server config, cannot enable hot tier for stream {0}"
)]
HotTierNotEnabled(String),
#[error("failed to enable hottier due to err: {0}")]
InvalidHotTierConfig(serde_json::Error),
#[error("Hot tier validation failed: {0}")]
HotTierValidation(#[from] HotTierValidationError),
#[error("{0}")]
Expand Down Expand Up @@ -859,8 +839,7 @@ pub mod error {
StreamError::Network(err) => {
err.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)
}
StreamError::HotTierNotEnabled(_) => StatusCode::BAD_REQUEST,
StreamError::InvalidHotTierConfig(_) => StatusCode::BAD_REQUEST,
StreamError::HotTierNotEnabled(_) => StatusCode::FORBIDDEN,
StreamError::HotTierValidation(_) => StatusCode::BAD_REQUEST,
StreamError::HotTierError(_) => StatusCode::INTERNAL_SERVER_ERROR,
}
Expand Down
Loading

0 comments on commit a0a0ec3

Please sign in to comment.