add RegexPhraseQuery (#2516)
* add RegexPhraseQuery

RegexPhraseQuery supports phrase queries in which each term is a regular
expression. Wildcards are supported as well. E.g. a query with wildcards:
"b* b* wolf" matches "big bad wolf"
Slop is supported too:
"b* wolf"~2 matches "big bad wolf"

Regex queries may match a lot of terms, and we still need to keep track of
which term hit in order to load its positions. The phrase query algorithm
groups terms of similar frequency together in the union, so that whole
groups can be pre-filtered early (see the sketch below).
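
A minimal sketch of the grouping idea (hypothetical helper, not the code in
this PR):

    // Sort matching terms by doc frequency and cut them into buckets, so
    // each bucket can be unioned and pre-filtered as one unit.
    fn group_terms_by_doc_freq(mut terms: Vec<(String, u32)>) -> Vec<Vec<String>> {
        terms.sort_by_key(|&(_, doc_freq)| doc_freq);
        terms
            .chunks(512) // mirrors the 512-docset limit mentioned below
            .map(|chunk| chunk.iter().map(|(term, _)| term.clone()).collect())
            .collect()
    }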

This PR comes with some new data structures:

SimpleUnion - A union docset over a list of docsets. It doesn't do any
caching and is therefore well suited for workloads with lots of skipping
(phrase search, but also intersections in general).
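
A minimal sketch of such a cache-less union, using only the public `DocSet`
trait (not the PR's actual implementation):

    use tantivy::{DocId, DocSet, TERMINATED};

    // Holds its sub-docsets directly; no buffering of upcoming docids.
    struct SimpleUnionSketch<T> {
        docsets: Vec<T>,
    }

    impl<T: DocSet> SimpleUnionSketch<T> {
        // Position every sub-docset at or beyond `target` and take the
        // minimum. Cheap when the caller skips a lot, as intersections do.
        fn seek(&mut self, target: DocId) -> DocId {
            let mut min_doc = TERMINATED;
            for docset in &mut self.docsets {
                if docset.doc() < target {
                    docset.seek(target);
                }
                min_doc = min_doc.min(docset.doc());
            }
            min_doc
        }
    }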

LoadedPostings - Like SegmentPostings, but all docs and positions are loaded
into memory. SegmentPostings uses 1840 bytes per instance with its caches,
which is equivalent to 460 docids.
LoadedPostings is used for terms that occur in fewer than 100 docs, and its
only purpose is to reduce memory consumption.
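
The arithmetic behind those numbers:

    // 1840 bytes per SegmentPostings instance, as quoted above.
    const SEGMENT_POSTINGS_BYTES: usize = 1840;
    // 1840 B / 4 B per u32 docid = memory worth 460 docids per instance.
    const _: () = assert!(SEGMENT_POSTINGS_BYTES / 4 == 460);
    // 100_000 matched terms * 1840 B = 184 MB, the figure in the
    // LoadedPostings doc comment below.
    const _: () = assert!(100_000 * SEGMENT_POSTINGS_BYTES == 184_000_000);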

BitSetPostingUnion - Creates a `Postings` implementation that uses a bitset
for the docid hits and the underlying docsets for positions. The bitset is
the precalculated union of the docsets.
In RegexPhraseQuery there is a size limit of 512 docsets per PreAggregatedUnion
before a new one is created.
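
A conceptual sketch of that split (struct and field names here are
hypothetical):

    use tantivy::postings::Postings;
    use tantivy::query::BitSetDocSet;
    use tantivy::DocSet;

    // Docid iteration is served by the precomputed bitset union; positions
    // are pulled from whichever sub-postings sit on the current doc.
    struct BitSetPostingUnionSketch<P> {
        docs: BitSetDocSet, // precalculated union of all docsets
        postings: Vec<P>,   // capped at 512 per union in RegexPhraseQuery
    }

    impl<P: Postings> BitSetPostingUnionSketch<P> {
        fn append_positions(&mut self, output: &mut Vec<u32>) {
            let doc = self.docs.doc();
            for postings in &mut self.postings {
                if postings.doc() < doc {
                    postings.seek(doc);
                }
                if postings.doc() == doc {
                    postings.append_positions_with_offset(0, output);
                }
            }
        }
    }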

Renamed Union to BufferedUnionScorer
Added proptests to test different union types.

* cleanup

* use Box instead of Vec

* use RefCell instead of term_freq(&mut)

* remove wildcard mode

* move RefCell to outer

* clippy
PSeitz authored Oct 21, 2024
1 parent e7e3e3f commit aebae99
Showing 16 changed files with 1,380 additions and 249 deletions.
155 changes: 155 additions & 0 deletions src/postings/loaded_postings.rs
@@ -0,0 +1,155 @@
use crate::docset::{DocSet, TERMINATED};
use crate::postings::{Postings, SegmentPostings};
use crate::DocId;

/// `LoadedPostings` is a `DocSet` and `Postings` implementation.
/// It is used to represent the postings of a term in memory.
/// It is suitable if there are few documents for a term.
///
/// It exists mainly to reduce memory usage.
/// `SegmentPostings` uses 1840 bytes per instance due to its caches.
/// If you need to keep many terms around with few docs, it's cheaper to load all the
/// postings in memory.
///
/// This is relevant for `RegexPhraseQuery`, which may have a lot of
/// terms.
/// E.g. 100_000 terms would need 184MB due to SegmentPostings.
pub struct LoadedPostings {
    doc_ids: Box<[DocId]>,
    position_offsets: Box<[u32]>,
    positions: Box<[u32]>,
    cursor: usize,
}

impl LoadedPostings {
    /// Creates a new `LoadedPostings` from a `SegmentPostings`.
    ///
    /// It will also preload positions, if positions are available in the SegmentPostings.
    pub fn load(segment_postings: &mut SegmentPostings) -> LoadedPostings {
        let num_docs = segment_postings.doc_freq() as usize;
        let mut doc_ids = Vec::with_capacity(num_docs);
        let mut positions = Vec::with_capacity(num_docs);
        let mut position_offsets = Vec::with_capacity(num_docs);
        while segment_postings.doc() != TERMINATED {
            position_offsets.push(positions.len() as u32);
            doc_ids.push(segment_postings.doc());
            segment_postings.append_positions_with_offset(0, &mut positions);
            segment_postings.advance();
        }
        position_offsets.push(positions.len() as u32);
        LoadedPostings {
            doc_ids: doc_ids.into_boxed_slice(),
            positions: positions.into_boxed_slice(),
            position_offsets: position_offsets.into_boxed_slice(),
            cursor: 0,
        }
    }
}

#[cfg(test)]
impl From<(Vec<DocId>, Vec<Vec<u32>>)> for LoadedPostings {
    fn from(doc_ids_and_positions: (Vec<DocId>, Vec<Vec<u32>>)) -> LoadedPostings {
        let mut position_offsets = Vec::new();
        let mut all_positions = Vec::new();
        let (doc_ids, docid_positions) = doc_ids_and_positions;
        for positions in docid_positions {
            position_offsets.push(all_positions.len() as u32);
            all_positions.extend_from_slice(&positions);
        }
        position_offsets.push(all_positions.len() as u32);
        LoadedPostings {
            doc_ids: doc_ids.into_boxed_slice(),
            positions: all_positions.into_boxed_slice(),
            position_offsets: position_offsets.into_boxed_slice(),
            cursor: 0,
        }
    }
}

impl DocSet for LoadedPostings {
    fn advance(&mut self) -> DocId {
        self.cursor += 1;
        if self.cursor >= self.doc_ids.len() {
            self.cursor = self.doc_ids.len();
            return TERMINATED;
        }
        self.doc()
    }

    fn doc(&self) -> DocId {
        if self.cursor >= self.doc_ids.len() {
            return TERMINATED;
        }
        self.doc_ids[self.cursor]
    }

    fn size_hint(&self) -> u32 {
        self.doc_ids.len() as u32
    }
}

impl Postings for LoadedPostings {
    fn term_freq(&self) -> u32 {
        let start = self.position_offsets[self.cursor] as usize;
        let end = self.position_offsets[self.cursor + 1] as usize;
        (end - start) as u32
    }

    fn append_positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
        let start = self.position_offsets[self.cursor] as usize;
        let end = self.position_offsets[self.cursor + 1] as usize;
        for pos in &self.positions[start..end] {
            output.push(*pos + offset);
        }
    }
}

#[cfg(test)]
pub mod tests {

    use super::*;

    #[test]
    pub fn test_vec_postings() {
        let doc_ids: Vec<DocId> = (0u32..1024u32).map(|e| e * 3).collect();
        let mut postings = LoadedPostings::from((doc_ids, vec![]));
        assert_eq!(postings.doc(), 0u32);
        assert_eq!(postings.advance(), 3u32);
        assert_eq!(postings.doc(), 3u32);
        assert_eq!(postings.seek(14u32), 15u32);
        assert_eq!(postings.doc(), 15u32);
        assert_eq!(postings.seek(300u32), 300u32);
        assert_eq!(postings.doc(), 300u32);
        assert_eq!(postings.seek(6000u32), TERMINATED);
    }

    #[test]
    pub fn test_vec_postings2() {
        let doc_ids: Vec<DocId> = (0u32..1024u32).map(|e| e * 3).collect();
        let mut positions = Vec::new();
        positions.resize(1024, Vec::new());
        positions[0] = vec![1u32, 2u32, 3u32];
        positions[1] = vec![30u32];
        positions[2] = vec![10u32];
        positions[4] = vec![50u32];
        let mut postings = LoadedPostings::from((doc_ids, positions));

        let load = |postings: &mut LoadedPostings| {
            let mut loaded_positions = Vec::new();
            postings.positions(loaded_positions.as_mut());
            loaded_positions
        };
        assert_eq!(postings.doc(), 0u32);
        assert_eq!(load(&mut postings), vec![1u32, 2u32, 3u32]);

        assert_eq!(postings.advance(), 3u32);
        assert_eq!(postings.doc(), 3u32);

        assert_eq!(load(&mut postings), vec![30u32]);

        assert_eq!(postings.seek(14u32), 15u32);
        assert_eq!(postings.doc(), 15u32);
        assert_eq!(postings.seek(300u32), 300u32);
        assert_eq!(postings.doc(), 300u32);
        assert_eq!(postings.seek(6000u32), TERMINATED);
    }
}
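
For intuition, the `position_offsets` array above is a classic CSR-style
layout; a tiny worked example with invented values:

    let doc_ids: Vec<u32> = vec![3, 7];
    let positions: Vec<u32> = vec![1, 2, 30];
    let position_offsets: Vec<u32> = vec![0, 2, 3]; // one extra end sentinel
    // doc 3 -> positions[0..2] == [1, 2]  -> term_freq 2
    // doc 7 -> positions[2..3] == [30]    -> term_freq 1
    assert_eq!(
        &positions[position_offsets[0] as usize..position_offsets[1] as usize],
        &[1, 2]
    );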
2 changes: 2 additions & 0 deletions src/postings/mod.rs
@@ -8,6 +8,7 @@ mod block_segment_postings
 pub(crate) mod compression;
 mod indexing_context;
 mod json_postings_writer;
+mod loaded_postings;
 mod per_field_postings_writer;
 mod postings;
 mod postings_writer;
@@ -17,6 +18,7 @@ mod serializer;
 mod skip;
 mod term_info;

+pub(crate) use loaded_postings::LoadedPostings;
 pub(crate) use stacker::compute_table_memory_size;

 pub use self::block_segment_postings::BlockSegmentPostings;
19 changes: 18 additions & 1 deletion src/postings/postings.rs
@@ -17,11 +17,28 @@ pub trait Postings: DocSet + 'static {
     /// Returns the positions offsetted with a given value.
     /// It is not necessary to clear the `output` before calling this method.
     /// The output vector will be resized to the `term_freq`.
-    fn positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>);
+    fn positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
+        output.clear();
+        self.append_positions_with_offset(offset, output);
+    }
+
+    /// Returns the positions offsetted with a given value.
+    /// Data will be appended to the output.
+    fn append_positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>);

     /// Returns the positions of the term in the given document.
     /// The output vector will be resized to the `term_freq`.
     fn positions(&mut self, output: &mut Vec<u32>) {
         self.positions_with_offset(0u32, output);
     }
 }
+
+impl Postings for Box<dyn Postings> {
+    fn term_freq(&self) -> u32 {
+        (**self).term_freq()
+    }
+
+    fn append_positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
+        (**self).append_positions_with_offset(offset, output);
+    }
+}
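
The reason for the new append variant, sketched as a hypothetical caller: a
union over many term postings can collect the positions of every term that
hit the current doc into one buffer, because `append_positions_with_offset`
does not clear it.

    use tantivy::postings::Postings;

    fn collect_positions_for_doc<P: Postings>(matching: &mut [P], output: &mut Vec<u32>) {
        output.clear();
        for postings in matching.iter_mut() {
            // appends instead of clearing, so contributions accumulate
            postings.append_positions_with_offset(0, output);
        }
        // the phrase algorithm expects sorted positions within a doc
        output.sort_unstable();
    }
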
12 changes: 6 additions & 6 deletions src/postings/segment_postings.rs
@@ -237,8 +237,9 @@ impl Postings for SegmentPostings {
         self.block_cursor.freq(self.cur)
     }

-    fn positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
+    fn append_positions_with_offset(&mut self, offset: u32, output: &mut Vec<u32>) {
         let term_freq = self.term_freq();
+        let prev_len = output.len();
         if let Some(position_reader) = self.position_reader.as_mut() {
             debug_assert!(
                 !self.block_cursor.freqs().is_empty(),
@@ -249,15 +250,14 @@
                 .iter()
                 .cloned()
                 .sum::<u32>() as u64);
-            output.resize(term_freq as usize, 0u32);
-            position_reader.read(read_offset, &mut output[..]);
+            // TODO: instead of zeroing the output, we could use MaybeUninit or similar.
+            output.resize(prev_len + term_freq as usize, 0u32);
+            position_reader.read(read_offset, &mut output[prev_len..]);
             let mut cum = offset;
-            for output_mut in output.iter_mut() {
+            for output_mut in output[prev_len..].iter_mut() {
                 cum += *output_mut;
                 *output_mut = cum;
             }
-        } else {
-            output.clear();
         }
     }
 }
13 changes: 13 additions & 0 deletions src/query/automaton_weight.rs
@@ -6,6 +6,7 @@ use tantivy_fst::Automaton;

 use super::phrase_prefix_query::prefix_end;
 use crate::index::SegmentReader;
+use crate::postings::TermInfo;
 use crate::query::{BitSetDocSet, ConstScorer, Explanation, Scorer, Weight};
 use crate::schema::{Field, IndexRecordOption};
 use crate::termdict::{TermDictionary, TermStreamer};
@@ -64,6 +65,18 @@

         term_stream_builder.into_stream()
     }
+
+    /// Returns the term infos that match the automaton
+    pub fn get_match_term_infos(&self, reader: &SegmentReader) -> crate::Result<Vec<TermInfo>> {
+        let inverted_index = reader.inverted_index(self.field)?;
+        let term_dict = inverted_index.terms();
+        let mut term_stream = self.automaton_stream(term_dict)?;
+        let mut term_infos = Vec::new();
+        while term_stream.advance() {
+            term_infos.push(term_stream.value().clone());
+        }
+        Ok(term_infos)
+    }
 }

 impl<A> Weight for AutomatonWeight<A>
4 changes: 2 additions & 2 deletions src/query/boolean_query/block_wand.rs
@@ -308,7 +308,7 @@

     use crate::query::score_combiner::SumCombiner;
     use crate::query::term_query::TermScorer;
-    use crate::query::{Bm25Weight, Scorer, Union};
+    use crate::query::{Bm25Weight, BufferedUnionScorer, Scorer};
     use crate::{DocId, DocSet, Score, TERMINATED};

     struct Float(Score);
@@ -371,7 +371,7 @@
     fn compute_checkpoints_manual(term_scorers: Vec<TermScorer>, n: usize) -> Vec<(DocId, Score)> {
         let mut heap: BinaryHeap<Float> = BinaryHeap::with_capacity(n);
         let mut checkpoints: Vec<(DocId, Score)> = Vec::new();
-        let mut scorer = Union::build(term_scorers, SumCombiner::default);
+        let mut scorer = BufferedUnionScorer::build(term_scorers, SumCombiner::default);

         let mut limit = Score::MIN;
         loop {
19 changes: 12 additions & 7 deletions src/query/boolean_query/boolean_weight.rs
@@ -9,8 +9,8 @@ use crate::query::score_combiner::{DoNothingCombiner, ScoreCombiner};
 use crate::query::term_query::TermScorer;
 use crate::query::weight::{for_each_docset_buffered, for_each_pruning_scorer, for_each_scorer};
 use crate::query::{
-    intersect_scorers, EmptyScorer, Exclude, Explanation, Occur, RequiredOptionalScorer, Scorer,
-    Union, Weight,
+    intersect_scorers, BufferedUnionScorer, EmptyScorer, Exclude, Explanation, Occur,
+    RequiredOptionalScorer, Scorer, Weight,
 };
 use crate::{DocId, Score};

@@ -65,14 +65,17 @@
                 // Block wand is only available if we read frequencies.
                 return SpecializedScorer::TermUnion(scorers);
             } else {
-                return SpecializedScorer::Other(Box::new(Union::build(
+                return SpecializedScorer::Other(Box::new(BufferedUnionScorer::build(
                     scorers,
                     score_combiner_fn,
                 )));
             }
         }
     }
-    SpecializedScorer::Other(Box::new(Union::build(scorers, score_combiner_fn)))
+    SpecializedScorer::Other(Box::new(BufferedUnionScorer::build(
+        scorers,
+        score_combiner_fn,
+    )))
 }

 fn into_box_scorer<TScoreCombiner: ScoreCombiner>(
@@ -81,7 +84,7 @@
 ) -> Box<dyn Scorer> {
     match scorer {
         SpecializedScorer::TermUnion(term_scorers) => {
-            let union_scorer = Union::build(term_scorers, score_combiner_fn);
+            let union_scorer = BufferedUnionScorer::build(term_scorers, score_combiner_fn);
             Box::new(union_scorer)
         }
         SpecializedScorer::Other(scorer) => scorer,
@@ -296,7 +299,8 @@ impl<TScoreCombiner: ScoreCombiner + Sync> Weight for BooleanWeight<TScoreCombiner>
         let scorer = self.complex_scorer(reader, 1.0, &self.score_combiner_fn)?;
         match scorer {
             SpecializedScorer::TermUnion(term_scorers) => {
-                let mut union_scorer = Union::build(term_scorers, &self.score_combiner_fn);
+                let mut union_scorer =
+                    BufferedUnionScorer::build(term_scorers, &self.score_combiner_fn);
                 for_each_scorer(&mut union_scorer, callback);
             }
             SpecializedScorer::Other(mut scorer) => {
@@ -316,7 +320,8 @@

         match scorer {
             SpecializedScorer::TermUnion(term_scorers) => {
-                let mut union_scorer = Union::build(term_scorers, &self.score_combiner_fn);
+                let mut union_scorer =
+                    BufferedUnionScorer::build(term_scorers, &self.score_combiner_fn);
                 for_each_docset_buffered(&mut union_scorer, &mut buffer, callback);
             }
             SpecializedScorer::Other(mut scorer) => {
3 changes: 2 additions & 1 deletion src/query/mod.rs
@@ -51,6 +51,7 @@ pub use self::fuzzy_query::FuzzyTermQuery;
 pub use self::intersection::{intersect_scorers, Intersection};
 pub use self::more_like_this::{MoreLikeThisQuery, MoreLikeThisQueryBuilder};
 pub use self::phrase_prefix_query::PhrasePrefixQuery;
+pub use self::phrase_query::regex_phrase_query::{wildcard_query_to_regex_str, RegexPhraseQuery};
 pub use self::phrase_query::PhraseQuery;
 pub use self::query::{EnableScoring, Query, QueryClone};
 pub use self::query_parser::{QueryParser, QueryParserError};
@@ -61,7 +62,7 @@ pub use self::score_combiner::{DisjunctionMaxCombiner, ScoreCombiner, SumCombiner};
 pub use self::scorer::Scorer;
 pub use self::set_query::TermSetQuery;
 pub use self::term_query::TermQuery;
-pub use self::union::Union;
+pub use self::union::BufferedUnionScorer;
 #[cfg(test)]
 pub use self::vec_docset::VecDocSet;
 pub use self::weight::Weight;
8 changes: 5 additions & 3 deletions src/query/phrase_query/mod.rs
@@ -1,6 +1,8 @@
 mod phrase_query;
 mod phrase_scorer;
 mod phrase_weight;
+pub mod regex_phrase_query;
+mod regex_phrase_weight;

 pub use self::phrase_query::PhraseQuery;
 pub(crate) use self::phrase_scorer::intersection_count;
@@ -19,15 +21,15 @@ pub mod tests {
     use crate::schema::{Schema, Term, TEXT};
     use crate::{assert_nearly_equals, DocAddress, DocId, IndexWriter, TERMINATED};

-    pub fn create_index(texts: &[&'static str]) -> crate::Result<Index> {
+    pub fn create_index<S: AsRef<str>>(texts: &[S]) -> crate::Result<Index> {
         let mut schema_builder = Schema::builder();
         let text_field = schema_builder.add_text_field("text", TEXT);
         let schema = schema_builder.build();
         let index = Index::create_in_ram(schema);
         {
             let mut index_writer: IndexWriter = index.writer_for_tests()?;
-            for &text in texts {
-                let doc = doc!(text_field=>text);
+            for text in texts {
+                let doc = doc!(text_field=>text.as_ref());
                 index_writer.add_document(doc)?;
             }
             index_writer.commit()?;