Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions quickwit/quickwit-cli/src/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,18 @@ pub fn build_split_command() -> Command {
.required(false),
])
)
.subcommand(
Command::new("add")
.about("Adds a split file to an existing index.")
.args(&[
arg!(--index <INDEX_ID> "Target index ID")
.display_order(1)
.required(true),
arg!(--split <SPLIT_URI> "URI of the split file to add")
.display_order(2)
.required(true),
])
)
.arg_required_else_help(true)
}

Expand Down Expand Up @@ -156,11 +168,19 @@ pub struct DescribeSplitArgs {
pub verbose: bool,
}

#[derive(Debug, Eq, PartialEq)]
pub struct AddSplitArgs {
pub client_args: ClientArgs,
pub index_id: IndexId,
pub split_uri: String,
}

#[derive(Debug, PartialEq)]
pub enum SplitCliCommand {
List(ListSplitArgs),
MarkForDeletion(MarkForDeletionArgs),
Describe(DescribeSplitArgs),
Add(AddSplitArgs),
}

impl SplitCliCommand {
Expand All @@ -172,6 +192,7 @@ impl SplitCliCommand {
"describe" => Self::parse_describe_args(submatches),
"list" => Self::parse_list_args(submatches),
"mark-for-deletion" => Self::parse_mark_for_deletion_args(submatches),
"add" => Self::parse_add_args(submatches),
_ => bail!("unknown split subcommand `{subcommand}`"),
}
}
Expand Down Expand Up @@ -275,11 +296,28 @@ impl SplitCliCommand {
}))
}

fn parse_add_args(mut matches: ArgMatches) -> anyhow::Result<Self> {
let client_args = ClientArgs::parse(&mut matches)?;
let index_id = matches
.remove_one::<String>("index")
.expect("`index` should be a required arg.");
let split_uri = matches
.remove_one::<String>("split")
.expect("`split` should be a required arg.");

Ok(Self::Add(AddSplitArgs {
client_args,
index_id,
split_uri,
}))
}

pub async fn execute(self) -> anyhow::Result<()> {
match self {
Self::List(args) => list_split_cli(args).await,
Self::MarkForDeletion(args) => mark_splits_for_deletion_cli(args).await,
Self::Describe(args) => describe_split_cli(args).await,
Self::Add(args) => add_split_cli(args).await,
}
}
}
Expand Down Expand Up @@ -388,6 +426,27 @@ async fn describe_split_cli(args: DescribeSplitArgs) -> anyhow::Result<()> {
Ok(())
}

async fn add_split_cli(args: AddSplitArgs) -> anyhow::Result<()> {
debug!(args=?args, "add-split");
println!("❯ Adding split to index...");

let qw_client = args.client_args.client();

// Call the REST API to add the split
qw_client
.splits(&args.index_id)
.add(args.split_uri)
.await
.context("failed to add split")?;

println!(
"{} Split successfully added to index.",
"✔".color(GREEN_COLOR)
);

Ok(())
}

fn make_split_table(splits: &[Split], title: &str) -> Table {
let rows = splits
.iter()
Expand Down Expand Up @@ -631,4 +690,31 @@ mod tests {
SplitState::MarkedForDeletion
);
}

#[test]
fn test_parse_add_split_args() -> anyhow::Result<()> {
let app = build_cli().no_binary_name(true);
let matches = app.try_get_matches_from(vec![
"split",
"add",
"--endpoint",
"https://quickwit-cluster.io",
"--index",
"wikipedia",
"--split",
"s3://my-bucket/splits/wikipedia-split-001.split",
])?;
let command = CliCommand::parse_cli_args(matches)?;
assert!(matches!(
command,
CliCommand::Split(SplitCliCommand::Add(AddSplitArgs {
client_args,
index_id,
split_uri,
})) if client_args.cluster_endpoint.to_string() == "https://quickwit-cluster.io/"
&& index_id == "wikipedia"
&& split_uri == "s3://my-bucket/splits/wikipedia-split-001.split"
));
Ok(())
}
}
21 changes: 21 additions & 0 deletions quickwit/quickwit-rest-client/src/rest_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,27 @@ impl<'a, 'b> SplitClient<'a, 'b> {
response.check().await?;
Ok(())
}

