diff --git a/quickwit/quickwit-cli/src/split.rs b/quickwit/quickwit-cli/src/split.rs
index ebdebbb7091..1015b1cfcea 100644
--- a/quickwit/quickwit-cli/src/split.rs
+++ b/quickwit/quickwit-cli/src/split.rs
@@ -100,6 +100,18 @@ pub fn build_split_command() -> Command {
                         .required(false),
                 ])
             )
+        .subcommand(
+            Command::new("add")
+                .about("Adds a split file to an existing index.")
+                .args(&[
+                    arg!(--index <INDEX_ID> "Target index ID")
+                        .display_order(1)
+                        .required(true),
+                    arg!(--split <SPLIT_URI> "URI of the split file to add")
+                        .display_order(2)
+                        .required(true),
+                ])
+            )
         .arg_required_else_help(true)
 }
 
@@ -156,11 +168,20 @@ pub struct DescribeSplitArgs {
     pub verbose: bool,
 }
 
+/// Arguments of the `split add` subcommand.
+#[derive(Debug, Eq, PartialEq)]
+pub struct AddSplitArgs {
+    pub client_args: ClientArgs,
+    pub index_id: IndexId,
+    pub split_uri: String,
+}
+
 #[derive(Debug, PartialEq)]
 pub enum SplitCliCommand {
     List(ListSplitArgs),
     MarkForDeletion(MarkForDeletionArgs),
     Describe(DescribeSplitArgs),
+    Add(AddSplitArgs),
 }
 
 impl SplitCliCommand {
@@ -172,6 +193,7 @@ impl SplitCliCommand {
             "describe" => Self::parse_describe_args(submatches),
             "list" => Self::parse_list_args(submatches),
             "mark-for-deletion" => Self::parse_mark_for_deletion_args(submatches),
+            "add" => Self::parse_add_args(submatches),
             _ => bail!("unknown split subcommand `{subcommand}`"),
         }
     }
@@ -275,11 +297,29 @@ impl SplitCliCommand {
         }))
     }
 
+    /// Parses the arguments of the `split add` subcommand.
+    fn parse_add_args(mut matches: ArgMatches) -> anyhow::Result<Self> {
+        let client_args = ClientArgs::parse(&mut matches)?;
+        let index_id = matches
+            .remove_one::<String>("index")
+            .expect("`index` should be a required arg.");
+        let split_uri = matches
+            .remove_one::<String>("split")
+            .expect("`split` should be a required arg.");
+
+        Ok(Self::Add(AddSplitArgs {
+            client_args,
+            index_id,
+            split_uri,
+        }))
+    }
+
     pub async fn execute(self) -> anyhow::Result<()> {
         match self {
             Self::List(args) => list_split_cli(args).await,
             Self::MarkForDeletion(args) => mark_splits_for_deletion_cli(args).await,
             Self::Describe(args) => describe_split_cli(args).await,
+            Self::Add(args) => add_split_cli(args).await,
         }
     }
 }
@@ -388,6 +428,29 @@ async fn describe_split_cli(args: DescribeSplitArgs) -> anyhow::Result<()> {
     Ok(())
 }
 
+/// Adds the split located at `args.split_uri` to the index `args.index_id` through the REST API
+/// client built from `args.client_args`.
+async fn add_split_cli(args: AddSplitArgs) -> anyhow::Result<()> {
+    debug!(args=?args, "add-split");
+    println!("❯ Adding split to index...");
+
+    let qw_client = args.client_args.client();
+
+    // Call the REST API to add the split
+    qw_client
+        .splits(&args.index_id)
+        .add(args.split_uri)
+        .await
+        .context("failed to add split")?;
+
+    println!(
+        "{} Split successfully added to index.",
+        "✔".color(GREEN_COLOR)
+    );
+
+    Ok(())
+}
+
 fn make_split_table(splits: &[Split], title: &str) -> Table {
     let rows = splits
         .iter()
@@ -631,4 +694,31 @@ mod tests {
             SplitState::MarkedForDeletion
         );
     }
