From 992c10ed9f9cbd217f926b8c8c0cbdd23e1a0991 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 2 Jul 2024 08:00:32 +0000 Subject: [PATCH 1/6] feat(fulltext_index): introduce creator Signed-off-by: Zhenchi --- Cargo.lock | 326 ++++++++++++++++++ src/index/Cargo.toml | 5 + src/index/src/fulltext_index.rs | 37 ++ src/index/src/fulltext_index/create.rs | 33 ++ .../src/fulltext_index/create/tantivy.rs | 316 +++++++++++++++++ src/index/src/fulltext_index/error.rs | 68 ++++ src/index/src/lib.rs | 1 + .../puffin_manager/stager/bounded_stager.rs | 4 +- src/puffin/src/puffin_manager/tests.rs | 28 +- 9 files changed, 802 insertions(+), 16 deletions(-) create mode 100644 src/index/src/fulltext_index.rs create mode 100644 src/index/src/fulltext_index/create.rs create mode 100644 src/index/src/fulltext_index/create/tantivy.rs create mode 100644 src/index/src/fulltext_index/error.rs diff --git a/Cargo.lock b/Cargo.lock index d123ebfe8a8c..6dd85f06ec8a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1014,6 +1014,15 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" +[[package]] +name = "bitpacking" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c1d3e2bfd8d06048a179f7b17afc3188effa10385e7b00dc65af6aae732ea92" +dependencies = [ + "crunchy", +] + [[package]] name = "bitvec" version = "1.0.1" @@ -1375,6 +1384,21 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "060303ef31ef4a522737e1b1ab68c67916f2a787bb2f4f54f383279adba962b5" +[[package]] +name = "cedarwood" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d910bedd62c24733263d0bed247460853c9d22e8956bd4cd964302095e04e90" +dependencies = [ + "smallvec", +] + +[[package]] +name = "census" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f4c707c6a209cbe82d10abd08e1ea8995e9ea937d2550646e02798948992be0" + [[package]] name = "cesu8" version = "1.1.0" @@ -3457,6 +3481,15 @@ dependencies = [ "derive_builder_macro 0.12.0", ] +[[package]] +name = "derive_builder" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0350b5cb0331628a5916d6c5c0b72e97393b8b6b03b47a9284f4e7f5a405ffd7" +dependencies = [ + "derive_builder_macro 0.20.0", +] + [[package]] name = "derive_builder_core" version = "0.11.2" @@ -3481,6 +3514,18 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "derive_builder_core" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d48cda787f839151732d396ac69e3473923d54312c070ee21e9effcaa8ca0b1d" +dependencies = [ + "darling 0.20.9", + "proc-macro2", + "quote", + "syn 2.0.66", +] + [[package]] name = "derive_builder_macro" version = "0.11.2" @@ -3501,6 +3546,16 @@ dependencies = [ "syn 1.0.109", ] +[[package]] +name = "derive_builder_macro" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206868b8242f27cecce124c19fd88157fbd0dd334df2587f36417bafbc85097b" +dependencies = [ + "derive_builder_core 0.20.0", + "syn 2.0.66", +] + [[package]] name = "diff" version = "0.1.13" @@ -3643,6 +3698,12 @@ version = "0.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1435fa1053d8b2fbbe9be7e97eca7f33d37b28409959813daefc1446a14247f1" +[[package]] +name = "downcast-rs" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75b325c5dbd37f80359721ad39aca5a29fb04c89279657cffdda8736d0c0b9d2" + [[package]] name = "duration-str" version = "0.11.2" @@ -3830,6 +3891,12 @@ version = "0.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" +[[package]] +name = "fastdivide" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59668941c55e5c186b8b58c391629af56774ec768f73c08bbcd56f09348eb00b" + [[package]] name = "fastrand" version = "2.1.0" @@ -4164,6 +4231,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "fs4" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7e180ac76c23b45e767bd7ae9579bc0bb458618c4bc71835926e098e61d15f8" +dependencies = [ + "rustix", + "windows-sys 0.52.0", +] + [[package]] name = "fsevent-sys" version = "4.1.0" @@ -4304,6 +4381,15 @@ dependencies = [ "slab", ] +[[package]] +name = "fxhash" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" +dependencies = [ + "byteorder", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -4608,6 +4694,12 @@ dependencies = [ "utf8-width", ] +[[package]] +name = "htmlescape" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9025058dae765dee5070ec375f591e2ba14638c63feff74f13805a72e523163" + [[package]] name = "http" version = "0.2.12" @@ -5146,6 +5238,7 @@ dependencies = [ "common-error", "common-macro", "common-telemetry", + "common-test-util", "fst", "futures", "greptime-proto", @@ -5155,7 +5248,10 @@ dependencies = [ "rand", "regex", "regex-automata 0.4.7", + "serde", "snafu 0.8.3", + "tantivy", + "tantivy-jieba", "tempfile", "tokio", "tokio-util", @@ -5357,6 +5453,21 @@ version = "1.0.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" +[[package]] +name = "jieba-rs" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1e2b0210dc78b49337af9e49d7ae41a39dceac6e5985613f1cf7763e2f76a25" +dependencies = [ + "cedarwood", + "derive_builder 0.20.0", + "fxhash", + "lazy_static", + "phf", + "phf_codegen", + "regex", +] + [[package]] name = "jni" version = "0.21.1" @@ -5677,6 +5788,12 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830d08ce1d1d941e6b30645f1a0eb5643013d835ce3779a5fc208261dbe10f55" +[[package]] +name = "levenshtein_automata" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c2cdeb66e45e9f36bfad5bbdb4d2384e70936afbee843c6f6543f0c551ebb25" + [[package]] name = "lexical-core" version = "0.8.5" @@ -6098,6 +6215,16 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "490cc448043f947bae3cbee9c203358d62dbee0db12107a74be5c30ccfd09771" +[[package]] +name = "measure_time" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbefd235b0aadd181626f281e1d684e116972988c14c264e42069d5e8a5775cc" +dependencies = [ + "instant", + "log", +] + [[package]] name = "memchr" version = "2.7.4" @@ -6517,6 +6644,12 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97af489e1e21b68de4c390ecca6703318bc1aa16e9733bcb62c089b73c6fbb1b" +[[package]] +name = "murmurhash32" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2195bf6aa996a481483b29d62a7663eed3fe39600c460e323f8ff41e90bdd89b" + [[package]] name = "mysql-common-derive" version = "0.30.2" @@ -7043,6 +7176,12 @@ version = "1.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" +[[package]] +name = "oneshot" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e296cf87e61c9cfc1a61c3c63a0f7f286ed4554e0e22be84e8a38e1d264a2a29" + [[package]] name = "oorandom" version = "11.1.3" @@ -7399,6 +7538,15 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "ownedbytes" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3a059efb063b8f425b948e042e6b9bd85edfe60e913630ed727b23e2dfcc558" +dependencies = [ + "stable_deref_trait", +] + [[package]] name = "packedvec" version = "1.2.4" @@ -9349,6 +9497,16 @@ dependencies = [ "tree-sitter-cli", ] +[[package]] +name = "rust-stemmers" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e46a2036019fdb888131db7a4c847a1063a7493f971ed94ea82c67eada63ca54" +dependencies = [ + "serde", + "serde_derive", +] + [[package]] name = "rust_decimal" version = "1.35.0" @@ -10508,6 +10666,15 @@ version = "0.3.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "38b58827f4464d87d377d175e90bf58eb00fd8716ff0a62f80356b5e61555d0d" +[[package]] +name = "sketches-ddsketch" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85636c14b73d81f541e525f585c0a2109e6744e1565b5c1668e31c70c10ed65c" +dependencies = [ + "serde", +] + [[package]] name = "slab" version = "0.4.9" @@ -11353,6 +11520,159 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" +[[package]] +name = "tantivy" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8d0582f186c0a6d55655d24543f15e43607299425c5ad8352c242b914b31856" +dependencies = [ + "aho-corasick", + "arc-swap", + "base64 0.22.1", + "bitpacking", + "byteorder", + "census", + "crc32fast", + "crossbeam-channel", + "downcast-rs", + "fastdivide", + "fnv", + "fs4", + "htmlescape", + "itertools 0.12.1", + "levenshtein_automata", + "log", + "lru", + "lz4_flex 0.11.3", + "measure_time", + "memmap2 0.9.4", + "num_cpus", + "once_cell", + "oneshot", + "rayon", + "regex", + "rust-stemmers", + "rustc-hash", + "serde", + "serde_json", + "sketches-ddsketch", + "smallvec", + "tantivy-bitpacker", + "tantivy-columnar", + "tantivy-common", + "tantivy-fst", + "tantivy-query-grammar", + "tantivy-stacker", + "tantivy-tokenizer-api", + "tempfile", + "thiserror", + "time", + "uuid", + "winapi", + "zstd 0.13.1", +] + +[[package]] +name = "tantivy-bitpacker" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "284899c2325d6832203ac6ff5891b297fc5239c3dc754c5bc1977855b23c10df" +dependencies = [ + "bitpacking", +] + +[[package]] +name = "tantivy-columnar" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12722224ffbe346c7fec3275c699e508fd0d4710e629e933d5736ec524a1f44e" +dependencies = [ + "downcast-rs", + "fastdivide", + "itertools 0.12.1", + "serde", + "tantivy-bitpacker", + "tantivy-common", + "tantivy-sstable", + "tantivy-stacker", +] + +[[package]] +name = "tantivy-common" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8019e3cabcfd20a1380b491e13ff42f57bb38bf97c3d5fa5c07e50816e0621f4" +dependencies = [ + "async-trait", + "byteorder", + "ownedbytes", + "serde", + "time", +] + +[[package]] +name = "tantivy-fst" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d60769b80ad7953d8a7b2c70cdfe722bbcdcac6bccc8ac934c40c034d866fc18" +dependencies = [ + "byteorder", + "regex-syntax 0.8.4", + "utf8-ranges", +] + +[[package]] +name = "tantivy-jieba" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f2fe65c125f0d76d06f0f2ce9fbb9287b53f0dafb51a6270d984a840e2f16c1" +dependencies = [ + "jieba-rs", + "lazy_static", + "tantivy-tokenizer-api", +] + +[[package]] +name = "tantivy-query-grammar" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "847434d4af57b32e309f4ab1b4f1707a6c566656264caa427ff4285c4d9d0b82" +dependencies = [ + "nom", +] + +[[package]] +name = "tantivy-sstable" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c69578242e8e9fc989119f522ba5b49a38ac20f576fc778035b96cc94f41f98e" +dependencies = [ + "tantivy-bitpacker", + "tantivy-common", + "tantivy-fst", + "zstd 0.13.1", +] + +[[package]] +name = "tantivy-stacker" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c56d6ff5591fc332739b3ce7035b57995a3ce29a93ffd6012660e0949c956ea8" +dependencies = [ + "murmurhash32", + "rand_distr", + "tantivy-common", +] + +[[package]] +name = "tantivy-tokenizer-api" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a0dcade25819a89cfe6f17d932c9cedff11989936bf6dd4f336d50392053b04" +dependencies = [ + "serde", +] + [[package]] name = "tap" version = "1.0.1" @@ -12810,6 +13130,12 @@ version = "1.0.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c8232dd3cdaed5356e0f716d285e4b40b932ac434100fe9b7e0e8e935b9e6246" +[[package]] +name = "utf8-ranges" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcfc827f90e53a02eaef5e535ee14266c1d569214c6aa70133a624d8a3164ba" + [[package]] name = "utf8-width" version = "0.1.7" diff --git a/src/index/Cargo.toml b/src/index/Cargo.toml index effa0a79e635..c1e76e4488a2 100644 --- a/src/index/Cargo.toml +++ b/src/index/Cargo.toml @@ -24,9 +24,14 @@ pin-project.workspace = true prost.workspace = true regex.workspace = true regex-automata.workspace = true +serde.workspace = true snafu.workspace = true +tantivy = { version = "0.22", features = ["zstd-compression"] } +tantivy-jieba = "0.11.0" +tokio.workspace = true [dev-dependencies] +common-test-util.workspace = true rand.workspace = true tempfile.workspace = true tokio.workspace = true diff --git a/src/index/src/fulltext_index.rs b/src/index/src/fulltext_index.rs new file mode 100644 index 000000000000..6fa8b97cf33f --- /dev/null +++ b/src/index/src/fulltext_index.rs @@ -0,0 +1,37 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use serde::{Deserialize, Serialize}; + +pub mod create; +pub mod error; + +/// Configuration for fulltext index. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] +pub struct Config { + /// Analyzer to use for tokenization. + pub analyzer: Analyzer, + + /// Whether the index should be case sensitive. + pub case_sensitive: bool, +} + +/// Analyzer to use for tokenization. +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Default)] +pub enum Analyzer { + #[default] + English, + + Chinese, +} diff --git a/src/index/src/fulltext_index/create.rs b/src/index/src/fulltext_index/create.rs new file mode 100644 index 000000000000..2faf6e0b6a49 --- /dev/null +++ b/src/index/src/fulltext_index/create.rs @@ -0,0 +1,33 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod tantivy; + +use async_trait::async_trait; +pub use tantivy::TantivyFulltextIndexCreator; + +use crate::fulltext_index::error::Result; + +/// `FulltextIndexCreator` is for creating a fulltext index.` +#[async_trait] +pub trait FulltextIndexCreator: Send { + /// Pushes a text to the index. + async fn push_text(&mut self, text: &str) -> Result<()>; + + /// Finalizes the creation of the index. + async fn finish(&mut self) -> Result<()>; + + /// Returns the memory usage in bytes during the creation of the index. + fn memory_usage(&self) -> usize; +} diff --git a/src/index/src/fulltext_index/create/tantivy.rs b/src/index/src/fulltext_index/create/tantivy.rs new file mode 100644 index 000000000000..42c98c5d1ead --- /dev/null +++ b/src/index/src/fulltext_index/create/tantivy.rs @@ -0,0 +1,316 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::path::Path; + +use async_trait::async_trait; +use snafu::ResultExt; +use tantivy::schema::{Schema, TEXT}; +use tantivy::store::{Compressor, ZstdCompressor}; +use tantivy::tokenizer::{LowerCaser, SimpleTokenizer, TextAnalyzer, TokenizerManager}; +use tantivy::{doc, Index, SingleSegmentIndexWriter}; +use tantivy_jieba::JiebaTokenizer; + +use crate::fulltext_index::create::FulltextIndexCreator; +use crate::fulltext_index::error::{IOSnafu, Result, TantivySnafu}; +use crate::fulltext_index::{Analyzer, Config}; + +pub const TEXT_FIELD_NAME: &str = "greptime_fulltext_text"; + +/// `TantivyFulltextIndexCreator` is a fulltext index creator using tantivy. +/// +/// Here use a single segment to store the index so the maximun capacity for +/// the index is limited to 2<<31 rows (around 2 billion rows). +pub struct TantivyFulltextIndexCreator { + /// The tantivy index writer. + writer: Option, + + /// The field for the text. + text_field: tantivy::schema::Field, +} + +impl TantivyFulltextIndexCreator { + /// Creates a new `TantivyFulltextIndexCreator`. + /// + /// The `path` is the directory path in filesystem to store the index. + pub async fn new(path: impl AsRef, config: Config, memory_limit: usize) -> Result { + tokio::fs::create_dir_all(path.as_ref()) + .await + .context(IOSnafu)?; + + let mut schema_builder = Schema::builder(); + let text_field = schema_builder.add_text_field(TEXT_FIELD_NAME, TEXT); + let schema = schema_builder.build(); + + let mut index = Index::create_in_dir(path, schema).context(TantivySnafu)?; + index.settings_mut().docstore_compression = Compressor::Zstd(ZstdCompressor::default()); + index.set_tokenizers(Self::build_tokenizer(&config)); + + let memory_limit = Self::sanitize_memory_limit(memory_limit); + + let writer = SingleSegmentIndexWriter::new(index, memory_limit).context(TantivySnafu)?; + Ok(Self { + writer: Some(writer), + text_field, + }) + } + + fn build_tokenizer(config: &Config) -> TokenizerManager { + let mut builder = match config.analyzer { + Analyzer::English => TextAnalyzer::builder(SimpleTokenizer::default()).dynamic(), + Analyzer::Chinese => TextAnalyzer::builder(JiebaTokenizer {}).dynamic(), + }; + + if !config.case_sensitive { + builder = builder.filter_dynamic(LowerCaser); + } + + let tokenizer = builder.build(); + let tokenizer_manager = TokenizerManager::new(); + tokenizer_manager.register("default", tokenizer); + tokenizer_manager + } + + fn sanitize_memory_limit(memory_limit: usize) -> usize { + // Port from tantivy::indexer::index_writer::{MEMORY_BUDGET_NUM_BYTES_MIN, MEMORY_BUDGET_NUM_BYTES_MAX} + const MARGIN_IN_BYTES: usize = 1_000_000; + const MEMORY_BUDGET_NUM_BYTES_MIN: usize = ((MARGIN_IN_BYTES as u32) * 15u32) as usize; + const MEMORY_BUDGET_NUM_BYTES_MAX: usize = u32::MAX as usize - MARGIN_IN_BYTES; + + memory_limit.clamp(MEMORY_BUDGET_NUM_BYTES_MIN, MEMORY_BUDGET_NUM_BYTES_MAX) + } +} + +#[async_trait] +impl FulltextIndexCreator for TantivyFulltextIndexCreator { + async fn push_text(&mut self, text: &str) -> Result<()> { + if let Some(writer) = &mut self.writer { + writer + .add_document(doc!(self.text_field => text)) + .context(TantivySnafu)?; + } + + Ok(()) + } + + async fn finish(&mut self) -> Result<()> { + if let Some(writer) = self.writer.take() { + writer.finalize().context(TantivySnafu)?; + } + Ok(()) + } + + fn memory_usage(&self) -> usize { + self.writer + .as_ref() + .map(|writer| writer.mem_usage()) + .unwrap_or(0) + } +} + +#[cfg(test)] +mod tests { + use std::collections::HashSet; + + use common_test_util::temp_dir::create_temp_dir; + use tantivy::collector::DocSetCollector; + use tantivy::query::QueryParser; + + use super::*; + + #[tokio::test] + async fn test_creator_basic() { + let memory_limits = [1, 64_000_000, usize::MAX]; + + for memory_limit in memory_limits { + let temp_dir = create_temp_dir("test_creator_basic_"); + + let mut creator = + TantivyFulltextIndexCreator::new(temp_dir.path(), Config::default(), memory_limit) + .await + .unwrap(); + + let texts = vec!["hello", "world", "hello, world", "foo!", "Bar"]; + + for text in texts { + creator.push_text(text).await.unwrap(); + } + + creator.finish().await.unwrap(); + + let cases = [ + ("hello", HashSet::from_iter([0u32, 2].into_iter())), + ("world", HashSet::from_iter([1, 2].into_iter())), + ("foo", HashSet::from_iter([3].into_iter())), + ("bar", HashSet::from_iter([4].into_iter())), + ]; + + let index = Index::open_in_dir(temp_dir.path()).unwrap(); + let reader = index.reader().unwrap(); + + let searcher = reader.searcher(); + for (query, expected) in cases { + let query_parser = QueryParser::for_index( + &index, + vec![index.schema().get_field(TEXT_FIELD_NAME).unwrap()], + ); + let query = query_parser.parse_query(query).unwrap(); + let docs = searcher.search(&query, &DocSetCollector).unwrap(); + let res = docs.into_iter().map(|d| d.doc_id).collect::>(); + assert_eq!(res, expected); + } + } + } + + #[tokio::test] + async fn test_creator_case_sensitive() { + let memory_limits = [1, 64_000_000, usize::MAX]; + + for memory_limit in memory_limits { + let temp_dir = create_temp_dir("test_creator_case_sensitive_"); + + let config = Config { + case_sensitive: true, + ..Config::default() + }; + let mut creator = + TantivyFulltextIndexCreator::new(temp_dir.path(), config, memory_limit) + .await + .unwrap(); + + let texts = vec!["hello", "world", "hello, world", "foo!", "Bar"]; + + for text in texts { + creator.push_text(text).await.unwrap(); + } + + creator.finish().await.unwrap(); + + let cases = [ + ("hello", HashSet::from_iter([0u32, 2].into_iter())), + ("world", HashSet::from_iter([1, 2].into_iter())), + ("foo", HashSet::from_iter([3].into_iter())), + ("bar", HashSet::from_iter([].into_iter())), + ]; + + let index = Index::open_in_dir(temp_dir.path()).unwrap(); + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); + for (query, expected) in cases { + let query_parser = QueryParser::for_index( + &index, + vec![index.schema().get_field(TEXT_FIELD_NAME).unwrap()], + ); + let query = query_parser.parse_query(query).unwrap(); + let docs = searcher.search(&query, &DocSetCollector).unwrap(); + let res = docs.into_iter().map(|d| d.doc_id).collect::>(); + assert_eq!(res, expected); + } + } + } + + #[tokio::test] + async fn test_creator_chinese() { + let memory_limits = [1, 64_000_000, usize::MAX]; + + for memory_limit in memory_limits { + let temp_dir = create_temp_dir("test_creator_chinese_"); + + let config = Config { + analyzer: Analyzer::Chinese, + ..Config::default() + }; + let mut creator = + TantivyFulltextIndexCreator::new(temp_dir.path(), config, memory_limit) + .await + .unwrap(); + + let texts = vec!["你好", "世界", "你好,世界", "你好世界", "foo!", "Bar"]; + + for text in texts { + creator.push_text(text).await.unwrap(); + } + + creator.finish().await.unwrap(); + + let cases = [ + ("你好", HashSet::from_iter([0u32, 2, 3].into_iter())), + ("世界", HashSet::from_iter([1, 2, 3].into_iter())), + ("foo", HashSet::from_iter([4].into_iter())), + ("bar", HashSet::from_iter([5].into_iter())), + ]; + + let index = Index::open_in_dir(temp_dir.path()).unwrap(); + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); + for (query, expected) in cases { + let query_parser = QueryParser::for_index( + &index, + vec![index.schema().get_field(TEXT_FIELD_NAME).unwrap()], + ); + let query = query_parser.parse_query(query).unwrap(); + let docs = searcher.search(&query, &DocSetCollector).unwrap(); + let res = docs.into_iter().map(|d| d.doc_id).collect::>(); + assert_eq!(res, expected); + } + } + } + + #[tokio::test] + async fn test_creator_chinese_case_sensitive() { + let memory_limits = [1, 64_000_000, usize::MAX]; + + for memory_limit in memory_limits { + let temp_dir = create_temp_dir("test_creator_chinese_case_sensitive_"); + + let config = Config { + analyzer: Analyzer::Chinese, + case_sensitive: true, + }; + let mut creator = + TantivyFulltextIndexCreator::new(temp_dir.path(), config, memory_limit) + .await + .unwrap(); + + let texts = vec!["你好", "世界", "你好,世界", "你好世界", "foo!", "Bar"]; + + for text in texts { + creator.push_text(text).await.unwrap(); + } + + creator.finish().await.unwrap(); + + let cases = [ + ("你好", HashSet::from_iter([0u32, 2, 3].into_iter())), + ("世界", HashSet::from_iter([1, 2, 3].into_iter())), + ("foo", HashSet::from_iter([4].into_iter())), + ("bar", HashSet::from_iter([].into_iter())), + ]; + + let index = Index::open_in_dir(temp_dir.path()).unwrap(); + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); + for (query, expected) in cases { + let query_parser = QueryParser::for_index( + &index, + vec![index.schema().get_field(TEXT_FIELD_NAME).unwrap()], + ); + let query = query_parser.parse_query(query).unwrap(); + let docs = searcher.search(&query, &DocSetCollector).unwrap(); + let res = docs.into_iter().map(|d| d.doc_id).collect::>(); + assert_eq!(res, expected); + } + } + } +} diff --git a/src/index/src/fulltext_index/error.rs b/src/index/src/fulltext_index/error.rs new file mode 100644 index 000000000000..e2107dba0448 --- /dev/null +++ b/src/index/src/fulltext_index/error.rs @@ -0,0 +1,68 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::io::Error as IoError; + +use common_error::ext::{BoxedError, ErrorExt}; +use common_error::status_code::StatusCode; +use common_macro::stack_trace_debug; +use snafu::{Location, Snafu}; + +#[derive(Snafu)] +#[snafu(visibility(pub))] +#[stack_trace_debug] +pub enum Error { + #[snafu(display("IO error"))] + IO { + #[snafu(source)] + error: IoError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("Tantivy error"))] + Tantivy { + #[snafu(source)] + error: tantivy::TantivyError, + #[snafu(implicit)] + location: Location, + }, + + #[snafu(display("External error"))] + External { + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, +} + +impl ErrorExt for Error { + fn status_code(&self) -> StatusCode { + use Error::*; + + match self { + Tantivy { .. } => StatusCode::Internal, + IO { .. } => StatusCode::Unexpected, + + External { source, .. } => source.status_code(), + } + } + + fn as_any(&self) -> &dyn Any { + self + } +} + +pub type Result = std::result::Result; diff --git a/src/index/src/lib.rs b/src/index/src/lib.rs index e7f448c398ef..197fc01818c0 100644 --- a/src/index/src/lib.rs +++ b/src/index/src/lib.rs @@ -14,4 +14,5 @@ #![feature(iter_partition_in_place)] +pub mod fulltext_index; pub mod inverted_index; diff --git a/src/puffin/src/puffin_manager/stager/bounded_stager.rs b/src/puffin/src/puffin_manager/stager/bounded_stager.rs index e0179a50c0f1..2b732450ca16 100644 --- a/src/puffin/src/puffin_manager/stager/bounded_stager.rs +++ b/src/puffin/src/puffin_manager/stager/bounded_stager.rs @@ -67,14 +67,14 @@ pub struct BoundedStager { } impl BoundedStager { - pub async fn new(base_dir: PathBuf, capicity: u64) -> Result { + pub async fn new(base_dir: PathBuf, capacity: u64) -> Result { let recycle_bin = Cache::builder() .time_to_live(Duration::from_secs(60)) .build(); let recycle_bin_cloned = recycle_bin.clone(); let cache = Cache::builder() - .max_capacity(capicity) + .max_capacity(capacity) .weigher(|_: &String, v: &CacheValue| v.weight()) .async_eviction_listener(move |k, v, _| { let recycle_bin = recycle_bin_cloned.clone(); diff --git a/src/puffin/src/puffin_manager/tests.rs b/src/puffin/src/puffin_manager/tests.rs index 0efa826b27a2..dc106d746182 100644 --- a/src/puffin/src/puffin_manager/tests.rs +++ b/src/puffin/src/puffin_manager/tests.rs @@ -31,12 +31,12 @@ use crate::puffin_manager::{ BlobGuard, DirGuard, PuffinManager, PuffinReader, PuffinWriter, PutOptions, }; -async fn new_bounded_stager(prefix: &str, capicity: u64) -> (TempDir, Arc) { +async fn new_bounded_stager(prefix: &str, capacity: u64) -> (TempDir, Arc) { let staging_dir = create_temp_dir(prefix); let path = staging_dir.path().to_path_buf(); ( staging_dir, - Arc::new(BoundedStager::new(path, capicity).await.unwrap()), + Arc::new(BoundedStager::new(path, capacity).await.unwrap()), ) } @@ -45,9 +45,9 @@ async fn test_put_get_file() { let capicities = [1, 16, u64::MAX]; let compression_codecs = [None, Some(CompressionCodec::Zstd)]; - for capicity in capicities { + for capacity in capicities { for compression_codec in compression_codecs { - let (_staging_dir, stager) = new_bounded_stager("test_put_get_file_", capicity).await; + let (_staging_dir, stager) = new_bounded_stager("test_put_get_file_", capacity).await; let file_accessor = Arc::new(MockFileAccessor::new("test_put_get_file_")); let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone()); @@ -65,7 +65,7 @@ async fn test_put_get_file() { check_blob(puffin_file_name, key, raw_data, &stager, &reader).await; // renew cache manager - let (_staging_dir, stager) = new_bounded_stager("test_put_get_file_", capicity).await; + let (_staging_dir, stager) = new_bounded_stager("test_put_get_file_", capacity).await; let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor); let reader = puffin_manager.reader(puffin_file_name).await.unwrap(); @@ -78,11 +78,11 @@ async fn test_put_get_file() { async fn test_put_get_files() { let capicities = [1, 16, u64::MAX]; - for capicity in capicities { + for capacity in capicities { let compression_codecs = [None, Some(CompressionCodec::Zstd)]; for compression_codec in compression_codecs { - let (_staging_dir, stager) = new_bounded_stager("test_put_get_files_", capicity).await; + let (_staging_dir, stager) = new_bounded_stager("test_put_get_files_", capacity).await; let file_accessor = Arc::new(MockFileAccessor::new("test_put_get_files_")); let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone()); @@ -110,7 +110,7 @@ async fn test_put_get_files() { } // renew cache manager - let (_staging_dir, stager) = new_bounded_stager("test_put_get_files_", capicity).await; + let (_staging_dir, stager) = new_bounded_stager("test_put_get_files_", capacity).await; let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor); let reader = puffin_manager.reader(puffin_file_name).await.unwrap(); for (key, raw_data) in &blobs { @@ -126,9 +126,9 @@ async fn test_put_get_dir() { let compression_codecs = [None, Some(CompressionCodec::Zstd)]; - for capicity in capicities { + for capacity in capicities { for compression_codec in compression_codecs { - let (_staging_dir, stager) = new_bounded_stager("test_put_get_dir_", capicity).await; + let (_staging_dir, stager) = new_bounded_stager("test_put_get_dir_", capacity).await; let file_accessor = Arc::new(MockFileAccessor::new("test_put_get_dir_")); let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone()); @@ -154,7 +154,7 @@ async fn test_put_get_dir() { check_dir(puffin_file_name, key, &files_in_dir, &stager, &reader).await; // renew cache manager - let (_staging_dir, stager) = new_bounded_stager("test_put_get_dir_", capicity).await; + let (_staging_dir, stager) = new_bounded_stager("test_put_get_dir_", capacity).await; let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor); let reader = puffin_manager.reader(puffin_file_name).await.unwrap(); @@ -168,10 +168,10 @@ async fn test_put_get_mix_file_dir() { let capicities = [1, 64, u64::MAX]; let compression_codecs = [None, Some(CompressionCodec::Zstd)]; - for capicity in capicities { + for capacity in capicities { for compression_codec in compression_codecs { let (_staging_dir, stager) = - new_bounded_stager("test_put_get_mix_file_dir_", capicity).await; + new_bounded_stager("test_put_get_mix_file_dir_", capacity).await; let file_accessor = Arc::new(MockFileAccessor::new("test_put_get_mix_file_dir_")); let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor.clone()); @@ -211,7 +211,7 @@ async fn test_put_get_mix_file_dir() { // renew cache manager let (_staging_dir, stager) = - new_bounded_stager("test_put_get_mix_file_dir_", capicity).await; + new_bounded_stager("test_put_get_mix_file_dir_", capacity).await; let puffin_manager = FsPuffinManager::new(stager.clone(), file_accessor); let reader = puffin_manager.reader(puffin_file_name).await.unwrap(); From d72337d343891c12f66aa6213e1f8365a9d309e3 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 2 Jul 2024 08:05:00 +0000 Subject: [PATCH 2/6] fix: typo Signed-off-by: Zhenchi --- src/index/src/fulltext_index/create/tantivy.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/index/src/fulltext_index/create/tantivy.rs b/src/index/src/fulltext_index/create/tantivy.rs index 42c98c5d1ead..2d61b3e61474 100644 --- a/src/index/src/fulltext_index/create/tantivy.rs +++ b/src/index/src/fulltext_index/create/tantivy.rs @@ -30,7 +30,7 @@ pub const TEXT_FIELD_NAME: &str = "greptime_fulltext_text"; /// `TantivyFulltextIndexCreator` is a fulltext index creator using tantivy. /// -/// Here use a single segment to store the index so the maximun capacity for +/// Here use a single segment to store the index so the maximum capacity for /// the index is limited to 2<<31 rows (around 2 billion rows). pub struct TantivyFulltextIndexCreator { /// The tantivy index writer. From 5ed768e949678f37762a6df00cde41cc66cce26e Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 2 Jul 2024 08:06:28 +0000 Subject: [PATCH 3/6] fix: typo Signed-off-by: Zhenchi --- src/index/src/fulltext_index/create.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/index/src/fulltext_index/create.rs b/src/index/src/fulltext_index/create.rs index 2faf6e0b6a49..6b85d6b1630f 100644 --- a/src/index/src/fulltext_index/create.rs +++ b/src/index/src/fulltext_index/create.rs @@ -19,7 +19,7 @@ pub use tantivy::TantivyFulltextIndexCreator; use crate::fulltext_index::error::Result; -/// `FulltextIndexCreator` is for creating a fulltext index.` +/// `FulltextIndexCreator` is for creating a fulltext index. #[async_trait] pub trait FulltextIndexCreator: Send { /// Pushes a text to the index. From 3c036b0183461a4757592e0cd5e430c3b7219170 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 2 Jul 2024 08:22:23 +0000 Subject: [PATCH 4/6] chore: polish Signed-off-by: Zhenchi --- .../src/fulltext_index/create/tantivy.rs | 54 +++++++++---------- 1 file changed, 24 insertions(+), 30 deletions(-) diff --git a/src/index/src/fulltext_index/create/tantivy.rs b/src/index/src/fulltext_index/create/tantivy.rs index 2d61b3e61474..f07cc2fe04d9 100644 --- a/src/index/src/fulltext_index/create/tantivy.rs +++ b/src/index/src/fulltext_index/create/tantivy.rs @@ -121,8 +121,6 @@ impl FulltextIndexCreator for TantivyFulltextIndexCreator { #[cfg(test)] mod tests { - use std::collections::HashSet; - use common_test_util::temp_dir::create_temp_dir; use tantivy::collector::DocSetCollector; use tantivy::query::QueryParser; @@ -142,18 +140,16 @@ mod tests { .unwrap(); let texts = vec!["hello", "world", "hello, world", "foo!", "Bar"]; - for text in texts { creator.push_text(text).await.unwrap(); } - creator.finish().await.unwrap(); let cases = [ - ("hello", HashSet::from_iter([0u32, 2].into_iter())), - ("world", HashSet::from_iter([1, 2].into_iter())), - ("foo", HashSet::from_iter([3].into_iter())), - ("bar", HashSet::from_iter([4].into_iter())), + ("hello", vec![0u32, 2]), + ("world", vec![1, 2]), + ("foo", vec![3]), + ("bar", vec![4]), ]; let index = Index::open_in_dir(temp_dir.path()).unwrap(); @@ -167,7 +163,8 @@ mod tests { ); let query = query_parser.parse_query(query).unwrap(); let docs = searcher.search(&query, &DocSetCollector).unwrap(); - let res = docs.into_iter().map(|d| d.doc_id).collect::>(); + let mut res = docs.into_iter().map(|d| d.doc_id).collect::>(); + res.sort(); assert_eq!(res, expected); } } @@ -190,18 +187,16 @@ mod tests { .unwrap(); let texts = vec!["hello", "world", "hello, world", "foo!", "Bar"]; - for text in texts { creator.push_text(text).await.unwrap(); } - creator.finish().await.unwrap(); let cases = [ - ("hello", HashSet::from_iter([0u32, 2].into_iter())), - ("world", HashSet::from_iter([1, 2].into_iter())), - ("foo", HashSet::from_iter([3].into_iter())), - ("bar", HashSet::from_iter([].into_iter())), + ("hello", vec![0u32, 2]), + ("world", vec![1, 2]), + ("foo", vec![3]), + ("bar", vec![]), ]; let index = Index::open_in_dir(temp_dir.path()).unwrap(); @@ -214,7 +209,8 @@ mod tests { ); let query = query_parser.parse_query(query).unwrap(); let docs = searcher.search(&query, &DocSetCollector).unwrap(); - let res = docs.into_iter().map(|d| d.doc_id).collect::>(); + let mut res = docs.into_iter().map(|d| d.doc_id).collect::>(); + res.sort(); assert_eq!(res, expected); } } @@ -237,18 +233,16 @@ mod tests { .unwrap(); let texts = vec!["你好", "世界", "你好,世界", "你好世界", "foo!", "Bar"]; - for text in texts { creator.push_text(text).await.unwrap(); } - creator.finish().await.unwrap(); let cases = [ - ("你好", HashSet::from_iter([0u32, 2, 3].into_iter())), - ("世界", HashSet::from_iter([1, 2, 3].into_iter())), - ("foo", HashSet::from_iter([4].into_iter())), - ("bar", HashSet::from_iter([5].into_iter())), + ("你好", vec![0u32, 2, 3]), + ("世界", vec![1, 2, 3]), + ("foo", vec![4]), + ("bar", vec![5]), ]; let index = Index::open_in_dir(temp_dir.path()).unwrap(); @@ -261,7 +255,8 @@ mod tests { ); let query = query_parser.parse_query(query).unwrap(); let docs = searcher.search(&query, &DocSetCollector).unwrap(); - let res = docs.into_iter().map(|d| d.doc_id).collect::>(); + let mut res = docs.into_iter().map(|d| d.doc_id).collect::>(); + res.sort(); assert_eq!(res, expected); } } @@ -284,18 +279,16 @@ mod tests { .unwrap(); let texts = vec!["你好", "世界", "你好,世界", "你好世界", "foo!", "Bar"]; - for text in texts { creator.push_text(text).await.unwrap(); } - creator.finish().await.unwrap(); let cases = [ - ("你好", HashSet::from_iter([0u32, 2, 3].into_iter())), - ("世界", HashSet::from_iter([1, 2, 3].into_iter())), - ("foo", HashSet::from_iter([4].into_iter())), - ("bar", HashSet::from_iter([].into_iter())), + ("你好", vec![0u32, 2, 3]), + ("世界", vec![1, 2, 3]), + ("foo", vec![4]), + ("bar", vec![]), ]; let index = Index::open_in_dir(temp_dir.path()).unwrap(); @@ -308,7 +301,8 @@ mod tests { ); let query = query_parser.parse_query(query).unwrap(); let docs = searcher.search(&query, &DocSetCollector).unwrap(); - let res = docs.into_iter().map(|d| d.doc_id).collect::>(); + let mut res = docs.into_iter().map(|d| d.doc_id).collect::>(); + res.sort(); assert_eq!(res, expected); } } From 187346de2101cda0b39e73d6caea365167e1781d Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 2 Jul 2024 08:37:23 +0000 Subject: [PATCH 5/6] fix: return error if writer not found Signed-off-by: Zhenchi --- .../src/fulltext_index/create/tantivy.rs | 22 +++++++------------ src/index/src/fulltext_index/error.rs | 10 +++++++-- 2 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/index/src/fulltext_index/create/tantivy.rs b/src/index/src/fulltext_index/create/tantivy.rs index f07cc2fe04d9..074d15fd04a3 100644 --- a/src/index/src/fulltext_index/create/tantivy.rs +++ b/src/index/src/fulltext_index/create/tantivy.rs @@ -15,7 +15,7 @@ use std::path::Path; use async_trait::async_trait; -use snafu::ResultExt; +use snafu::{OptionExt, ResultExt}; use tantivy::schema::{Schema, TEXT}; use tantivy::store::{Compressor, ZstdCompressor}; use tantivy::tokenizer::{LowerCaser, SimpleTokenizer, TextAnalyzer, TokenizerManager}; @@ -23,7 +23,7 @@ use tantivy::{doc, Index, SingleSegmentIndexWriter}; use tantivy_jieba::JiebaTokenizer; use crate::fulltext_index::create::FulltextIndexCreator; -use crate::fulltext_index::error::{IOSnafu, Result, TantivySnafu}; +use crate::fulltext_index::error::{FinishedSnafu, IoSnafu, Result, TantivySnafu}; use crate::fulltext_index::{Analyzer, Config}; pub const TEXT_FIELD_NAME: &str = "greptime_fulltext_text"; @@ -47,7 +47,7 @@ impl TantivyFulltextIndexCreator { pub async fn new(path: impl AsRef, config: Config, memory_limit: usize) -> Result { tokio::fs::create_dir_all(path.as_ref()) .await - .context(IOSnafu)?; + .context(IoSnafu)?; let mut schema_builder = Schema::builder(); let text_field = schema_builder.add_text_field(TEXT_FIELD_NAME, TEXT); @@ -95,20 +95,14 @@ impl TantivyFulltextIndexCreator { #[async_trait] impl FulltextIndexCreator for TantivyFulltextIndexCreator { async fn push_text(&mut self, text: &str) -> Result<()> { - if let Some(writer) = &mut self.writer { - writer - .add_document(doc!(self.text_field => text)) - .context(TantivySnafu)?; - } - - Ok(()) + let writer = self.writer.as_mut().context(FinishedSnafu)?; + let doc = doc!(self.text_field => text); + writer.add_document(doc).context(TantivySnafu) } async fn finish(&mut self) -> Result<()> { - if let Some(writer) = self.writer.take() { - writer.finalize().context(TantivySnafu)?; - } - Ok(()) + let writer = self.writer.take().context(FinishedSnafu)?; + writer.finalize().map(|_| ()).context(TantivySnafu) } fn memory_usage(&self) -> usize { diff --git a/src/index/src/fulltext_index/error.rs b/src/index/src/fulltext_index/error.rs index e2107dba0448..ddc54d0dae2c 100644 --- a/src/index/src/fulltext_index/error.rs +++ b/src/index/src/fulltext_index/error.rs @@ -25,7 +25,7 @@ use snafu::{Location, Snafu}; #[stack_trace_debug] pub enum Error { #[snafu(display("IO error"))] - IO { + Io { #[snafu(source)] error: IoError, #[snafu(implicit)] @@ -40,6 +40,12 @@ pub enum Error { location: Location, }, + #[snafu(display("Operate on a finished creator"))] + Finished { + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("External error"))] External { source: BoxedError, @@ -54,7 +60,7 @@ impl ErrorExt for Error { match self { Tantivy { .. } => StatusCode::Internal, - IO { .. } => StatusCode::Unexpected, + Io { .. } | Finished { .. } => StatusCode::Unexpected, External { source, .. } => source.status_code(), } From 8d965d594c86a5a8a3827d98baa55947bcde38b3 Mon Sep 17 00:00:00 2001 From: Zhenchi Date: Tue, 2 Jul 2024 08:45:55 +0000 Subject: [PATCH 6/6] refactor: helper function for tests Signed-off-by: Zhenchi --- src/index/src/fulltext_index.rs | 2 +- .../src/fulltext_index/create/tantivy.rs | 139 +++++------------- 2 files changed, 40 insertions(+), 101 deletions(-) diff --git a/src/index/src/fulltext_index.rs b/src/index/src/fulltext_index.rs index 6fa8b97cf33f..f6dfb4ae83fd 100644 --- a/src/index/src/fulltext_index.rs +++ b/src/index/src/fulltext_index.rs @@ -23,7 +23,7 @@ pub struct Config { /// Analyzer to use for tokenization. pub analyzer: Analyzer, - /// Whether the index should be case sensitive. + /// Whether the index should be case-sensitive. pub case_sensitive: bool, } diff --git a/src/index/src/fulltext_index/create/tantivy.rs b/src/index/src/fulltext_index/create/tantivy.rs index 074d15fd04a3..16064e5aeeac 100644 --- a/src/index/src/fulltext_index/create/tantivy.rs +++ b/src/index/src/fulltext_index/create/tantivy.rs @@ -128,16 +128,9 @@ mod tests { for memory_limit in memory_limits { let temp_dir = create_temp_dir("test_creator_basic_"); - let mut creator = - TantivyFulltextIndexCreator::new(temp_dir.path(), Config::default(), memory_limit) - .await - .unwrap(); - let texts = vec!["hello", "world", "hello, world", "foo!", "Bar"]; - for text in texts { - creator.push_text(text).await.unwrap(); - } - creator.finish().await.unwrap(); + let config = Config::default(); + build_index(&texts, temp_dir.path(), config, memory_limit).await; let cases = [ ("hello", vec![0u32, 2]), @@ -145,22 +138,7 @@ mod tests { ("foo", vec![3]), ("bar", vec![4]), ]; - - let index = Index::open_in_dir(temp_dir.path()).unwrap(); - let reader = index.reader().unwrap(); - - let searcher = reader.searcher(); - for (query, expected) in cases { - let query_parser = QueryParser::for_index( - &index, - vec![index.schema().get_field(TEXT_FIELD_NAME).unwrap()], - ); - let query = query_parser.parse_query(query).unwrap(); - let docs = searcher.search(&query, &DocSetCollector).unwrap(); - let mut res = docs.into_iter().map(|d| d.doc_id).collect::>(); - res.sort(); - assert_eq!(res, expected); - } + query_and_check(temp_dir.path(), &cases).await; } } @@ -171,20 +149,12 @@ mod tests { for memory_limit in memory_limits { let temp_dir = create_temp_dir("test_creator_case_sensitive_"); + let texts = vec!["hello", "world", "hello, world", "foo!", "Bar"]; let config = Config { case_sensitive: true, ..Config::default() }; - let mut creator = - TantivyFulltextIndexCreator::new(temp_dir.path(), config, memory_limit) - .await - .unwrap(); - - let texts = vec!["hello", "world", "hello, world", "foo!", "Bar"]; - for text in texts { - creator.push_text(text).await.unwrap(); - } - creator.finish().await.unwrap(); + build_index(&texts, temp_dir.path(), config, memory_limit).await; let cases = [ ("hello", vec![0u32, 2]), @@ -192,21 +162,7 @@ mod tests { ("foo", vec![3]), ("bar", vec![]), ]; - - let index = Index::open_in_dir(temp_dir.path()).unwrap(); - let reader = index.reader().unwrap(); - let searcher = reader.searcher(); - for (query, expected) in cases { - let query_parser = QueryParser::for_index( - &index, - vec![index.schema().get_field(TEXT_FIELD_NAME).unwrap()], - ); - let query = query_parser.parse_query(query).unwrap(); - let docs = searcher.search(&query, &DocSetCollector).unwrap(); - let mut res = docs.into_iter().map(|d| d.doc_id).collect::>(); - res.sort(); - assert_eq!(res, expected); - } + query_and_check(temp_dir.path(), &cases).await; } } @@ -217,20 +173,12 @@ mod tests { for memory_limit in memory_limits { let temp_dir = create_temp_dir("test_creator_chinese_"); + let texts = vec!["你好", "世界", "你好,世界", "你好世界", "foo!", "Bar"]; let config = Config { analyzer: Analyzer::Chinese, ..Config::default() }; - let mut creator = - TantivyFulltextIndexCreator::new(temp_dir.path(), config, memory_limit) - .await - .unwrap(); - - let texts = vec!["你好", "世界", "你好,世界", "你好世界", "foo!", "Bar"]; - for text in texts { - creator.push_text(text).await.unwrap(); - } - creator.finish().await.unwrap(); + build_index(&texts, temp_dir.path(), config, memory_limit).await; let cases = [ ("你好", vec![0u32, 2, 3]), @@ -238,21 +186,7 @@ mod tests { ("foo", vec![4]), ("bar", vec![5]), ]; - - let index = Index::open_in_dir(temp_dir.path()).unwrap(); - let reader = index.reader().unwrap(); - let searcher = reader.searcher(); - for (query, expected) in cases { - let query_parser = QueryParser::for_index( - &index, - vec![index.schema().get_field(TEXT_FIELD_NAME).unwrap()], - ); - let query = query_parser.parse_query(query).unwrap(); - let docs = searcher.search(&query, &DocSetCollector).unwrap(); - let mut res = docs.into_iter().map(|d| d.doc_id).collect::>(); - res.sort(); - assert_eq!(res, expected); - } + query_and_check(temp_dir.path(), &cases).await; } } @@ -263,20 +197,12 @@ mod tests { for memory_limit in memory_limits { let temp_dir = create_temp_dir("test_creator_chinese_case_sensitive_"); + let texts = vec!["你好", "世界", "你好,世界", "你好世界", "foo!", "Bar"]; let config = Config { - analyzer: Analyzer::Chinese, case_sensitive: true, + analyzer: Analyzer::Chinese, }; - let mut creator = - TantivyFulltextIndexCreator::new(temp_dir.path(), config, memory_limit) - .await - .unwrap(); - - let texts = vec!["你好", "世界", "你好,世界", "你好世界", "foo!", "Bar"]; - for text in texts { - creator.push_text(text).await.unwrap(); - } - creator.finish().await.unwrap(); + build_index(&texts, temp_dir.path(), config, memory_limit).await; let cases = [ ("你好", vec![0u32, 2, 3]), @@ -284,21 +210,34 @@ mod tests { ("foo", vec![4]), ("bar", vec![]), ]; + query_and_check(temp_dir.path(), &cases).await; + } + } + + async fn build_index(texts: &[&str], path: &Path, config: Config, memory_limit: usize) { + let mut creator = TantivyFulltextIndexCreator::new(path, config, memory_limit) + .await + .unwrap(); + for text in texts { + creator.push_text(text).await.unwrap(); + } + creator.finish().await.unwrap(); + } - let index = Index::open_in_dir(temp_dir.path()).unwrap(); - let reader = index.reader().unwrap(); - let searcher = reader.searcher(); - for (query, expected) in cases { - let query_parser = QueryParser::for_index( - &index, - vec![index.schema().get_field(TEXT_FIELD_NAME).unwrap()], - ); - let query = query_parser.parse_query(query).unwrap(); - let docs = searcher.search(&query, &DocSetCollector).unwrap(); - let mut res = docs.into_iter().map(|d| d.doc_id).collect::>(); - res.sort(); - assert_eq!(res, expected); - } + async fn query_and_check(path: &Path, cases: &[(&str, Vec)]) { + let index = Index::open_in_dir(path).unwrap(); + let reader = index.reader().unwrap(); + let searcher = reader.searcher(); + for (query, expected) in cases { + let query_parser = QueryParser::for_index( + &index, + vec![index.schema().get_field(TEXT_FIELD_NAME).unwrap()], + ); + let query = query_parser.parse_query(query).unwrap(); + let docs = searcher.search(&query, &DocSetCollector).unwrap(); + let mut res = docs.into_iter().map(|d| d.doc_id).collect::>(); + res.sort(); + assert_eq!(res, *expected); } } }