pub async fn add(&self, split_uri: String) -> Result<(), Error> {
let path = self.splits_root_url();
let body_json = json!({ "split_uri": split_uri });
let body_vec =
serde_json::to_vec(&body_json).expect("serializing `body_json` should never fail");
let body_bytes = Bytes::from(body_vec);
let response = self
.transport
.send::<()>(
Method::POST,
&path,
None,
None,
Some(body_bytes),
self.timeout,
)
.await?;
response.check().await?;
Ok(())
}
}

/// Client for source APIs.
Expand Down
8 changes: 5 additions & 3 deletions quickwit/quickwit-serve/src/index_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ use super::source_resource::{
reset_source_checkpoint_handler, toggle_source_handler, update_source_handler,
};
use super::split_resource::{
__path_list_splits, __path_mark_splits_for_deletion, SplitsForDeletion, list_splits_handler,
mark_splits_for_deletion_handler,
__path_list_splits, __path_mark_splits_for_deletion, __path_add_split, SplitsForDeletion, AddSplitRequest, list_splits_handler,
mark_splits_for_deletion_handler, add_split_handler,
};
use crate::format::extract_format_from_qs;
use crate::rest::recover_fn;
Expand All @@ -56,13 +56,14 @@ use crate::simple_list::from_simple_list;
list_splits,
describe_index,
mark_splits_for_deletion,
add_split,
create_source,
update_source,
reset_source_checkpoint,
toggle_source,
delete_source,
),
components(schemas(ToggleSource, SplitsForDeletion, IndexStats))
components(schemas(ToggleSource, SplitsForDeletion, AddSplitRequest, IndexStats))
)]
pub struct IndexApi;

Expand Down Expand Up @@ -101,6 +102,7 @@ pub fn index_management_handlers(
.or(list_splits_handler(index_service.metastore()))
.or(describe_index_handler(index_service.metastore()))
.or(mark_splits_for_deletion_handler(index_service.metastore()))
.or(add_split_handler(index_service.metastore()))
.boxed()
// Sources handlers.
.or(reset_source_checkpoint_handler(index_service.metastore()))
Expand Down
146 changes: 142 additions & 4 deletions quickwit/quickwit-serve/src/index_api/split_resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,23 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use quickwit_config::StorageResolver;
use quickwit_metastore::{
IndexMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt,
MetastoreServiceStreamSplitsExt, Split, SplitState,
IndexMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt, StageSplitsRequestExt,
MetastoreServiceStreamSplitsExt, Split, SplitState, SplitMetadata, SplitMaturity,
};
use quickwit_proto::metastore::{
IndexMetadataRequest, ListSplitsRequest, MarkSplitsForDeletionRequest, MetastoreResult,
MetastoreService, MetastoreServiceClient,
MetastoreService, MetastoreServiceClient, StageSplitsRequest,
};
use quickwit_proto::types::{IndexId, IndexUid};
use quickwit_storage::OwnedBytes;
use serde::{Deserialize, Serialize};
use std::collections::BTreeSet;
use tracing::info;
use warp::{Filter, Rejection};

use super::rest_handler::json_body;
use crate::rest::json_body;
use crate::format::extract_format_from_qs;
use crate::rest_api_response::into_rest_api_response;
use crate::simple_list::{from_simple_list, to_simple_list};
Expand Down Expand Up @@ -155,6 +158,12 @@ pub struct SplitsForDeletion {
pub split_ids: Vec<String>,
}

#[derive(Deserialize, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct AddSplitRequest {
pub split_uri: String,
}

#[utoipa::path(
put,
tag = "Splits",
Expand Down Expand Up @@ -193,6 +202,135 @@ pub async fn mark_splits_for_deletion(
Ok(())
}

