diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 58b1862e243..3230be0f348 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -3085,12 +3085,12 @@ checksum = "28dd6caf6059519a65843af8fe2a3ae298b14b80179855aeb4adc2c1934ee619" [[package]] name = "fs4" -version = "0.8.4" +version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f7e180ac76c23b45e767bd7ae9579bc0bb458618c4bc71835926e098e61d15f8" +checksum = "8640e34b88f7652208ce9e88b1a37a2ae95227d84abec377ccd3c5cfeb141ed4" dependencies = [ - "rustix 0.38.44", - "windows-sys 0.52.0", + "rustix 1.0.7", + "windows-sys 0.59.0", ] [[package]] @@ -5727,8 +5727,8 @@ checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" [[package]] name = "ownedbytes" -version = "0.7.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=80f5f1e#80f5f1ecd4d5d30069b44e5acd3801f610d36056" +version = "0.9.0" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=b621e4da#b621e4dabeef8bfbf4687de0730a47ae0905d40a" dependencies = [ "stable_deref_trait", ] @@ -9738,8 +9738,8 @@ checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" [[package]] name = "tantivy" -version = "0.23.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=80f5f1e#80f5f1ecd4d5d30069b44e5acd3801f610d36056" +version = "0.25.0" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=b621e4da#b621e4dabeef8bfbf4687de0730a47ae0905d40a" dependencies = [ "aho-corasick", "arc-swap", @@ -9793,16 +9793,16 @@ dependencies = [ [[package]] name = "tantivy-bitpacker" -version = "0.6.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=80f5f1e#80f5f1ecd4d5d30069b44e5acd3801f610d36056" +version = "0.8.0" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=b621e4da#b621e4dabeef8bfbf4687de0730a47ae0905d40a" dependencies = [ "bitpacking", ] [[package]] name = "tantivy-columnar" -version = "0.3.0" -source = 
"git+https://github.com/quickwit-oss/tantivy/?rev=80f5f1e#80f5f1ecd4d5d30069b44e5acd3801f610d36056" +version = "0.5.0" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=b621e4da#b621e4dabeef8bfbf4687de0730a47ae0905d40a" dependencies = [ "downcast-rs", "fastdivide", @@ -9816,8 +9816,8 @@ dependencies = [ [[package]] name = "tantivy-common" -version = "0.7.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=80f5f1e#80f5f1ecd4d5d30069b44e5acd3801f610d36056" +version = "0.9.0" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=b621e4da#b621e4dabeef8bfbf4687de0730a47ae0905d40a" dependencies = [ "async-trait", "byteorder", @@ -9839,29 +9839,30 @@ dependencies = [ [[package]] name = "tantivy-query-grammar" -version = "0.22.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=80f5f1e#80f5f1ecd4d5d30069b44e5acd3801f610d36056" +version = "0.24.0" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=b621e4da#b621e4dabeef8bfbf4687de0730a47ae0905d40a" dependencies = [ "nom", + "serde", + "serde_json", ] [[package]] name = "tantivy-sstable" -version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=80f5f1e#80f5f1ecd4d5d30069b44e5acd3801f610d36056" +version = "0.5.0" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=b621e4da#b621e4dabeef8bfbf4687de0730a47ae0905d40a" dependencies = [ "futures-util", "itertools 0.14.0", "tantivy-bitpacker", "tantivy-common", "tantivy-fst", - "zstd 0.13.3", ] [[package]] name = "tantivy-stacker" -version = "0.3.0" -source = "git+https://github.com/quickwit-oss/tantivy/?rev=80f5f1e#80f5f1ecd4d5d30069b44e5acd3801f610d36056" +version = "0.5.0" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=b621e4da#b621e4dabeef8bfbf4687de0730a47ae0905d40a" dependencies = [ "murmurhash32", "rand_distr", @@ -9870,8 +9871,8 @@ dependencies = [ [[package]] name = "tantivy-tokenizer-api" -version = "0.3.0" -source = 
"git+https://github.com/quickwit-oss/tantivy/?rev=80f5f1e#80f5f1ecd4d5d30069b44e5acd3801f610d36056" +version = "0.5.0" +source = "git+https://github.com/quickwit-oss/tantivy/?rev=b621e4da#b621e4dabeef8bfbf4687de0730a47ae0905d40a" dependencies = [ "serde", ] diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 86d17907be9..06c40fff4dc 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -347,7 +347,7 @@ quickwit-serve = { path = "quickwit-serve" } quickwit-storage = { path = "quickwit-storage" } quickwit-telemetry = { path = "quickwit-telemetry" } -tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "80f5f1e", default-features = false, features = [ +tantivy = { git = "https://github.com/quickwit-oss/tantivy/", rev = "b621e4da", default-features = false, features = [ "lz4-compression", "mmap", "quickwit", diff --git a/quickwit/quickwit-indexing/src/actors/packager.rs b/quickwit/quickwit-indexing/src/actors/packager.rs index 340979ba0e9..9e6ffec12f5 100644 --- a/quickwit/quickwit-indexing/src/actors/packager.rs +++ b/quickwit/quickwit-indexing/src/actors/packager.rs @@ -32,10 +32,14 @@ use quickwit_proto::search::{ }; use tantivy::index::FieldMetadata; use tantivy::schema::{FieldType, Type}; -use tantivy::{InvertedIndexReader, ReloadPolicy, SegmentMeta}; +use tantivy::{ByteCount, Index, InvertedIndexReader, ReloadPolicy, SegmentMeta}; use tokio::runtime::Handle; use tracing::{debug, info, instrument, warn}; +const LIMIT_PACKAGED_FIELDS: usize = 1_000; + +const QW_MAX_PACKAGED_FIELDS_CAPS_ENV_KEY: &str = "QW_MAX_PACKAGED_FIELDS_CAPS"; + /// Maximum distinct values allowed for a tag field within a split. 
const MAX_VALUES_PER_TAG_FIELD: usize = if cfg!(any(test, feature = "testsuite")) { 6 @@ -268,6 +272,41 @@ fn try_extract_terms( Ok(terms) } +fn total_num_bytes(field_metadata: &FieldMetadata) -> u64 { + let num_bytes = + |bytes_opt: &Option<ByteCount>| bytes_opt.map(|b| b.get_bytes()).unwrap_or(0u64); + num_bytes(&field_metadata.term_dictionary_size) + + num_bytes(&field_metadata.postings_size) + + num_bytes(&field_metadata.positions_size) + + num_bytes(&field_metadata.fast_size) +} + +fn build_list_fields(index: &Index) -> anyhow::Result<ListFields> { + let fields_metadata: Vec<FieldMetadata> = index.fields_metadata()?; + let num_fields = fields_metadata.len(); + + let max_packaged_field_caps = + quickwit_common::get_from_env(QW_MAX_PACKAGED_FIELDS_CAPS_ENV_KEY, LIMIT_PACKAGED_FIELDS); + let fields_within_limit: Vec<FieldMetadata> = if num_fields > max_packaged_field_caps { + info!( + "truncate list field information num_fields={num_fields} limit={max_packaged_field_caps}" + ); + fields_metadata + .into_iter() + .k_largest_by_key(max_packaged_field_caps, total_num_bytes) + .collect() + } else { + fields_metadata + }; + + let fields = fields_within_limit + .iter() + .map(field_metadata_to_list_field_serialized) + .collect::<Vec<_>>(); + + Ok(ListFields { fields }) +} + fn create_packaged_split( segment_metas: &[SegmentMeta], split: IndexedSplit, @@ -286,8 +325,6 @@ fn create_packaged_split( .reload_policy(ReloadPolicy::Manual) .try_into()?; - let fields_metadata = split.index.fields_metadata()?; - let mut tags = BTreeSet::default(); for named_field in tag_fields { let inverted_indexes = index_reader @@ -314,7 +351,8 @@ fn create_packaged_split( build_hotcache(split.split_scratch_directory.path(), &mut hotcache_bytes)?; ctx.record_progress(); - let serialized_split_fields = serialize_field_metadata(&fields_metadata); + let list_fields: ListFields = build_list_fields(&split.index)?; + let serialized_split_fields = serialize_split_fields(&list_fields); let packaged_split = PackagedSplit { serialized_split_fields, @@ -327,18 +365,6 @@ fn 
create_packaged_split( Ok(packaged_split) } -/// Serializes the Split fields. -/// -/// `fields_metadata` has to be sorted. -fn serialize_field_metadata(fields_metadata: &[FieldMetadata]) -> Vec<u8> { - let fields = fields_metadata - .iter() - .map(field_metadata_to_list_field_serialized) - .collect::<Vec<_>>(); - - serialize_split_fields(ListFields { fields }) -} - fn tantivy_type_to_list_field_type(typ: Type) -> ListFieldType { match typ { Type::Str => ListFieldType::Str, @@ -360,8 +386,8 @@ fn field_metadata_to_list_field_serialized( ListFieldsEntryResponse { field_name: field_metadata.field_name.to_string(), field_type: tantivy_type_to_list_field_type(field_metadata.typ) as i32, - searchable: field_metadata.indexed, - aggregatable: field_metadata.fast, + searchable: field_metadata.is_indexed(), + aggregatable: field_metadata.is_fast(), index_ids: Vec::new(), non_searchable_index_ids: Vec::new(), non_aggregatable_index_ids: Vec::new(), @@ -385,7 +411,7 @@ mod tests { use quickwit_proto::search::{ListFieldsEntryResponse, deserialize_split_fields}; use quickwit_proto::types::{DocMappingUid, IndexUid, NodeId}; use tantivy::directory::MmapDirectory; - use tantivy::schema::{FAST, NumericOptions, STRING, Schema, TEXT, Type}; + use tantivy::schema::{FAST, NumericOptions, STRING, Schema, TEXT}; use tantivy::{DateTime, IndexBuilder, IndexSettings, doc}; use tracing::Span; @@ -393,37 +419,39 @@ mod tests { use crate::models::{PublishLock, SplitAttrs}; #[test] - fn serialize_field_metadata_test() { - let fields_metadata = vec![ - FieldMetadata { - field_name: "test".to_string(), - typ: Type::Str, - indexed: true, - stored: true, - fast: true, - }, - FieldMetadata { - field_name: "test2".to_string(), - typ: Type::Str, - indexed: true, - stored: false, - fast: false, - }, - FieldMetadata { - field_name: "test3".to_string(), - typ: Type::U64, - indexed: true, - stored: false, - fast: true, - }, - ]; + fn test_serialize_split_fields() { + let list_fields = ListFields { + fields: vec![ + 
ListFieldsEntryResponse { + field_name: "test".to_string(), + field_type: ListFieldType::Str as i32, + searchable: true, + aggregatable: true, + ..Default::default() + }, + ListFieldsEntryResponse { + field_name: "test2".to_string(), + field_type: ListFieldType::Str as i32, + searchable: true, + aggregatable: false, + ..Default::default() + }, + ListFieldsEntryResponse { + field_name: "test3".to_string(), + field_type: ListFieldType::U64 as i32, + searchable: true, + aggregatable: true, + ..Default::default() + }, + ], + }; - let out = serialize_field_metadata(&fields_metadata); + let out = serialize_split_fields(&list_fields); let deserialized: Vec<ListFieldsEntryResponse> = deserialize_split_fields(&mut &out[..]).unwrap().fields; - assert_eq!(fields_metadata.len(), deserialized.len()); + assert_eq!(list_fields.fields.len(), deserialized.len()); assert_eq!(deserialized[0].field_name, "test"); assert_eq!(deserialized[0].field_type, ListFieldType::Str as i32); assert!(deserialized[0].searchable); diff --git a/quickwit/quickwit-proto/src/search/mod.rs b/quickwit/quickwit-proto/src/search/mod.rs index 307de262a70..f28292a4d53 100644 --- a/quickwit/quickwit-proto/src/search/mod.rs +++ b/quickwit/quickwit-proto/src/search/mod.rs @@ -227,7 +227,7 @@ impl PartialHit { /// Serializes the Split fields. /// /// `fields_metadata` has to be sorted. 
-pub fn serialize_split_fields(list_fields: ListFields) -> Vec<u8> { +pub fn serialize_split_fields(list_fields: &ListFields) -> Vec<u8> { let payload = list_fields.encode_to_vec(); let compression_level = 3; let payload_compressed = zstd::stream::encode_all(&mut &payload[..], compression_level) diff --git a/quickwit/quickwit-search/src/list_fields.rs b/quickwit/quickwit-search/src/list_fields.rs index aa8d473e8fb..1f3899b6851 100644 --- a/quickwit/quickwit-search/src/list_fields.rs +++ b/quickwit/quickwit-search/src/list_fields.rs @@ -70,15 +70,15 @@ pub async fn get_fields_from_split( Ordering::Equal => left.field_type.cmp(&right.field_type), other => other, }); + let list_fields = ListFields { + fields: list_fields, + }; // Put result into cache - searcher_context.list_fields_cache.put( - split_and_footer_offsets.clone(), - ListFields { - fields: list_fields.clone(), - }, - ); + searcher_context + .list_fields_cache + .put(split_and_footer_offsets.clone(), &list_fields); - Ok(Box::new(list_fields.into_iter())) + Ok(Box::new(list_fields.fields.into_iter())) } /// `current_group` needs to contain at least one element. 
diff --git a/quickwit/quickwit-search/src/list_fields_cache.rs b/quickwit/quickwit-search/src/list_fields_cache.rs index 681ce7a2e77..e6a6461ec51 100644 --- a/quickwit/quickwit-search/src/list_fields_cache.rs +++ b/quickwit/quickwit-search/src/list_fields_cache.rs @@ -41,9 +41,9 @@ impl ListFieldsCache { deserialize_split_fields(encoded_result).ok() } - pub fn put(&self, split_info: SplitIdAndFooterOffsets, list_fields: ListFields) { + pub fn put(&self, split_info: SplitIdAndFooterOffsets, list_fields: &ListFields) { let key = CacheKey::from_split_meta(split_info); let encoded_result = serialize_split_fields(list_fields); self.content.put(key, OwnedBytes::new(encoded_result)); } @@ -110,7 +110,7 @@ mod tests { fields: vec![result.clone()], }; - cache.put(split_1.clone(), list_fields.clone()); + cache.put(split_1.clone(), &list_fields); assert_eq!(cache.get(split_1.clone()).unwrap(), list_fields); assert!(cache.get(split_2).is_none()); }