+
+    #[test]
+    fn test_parse_add_split_args() -> anyhow::Result<()> {
+        let app = build_cli().no_binary_name(true);
+        let matches = app.try_get_matches_from(vec![
+            "split",
+            "add",
+            "--endpoint",
+            "https://quickwit-cluster.io",
+            "--index",
+            "wikipedia",
+            "--split",
+            "s3://my-bucket/splits/wikipedia-split-001.split",
+        ])?;
+        let command = CliCommand::parse_cli_args(matches)?;
+        assert!(matches!(
+            command,
+            CliCommand::Split(SplitCliCommand::Add(AddSplitArgs {
+                client_args,
+                index_id,
+                split_uri,
+            })) if client_args.cluster_endpoint.to_string() == "https://quickwit-cluster.io/"
+                && index_id == "wikipedia"
+                && split_uri == "s3://my-bucket/splits/wikipedia-split-001.split"
+        ));
+        Ok(())
+    }
 }
diff --git a/quickwit/quickwit-rest-client/src/rest_client.rs b/quickwit/quickwit-rest-client/src/rest_client.rs
index 1fb2b5c9812..906919b4ebb 100644
--- a/quickwit/quickwit-rest-client/src/rest_client.rs
+++ b/quickwit/quickwit-rest-client/src/rest_client.rs
@@ -550,6 +550,29 @@ impl<'a, 'b> SplitClient<'a, 'b> {
         response.check().await?;
         Ok(())
     }
+
+    /// Adds the split located at `split_uri` to the index by sending a `POST` request with a
+    /// `{"split_uri": "<split_uri>"}` JSON body to the splits root URL.
+    pub async fn add(&self, split_uri: String) -> Result<(), Error> {
+        let path = self.splits_root_url();
+        let body_json = json!({ "split_uri": split_uri });
+        let body_vec =
+            serde_json::to_vec(&body_json).expect("serializing `body_json` should never fail");
+        let body_bytes = Bytes::from(body_vec);
+        let response = self
+            .transport
+            .send::<()>(
+                Method::POST,
+                &path,
+                None,
+                None,
+                Some(body_bytes),
+                self.timeout,
+            )
+            .await?;
+        response.check().await?;
+        Ok(())
+    }
 }
 
 /// Client for source APIs.
diff --git a/quickwit/quickwit-serve/src/index_api/rest_handler.rs b/quickwit/quickwit-serve/src/index_api/rest_handler.rs
index bc4c8d9b105..ba7429f6f75 100644
--- a/quickwit/quickwit-serve/src/index_api/rest_handler.rs
+++ b/quickwit/quickwit-serve/src/index_api/rest_handler.rs
@@ -37,8 +37,8 @@ use super::source_resource::{
     reset_source_checkpoint_handler, toggle_source_handler, update_source_handler,
 };
 use super::split_resource::{
-    __path_list_splits, __path_mark_splits_for_deletion, SplitsForDeletion, list_splits_handler,
-    mark_splits_for_deletion_handler,
+    __path_add_split, __path_list_splits, __path_mark_splits_for_deletion, AddSplitRequest,
+    SplitsForDeletion, add_split_handler, list_splits_handler, mark_splits_for_deletion_handler,
 };
 use crate::format::extract_format_from_qs;
 use crate::rest::recover_fn;
@@ -56,13 +56,14 @@ use crate::simple_list::from_simple_list;
         list_splits,
         describe_index,
         mark_splits_for_deletion,
+        add_split,
         create_source,
         update_source,
         reset_source_checkpoint,
         toggle_source,
         delete_source,
     ),
-    components(schemas(ToggleSource, SplitsForDeletion, IndexStats))
+    components(schemas(ToggleSource, SplitsForDeletion, AddSplitRequest, IndexStats))
 )]
 pub struct IndexApi;
 