#[utoipa::path(
post,
tag = "Splits",
path = "/indexes/{index_id}/splits",
request_body = AddSplitRequest,
responses(
(status = 200, description = "Successfully added split to index.")
),
params(
("index_id" = String, Path, description = "The index ID to add the split to."),
)
)]
/// Adds a split to an index.
pub async fn add_split(
index_id: IndexId,
add_split_request: AddSplitRequest,
metastore: MetastoreServiceClient,
) -> MetastoreResult<()> {
let index_metadata_request = IndexMetadataRequest::for_index_id(index_id.to_string());
let index_metadata = metastore
.index_metadata(index_metadata_request)
.await?
.deserialize_index_metadata()?;
let index_uid = index_metadata.index_uid;

info!(index_id = %index_id, split_uri = %add_split_request.split_uri, "add-split");

// Parse the split URI to get storage and path
let split_uri = &add_split_request.split_uri;
let split_path = std::path::Path::new(split_uri);

// Extract split ID from the URI (assuming it's the filename without extension)
let split_filename = split_path
.file_stem()
.and_then(|name| name.to_str())
.ok_or_else(|| quickwit_proto::metastore::MetastoreError::Internal {
message: format!("Invalid split URI: unable to extract split ID from {}", split_uri),
})?;
let split_id = quickwit_proto::types::SplitId::from(split_filename);

// Resolve the storage for the index
let storage_resolver = StorageResolver::from_config(&index_metadata.storage_config);
let storage = storage_resolver
.resolve(&index_metadata.index_uri)
.await
.map_err(|e| quickwit_proto::metastore::MetastoreError::Internal {
message: format!("Failed to resolve storage for index {}: {}", index_id, e),
})?;

// Read the split file
let split_data = storage
.get_all(split_path)
.await
.map_err(|e| quickwit_proto::metastore::MetastoreError::Internal {
message: format!("Failed to read split file {}: {}", split_uri, e),
})?;

// Extract split metadata from the split file
let split_metadata = extract_split_metadata_from_file(split_data, &split_id, &index_uid)
.await
.map_err(|e| quickwit_proto::metastore::MetastoreError::Internal {
message: format!("Failed to extract split metadata from {}: {}", split_uri, e),
})?;

// Create and send StageSplitsRequest
let stage_splits_request = quickwit_metastore::StageSplitsRequestExt::try_from_split_metadata(
&index_uid,
&split_metadata,
)
.map_err(|e| quickwit_proto::metastore::MetastoreError::Internal {
message: format!("Failed to create stage splits request: {}", e),
})?;

metastore
.stage_splits(stage_splits_request)
.await?;

info!(index_id = %index_id, split_id = %split_id, "successfully added split to index");

Ok(())
}

/// Extracts split metadata from a split file.
async fn extract_split_metadata_from_file(
split_data: OwnedBytes,
split_id: &quickwit_proto::types::SplitId,
index_uid: &quickwit_proto::types::IndexUid,
) -> anyhow::Result<quickwit_metastore::SplitMetadata> {
// For now, create minimal metadata from the split information
// This is a simplified approach - we'll create basic metadata
let file_size = split_data.len() as u64;
let create_timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() as i64;

let split_metadata = quickwit_metastore::SplitMetadata {
split_id: split_id.clone(),
index_uid: index_uid.clone(),
partition_id: 0, // Default partition
source_id: "manual-add".to_string(),
node_id: "manual-add".to_string(),
num_docs: 0, // Unknown - will be updated if possible
uncompressed_docs_size_in_bytes: 0, // Unknown
time_range: None,
create_timestamp,
footer_offsets: 0..file_size,
tags: std::collections::BTreeSet::new(),
delete_opstamp: 0,
num_merge_ops: 0,
maturity: quickwit_metastore::SplitMaturity::Stable,
};

Ok(split_metadata)
}

pub fn add_split_handler(
metastore: MetastoreServiceClient,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
warp::path!("indexes" / String / "splits")
.and(warp::post())
.and(json_body())
.and(with_arg(metastore))
.then(add_split)
.and(extract_format_from_qs())
.map(into_rest_api_response)
.boxed()
}

pub fn mark_splits_for_deletion_handler(
metastore: MetastoreServiceClient,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
Expand Down