@@ -101,6 +102,7 @@ pub fn index_management_handlers(
         .or(list_splits_handler(index_service.metastore()))
         .or(describe_index_handler(index_service.metastore()))
         .or(mark_splits_for_deletion_handler(index_service.metastore()))
+        .or(add_split_handler(index_service.metastore()))
         .boxed()
         // Sources handlers.
         .or(reset_source_checkpoint_handler(index_service.metastore()))
diff --git a/quickwit/quickwit-serve/src/index_api/split_resource.rs b/quickwit/quickwit-serve/src/index_api/split_resource.rs
index a062186e551..cbf12b1f84f 100644
--- a/quickwit/quickwit-serve/src/index_api/split_resource.rs
+++ b/quickwit/quickwit-serve/src/index_api/split_resource.rs
@@ -12,20 +12,25 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::BTreeSet;
+
+use quickwit_config::StorageResolver;
 use quickwit_metastore::{
     IndexMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt,
-    MetastoreServiceStreamSplitsExt, Split, SplitState,
+    MetastoreServiceStreamSplitsExt, Split, SplitMaturity, SplitMetadata, SplitState,
+    StageSplitsRequestExt,
 };
 use quickwit_proto::metastore::{
-    IndexMetadataRequest, ListSplitsRequest, MarkSplitsForDeletionRequest, MetastoreResult,
-    MetastoreService, MetastoreServiceClient,
+    IndexMetadataRequest, ListSplitsRequest, MarkSplitsForDeletionRequest, MetastoreError,
+    MetastoreResult, MetastoreService, MetastoreServiceClient, StageSplitsRequest,
 };
-use quickwit_proto::types::{IndexId, IndexUid};
+use quickwit_proto::types::{IndexId, IndexUid, SplitId};
+use quickwit_storage::OwnedBytes;
 use serde::{Deserialize, Serialize};
 use tracing::info;
 use warp::{Filter, Rejection};
 
 use super::rest_handler::json_body;
 use crate::format::extract_format_from_qs;
 use crate::rest_api_response::into_rest_api_response;
 use crate::simple_list::{from_simple_list, to_simple_list};
@@ -155,6 +160,14 @@ pub struct SplitsForDeletion {
     pub split_ids: Vec<String>,
 }
 
+/// Request body of the add split endpoint.
+#[derive(Deserialize, utoipa::ToSchema)]
+#[serde(deny_unknown_fields)]
+pub struct AddSplitRequest {
+    /// URI of the split file to add to the index.
+    pub split_uri: String,
+}
+
 #[utoipa::path(
     put,
     tag = "Splits",
@@ -193,6 +206,148 @@ pub async fn mark_splits_for_deletion(
     Ok(())
 }
 
+#[utoipa::path(
+    post,
+    tag = "Splits",
+    path = "/indexes/{index_id}/splits",
+    request_body = AddSplitRequest,
+    responses(
+        (status = 200, description = "Successfully added split to index.")
+    ),
+    params(
+        ("index_id" = String, Path, description = "The index ID to add the split to."),
+    )
+)]
+/// Adds a split to an index.
+///
+/// Looks up the index metadata, derives the split ID from the file stem of the split URI,
+/// reads the split file through the storage resolved for the index, builds placeholder
+/// metadata for it and sends a stage splits request to the metastore.
+///
+/// # Errors
+///
+/// Propagates the metastore error if the index lookup or the stage splits request fails. Any
+/// other failure (split ID extraction, storage resolution, file read, metadata creation) is
+/// reported as a `MetastoreError::Internal`.
+pub async fn add_split(
+    index_id: IndexId,
+    add_split_request: AddSplitRequest,
+    metastore: MetastoreServiceClient,
+) -> MetastoreResult<()> {
+    let index_metadata_request = IndexMetadataRequest::for_index_id(index_id.to_string());
+    let index_metadata = metastore
+        .index_metadata(index_metadata_request)
+        .await?
+        .deserialize_index_metadata()?;
+    // Borrow the index UID rather than moving it out: `index_metadata` is read again below.
+    let index_uid = &index_metadata.index_uid;
+    let split_uri = &add_split_request.split_uri;
+
+    info!(index_id = %index_id, split_uri = %split_uri, "add-split");
+
+    // NOTE(review): the URI is handled as a plain path and is later read through the storage
+    // resolved for the index URI; confirm this is the intended resolution for absolute URIs
+    // such as `s3://bucket/dir/file.split`.
+    let split_path = std::path::Path::new(split_uri);
+
+    // The split ID is taken to be the file name without its extension.
+    let split_id = split_path
+        .file_stem()
+        .and_then(|file_stem| file_stem.to_str())
+        .map(SplitId::from)
+        .ok_or_else(|| MetastoreError::Internal {
+            message: format!("Invalid split URI: unable to extract split ID from {split_uri}"),
+        })?;
+
+    let storage_resolver = StorageResolver::from_config(&index_metadata.storage_config);
+    let storage = storage_resolver
+        .resolve(&index_metadata.index_uri)
+        .await
+        .map_err(|error| MetastoreError::Internal {
+            message: format!("Failed to resolve storage for index {index_id}: {error}"),
+        })?;
+
+    // NOTE(review): the whole split file is loaded although only its length is used afterwards.
+    let split_data = storage
+        .get_all(split_path)
+        .await
+        .map_err(|error| MetastoreError::Internal {
+            message: format!("Failed to read split file {split_uri}: {error}"),
+        })?;
+
+    let split_metadata = extract_split_metadata_from_file(split_data, &split_id, index_uid)
+        .map_err(|error| MetastoreError::Internal {
+            message: format!("Failed to extract split metadata from {split_uri}: {error}"),
+        })?;
+
+    // NOTE(review): the split is only staged here; confirm whether a publish step is expected
+    // before the split is reported as added.
+    let stage_splits_request =
+        StageSplitsRequest::try_from_split_metadata(index_uid, &split_metadata).map_err(
+            |error| MetastoreError::Internal {
+                message: format!("Failed to create stage splits request: {error}"),
+            },
+        )?;
+    metastore.stage_splits(stage_splits_request).await?;
+
+    info!(index_id = %index_id, split_id = %split_id, "successfully added split to index");
+
+    Ok(())
+}
+
+/// Builds placeholder metadata for a split file.
+///
+/// Nothing is parsed from `split_data`: only its length is used, and every other field is set
+/// to a fixed value.
+///
+/// # Errors
+///
+/// Returns an error if the system clock is set before the Unix epoch or if the number of
+/// seconds since the epoch does not fit in an `i64`.
+fn extract_split_metadata_from_file(
+    split_data: OwnedBytes,
+    split_id: &SplitId,
+    index_uid: &IndexUid,
+) -> anyhow::Result<SplitMetadata> {
+    let file_size = split_data.len() as u64;
+    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH)?;
+    let create_timestamp = i64::try_from(since_epoch.as_secs())?;
+
+    Ok(SplitMetadata {
+        split_id: split_id.clone(),
+        index_uid: index_uid.clone(),
+        partition_id: 0,
+        source_id: "manual-add".to_string(),
+        node_id: "manual-add".to_string(),
+        // The document count and size are not read from the split file.
+        num_docs: 0,
+        uncompressed_docs_size_in_bytes: 0,
+        time_range: None,
+        create_timestamp,
+        // NOTE(review): this range spans the whole file, not only a footer; confirm that the
+        // consumers of `footer_offsets` accept it.
+        footer_offsets: 0..file_size,
+        tags: BTreeSet::new(),
+        delete_opstamp: 0,
+        num_merge_ops: 0,
+        maturity: SplitMaturity::Stable,
+    })
+}
+
+/// Builds the filter that serves `POST /indexes/{index_id}/splits` with [`add_split`].
+pub fn add_split_handler(
+    metastore: MetastoreServiceClient,
+) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
+    warp::path!("indexes" / String / "splits")
+        .and(warp::post())
+        .and(json_body())
+        .and(with_arg(metastore))
+        .then(add_split)
+        .and(extract_format_from_qs())
+        .map(into_rest_api_response)
+        .boxed()
+}
+
 pub fn mark_splits_for_deletion_handler(
     metastore: MetastoreServiceClient,
 ) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {