Merge #308
308: Implement a better parallel indexer r=Kerollmops a=ManyTheFish

Rewrite the indexer:
- enhance memory consumption control
- optimize parallelism using rayon and crossbeam channels (a minimal sketch of this pipeline shape follows below)
- factorize the different parts to make new DB implementations easier
- optimize and fix the prefix databases
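
Below is a minimal, self-contained sketch of the fan-out/fan-in shape this description points at: rayon workers do the CPU-bound extraction in parallel and feed a single writer through a crossbeam channel. The `DocumentChunk` and `ExtractedData` types, the chunk contents, and the writer logic are illustrative assumptions, not milli's actual types or code.

use crossbeam_channel::unbounded;
use rayon::prelude::*;

// Hypothetical stand-ins for the real chunk and extraction types.
struct DocumentChunk(Vec<String>);
struct ExtractedData(usize);

// CPU-bound work, run in parallel on the rayon thread pool.
fn extract(chunk: &DocumentChunk) -> ExtractedData {
    ExtractedData(chunk.0.iter().map(|s| s.len()).sum())
}

fn main() {
    let chunks: Vec<DocumentChunk> =
        (0..8).map(|i| DocumentChunk(vec![format!("doc-{}", i)])).collect();

    let (sender, receiver) = unbounded::<ExtractedData>();

    // A single writer thread drains the channel and applies results serially,
    // the way a single LMDB write transaction would be fed.
    let writer = std::thread::spawn(move || {
        let mut total = 0;
        for data in receiver {
            total += data.0;
        }
        total
    });

    // Extraction is fanned out on rayon; each worker sends its result through
    // its own clone of the sender. When `for_each_with` returns, all senders
    // are dropped, the channel closes and the writer thread finishes.
    chunks.par_iter().for_each_with(sender, |s, chunk| {
        s.send(extract(chunk)).unwrap();
    });

    println!("total extracted = {}", writer.join().unwrap());
}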


Co-authored-by: many <[email protected]>
bors[bot] and ManyTheFish authored Sep 2, 2021
2 parents 46f7df2 + 741a444 commit 5cbe879
Showing 39 changed files with 2,267 additions and 1,866 deletions.
3 changes: 0 additions & 3 deletions http-ui/src/main.rs
@@ -343,10 +343,7 @@ async fn main() -> anyhow::Result<()> {
update_builder.thread_pool(GLOBAL_THREAD_POOL.get().unwrap());
update_builder.log_every_n(indexer_opt_cloned.log_every_n);
update_builder.max_memory(indexer_opt_cloned.max_memory.get_bytes() as usize);
update_builder.linked_hash_map_size(indexer_opt_cloned.linked_hash_map_size);
update_builder.chunk_compression_type(indexer_opt_cloned.chunk_compression_type);
update_builder
.chunk_fusing_shrink_size(indexer_opt_cloned.chunk_fusing_shrink_size.get_bytes());

let before_update = Instant::now();
// we extract the update type and execute the update itself.
64 changes: 64 additions & 0 deletions infos/src/main.rs
@@ -207,6 +207,24 @@ enum Command {
word2: String,
},

/// Outputs a CSV with the proximities for the two specified words and
/// the document ids where these relations appear.
///
/// `word1` and `prefix` define the word pair, specified *in this specific order*.
/// `proximity` defines the proximity between the two specified words.
/// `documents_ids` lists the document ids where the relation appears.
WordPrefixPairProximitiesDocids {
/// Display the whole document ids in detail.
#[structopt(long)]
full_display: bool,

/// First word of the word pair.
word1: String,

/// Second word of the word pair.
prefix: String,
},

/// Outputs the words FST to standard output.
///
/// One can use the FST binary helper to dissect and analyze it,
@@ -282,6 +300,9 @@ fn main() -> anyhow::Result<()> {
WordPairProximitiesDocids { full_display, word1, word2 } => {
word_pair_proximities_docids(&index, &rtxn, !full_display, word1, word2)
}
WordPrefixPairProximitiesDocids { full_display, word1, prefix } => {
word_prefix_pair_proximities_docids(&index, &rtxn, !full_display, word1, prefix)
}
ExportWordsFst => export_words_fst(&index, &rtxn),
ExportWordsPrefixFst => export_words_prefix_fst(&index, &rtxn),
ExportDocuments { internal_documents_ids } => {
@@ -1131,3 +1152,46 @@ fn word_pair_proximities_docids(

Ok(wtr.flush()?)
}

fn word_prefix_pair_proximities_docids(
index: &Index,
rtxn: &heed::RoTxn,
debug: bool,
word1: String,
word_prefix: String,
) -> anyhow::Result<()> {
use heed::types::ByteSlice;
use milli::RoaringBitmapCodec;

let stdout = io::stdout();
let mut wtr = csv::Writer::from_writer(stdout.lock());
wtr.write_record(&["word1", "word_prefix", "proximity", "documents_ids"])?;

// Create the prefix key with only the pair of words.
let mut prefix = Vec::with_capacity(word1.len() + word_prefix.len() + 1);
prefix.extend_from_slice(word1.as_bytes());
prefix.push(0);
prefix.extend_from_slice(word_prefix.as_bytes());

let db = index.word_prefix_pair_proximity_docids.as_polymorph();
let iter = db.prefix_iter::<_, ByteSlice, RoaringBitmapCodec>(rtxn, &prefix)?;
for result in iter {
let (key, docids) = result?;

// Skip keys that are longer than the requested one;
// a longer key means that the requested word is only a prefix of the key's second word.
if key.len() != prefix.len() + 1 {
continue;
}

let proximity = key.last().unwrap();
let docids = if debug {
format!("{:?}", docids)
} else {
format!("{:?}", docids.iter().collect::<Vec<_>>())
};
wtr.write_record(&[&word1, &word_prefix, &proximity.to_string(), &docids])?;
}

Ok(wtr.flush()?)
}
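
The prefix scan above relies on a simple key layout: the first word and the queried prefix separated by a 0 byte, followed by a single proximity byte. The small standalone sketch below (hypothetical helper names, not milli code) illustrates why only keys exactly one byte longer than the requested pair are kept and longer ones are skipped.

// Standalone sketch of the key layout assumed by the scan above:
// `word1` ++ 0x00 ++ prefix ++ proximity byte. Helper names are hypothetical.
fn make_key(word1: &str, prefix: &str, proximity: u8) -> Vec<u8> {
    let mut key = Vec::with_capacity(word1.len() + prefix.len() + 2);
    key.extend_from_slice(word1.as_bytes());
    key.push(0);
    key.extend_from_slice(prefix.as_bytes());
    key.push(proximity);
    key
}

fn main() {
    // Same construction as in `word_prefix_pair_proximities_docids`:
    // only the word pair, without the trailing proximity byte.
    let mut requested = Vec::new();
    requested.extend_from_slice("hello".as_bytes());
    requested.push(0);
    requested.extend_from_slice("wor".as_bytes());

    let exact = make_key("hello", "wor", 2);
    let longer = make_key("hello", "world", 2);

    // Only keys that are exactly one byte (the proximity) longer than the
    // requested pair correspond to the requested prefix itself.
    assert_eq!(exact.len(), requested.len() + 1);
    assert_ne!(longer.len(), requested.len() + 1);
    println!("exact key kept, longer key skipped");
}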
3 changes: 2 additions & 1 deletion milli/Cargo.toml
@@ -9,12 +9,13 @@ bstr = "0.2.15"
byteorder = "1.4.2"
chrono = { version = "0.4.19", features = ["serde"] }
concat-arrays = "0.1.2"
crossbeam-channel = "0.5.1"
csv = "1.1.5"
either = "1.6.1"
flate2 = "1.0.20"
fst = "0.4.5"
fxhash = "0.2.1"
grenad = { git = "https://github.com/Kerollmops/grenad.git", rev = "3adcb26" }
grenad = "0.3.0"
heed = { git = "https://github.com/Kerollmops/heed", tag = "v0.12.1", default-features = false, features = ["lmdb", "sync-read-txn"] }
human_format = "1.0.3"
levenshtein_automata = { version = "0.2.0", features = ["fst_automaton"] }
52 changes: 31 additions & 21 deletions milli/src/heed_codec/facet/facet_string_level_zero_value_codec.rs
@@ -2,79 +2,89 @@ use std::borrow::Cow;
use std::convert::TryInto;
use std::{marker, str};

use super::try_split_at;
use crate::error::SerializationError;
use crate::heed_codec::RoaringBitmapCodec;
use crate::{try_split_array_at, try_split_at, Result};

/// A codec that encodes a string in front of the value.
pub type FacetStringLevelZeroValueCodec = StringValueCodec<RoaringBitmapCodec>;

/// A codec that encodes a string in front of a value.
///
/// The use case is the facet string levels algorithm, where we must know the
/// original string of a normalized facet value; the original values are stored
/// in the value so as not to break the lexicographical ordering of the LMDB keys.
pub struct FacetStringLevelZeroValueCodec<C>(marker::PhantomData<C>);
pub struct StringValueCodec<C>(marker::PhantomData<C>);

impl<'a, C> heed::BytesDecode<'a> for FacetStringLevelZeroValueCodec<C>
impl<'a, C> heed::BytesDecode<'a> for StringValueCodec<C>
where
C: heed::BytesDecode<'a>,
{
type DItem = (&'a str, C::DItem);

fn bytes_decode(bytes: &'a [u8]) -> Option<Self::DItem> {
let (string_len, bytes) = try_split_at(bytes, 2)?;
let string_len = string_len.try_into().ok().map(u16::from_be_bytes)?;

let (string, bytes) = try_split_at(bytes, string_len as usize)?;
let string = str::from_utf8(string).ok()?;

let (string, bytes) = decode_prefix_string(bytes)?;
C::bytes_decode(bytes).map(|item| (string, item))
}
}

impl<'a, C> heed::BytesEncode<'a> for FacetStringLevelZeroValueCodec<C>
impl<'a, C> heed::BytesEncode<'a> for StringValueCodec<C>
where
C: heed::BytesEncode<'a>,
{
type EItem = (&'a str, C::EItem);

fn bytes_encode((string, value): &'a Self::EItem) -> Option<Cow<[u8]>> {
let string_len: u16 = string.len().try_into().ok()?;
let value_bytes = C::bytes_encode(&value)?;

let mut bytes = Vec::with_capacity(2 + string.len() + value_bytes.len());
bytes.extend_from_slice(&string_len.to_be_bytes());
bytes.extend_from_slice(string.as_bytes());
encode_prefix_string(string, &mut bytes).ok()?;
bytes.extend_from_slice(&value_bytes[..]);

Some(Cow::Owned(bytes))
}
}

pub fn decode_prefix_string(value: &[u8]) -> Option<(&str, &[u8])> {
let (original_length_bytes, bytes) = try_split_array_at(value)?;
let original_length = u16::from_be_bytes(original_length_bytes) as usize;
let (string, bytes) = try_split_at(bytes, original_length)?;
let string = str::from_utf8(string).ok()?;
Some((string, bytes))
}

pub fn encode_prefix_string(string: &str, buffer: &mut Vec<u8>) -> Result<()> {
let string_len: u16 =
string.len().try_into().map_err(|_| SerializationError::InvalidNumberSerialization)?;
buffer.extend_from_slice(&string_len.to_be_bytes());
buffer.extend_from_slice(string.as_bytes());
Ok(())
}

#[cfg(test)]
mod tests {
use heed::types::Unit;
use heed::{BytesDecode, BytesEncode};
use roaring::RoaringBitmap;

use super::*;
use crate::CboRoaringBitmapCodec;

#[test]
fn deserialize_roaring_bitmaps() {
let string = "abc";
let docids: RoaringBitmap = (0..100).chain(3500..4398).collect();
let key = (string, docids.clone());
let bytes =
FacetStringLevelZeroValueCodec::<CboRoaringBitmapCodec>::bytes_encode(&key).unwrap();
let bytes = StringValueCodec::<RoaringBitmapCodec>::bytes_encode(&key).unwrap();
let (out_string, out_docids) =
FacetStringLevelZeroValueCodec::<CboRoaringBitmapCodec>::bytes_decode(&bytes).unwrap();
StringValueCodec::<RoaringBitmapCodec>::bytes_decode(&bytes).unwrap();
assert_eq!((out_string, out_docids), (string, docids));
}

#[test]
fn deserialize_unit() {
let string = "def";
let key = (string, ());
let bytes = FacetStringLevelZeroValueCodec::<Unit>::bytes_encode(&key).unwrap();
let (out_string, out_unit) =
FacetStringLevelZeroValueCodec::<Unit>::bytes_decode(&bytes).unwrap();
let bytes = StringValueCodec::<Unit>::bytes_encode(&key).unwrap();
let (out_string, out_unit) = StringValueCodec::<Unit>::bytes_decode(&bytes).unwrap();
assert_eq!((out_string, out_unit), (string, ()));
}
}
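
For readers who want to see the wire format in isolation: `encode_prefix_string` and `decode_prefix_string` above store a big-endian u16 length, then the UTF-8 bytes of the string, with the codec's value bytes following. The sketch below reimplements that format so it compiles without the milli crate (and returns `Option` instead of milli's `Result`); it is illustrative only.

use std::convert::TryFrom;

// Reimplementation of the length-prefixed string format for illustration:
// big-endian u16 length, UTF-8 bytes, then the remaining value bytes.
fn encode_prefix_string(string: &str, buffer: &mut Vec<u8>) -> Option<()> {
    let len = u16::try_from(string.len()).ok()?;
    buffer.extend_from_slice(&len.to_be_bytes());
    buffer.extend_from_slice(string.as_bytes());
    Some(())
}

fn decode_prefix_string(value: &[u8]) -> Option<(&str, &[u8])> {
    if value.len() < 2 {
        return None;
    }
    let (len_bytes, rest) = value.split_at(2);
    let len = u16::from_be_bytes([len_bytes[0], len_bytes[1]]) as usize;
    if rest.len() < len {
        return None;
    }
    let (string, value_bytes) = rest.split_at(len);
    let string = std::str::from_utf8(string).ok()?;
    Some((string, value_bytes))
}

fn main() {
    let mut buffer = Vec::new();
    encode_prefix_string("Bordeaux", &mut buffer).unwrap();
    buffer.extend_from_slice(&[1, 2, 3]); // the codec's value bytes follow
    let (string, value) = decode_prefix_string(&buffer).unwrap();
    assert_eq!(string, "Bordeaux");
    assert_eq!(value, &[1, 2, 3]);
    println!("round-trip ok");
}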
4 changes: 3 additions & 1 deletion milli/src/heed_codec/facet/mod.rs
@@ -9,7 +9,9 @@ mod field_doc_id_facet_string_codec;
pub use self::facet_level_value_f64_codec::FacetLevelValueF64Codec;
pub use self::facet_level_value_u32_codec::FacetLevelValueU32Codec;
pub use self::facet_string_level_zero_codec::FacetStringLevelZeroCodec;
pub use self::facet_string_level_zero_value_codec::FacetStringLevelZeroValueCodec;
pub use self::facet_string_level_zero_value_codec::{
decode_prefix_string, encode_prefix_string, FacetStringLevelZeroValueCodec,
};
pub use self::facet_string_zero_bounds_value_codec::FacetStringZeroBoundsValueCodec;
pub use self::field_doc_id_facet_f64_codec::FieldDocIdFacetF64Codec;
pub use self::field_doc_id_facet_string_codec::FieldDocIdFacetStringCodec;
76 changes: 76 additions & 0 deletions milli/src/heed_codec/roaring_bitmap/cbo_roaring_bitmap_codec.rs
@@ -52,6 +52,46 @@ impl CboRoaringBitmapCodec {
RoaringBitmap::deserialize_from(bytes)
}
}

/// Merge serialized CboRoaringBitmaps in a buffer.
///
/// If the merged values' length is under the threshold, the values are
/// serialized directly into the buffer; otherwise a RoaringBitmap is built
/// from the values and serialized into the buffer.
pub fn merge_into(slices: &[Cow<[u8]>], buffer: &mut Vec<u8>) -> io::Result<()> {
let mut roaring = RoaringBitmap::new();
let mut vec = Vec::new();

for bytes in slices {
if bytes.len() <= THRESHOLD * size_of::<u32>() {
let mut reader = bytes.as_ref();
while let Ok(integer) = reader.read_u32::<NativeEndian>() {
vec.push(integer);
}
} else {
roaring |= RoaringBitmap::deserialize_from(bytes.as_ref())?;
}
}

if roaring.is_empty() {
vec.sort_unstable();
vec.dedup();

if vec.len() <= THRESHOLD {
for integer in vec {
buffer.extend_from_slice(&integer.to_ne_bytes());
}
} else {
let roaring = RoaringBitmap::from_sorted_iter(vec.into_iter());
roaring.serialize_into(buffer)?;
}
} else {
roaring.extend(vec);
roaring.serialize_into(buffer)?;
}

Ok(())
}
}

impl heed::BytesDecode<'_> for CboRoaringBitmapCodec {
@@ -106,4 +146,40 @@ mod tests {

assert!(roaring_size > bo_size);
}

#[test]
fn merge_cbo_roaring_bitmaps() {
let mut buffer = Vec::new();

let small_data = vec![
RoaringBitmap::from_sorted_iter(1..4),
RoaringBitmap::from_sorted_iter(2..5),
RoaringBitmap::from_sorted_iter(4..6),
RoaringBitmap::from_sorted_iter(1..3),
];

let small_data: Vec<_> =
small_data.iter().map(|b| CboRoaringBitmapCodec::bytes_encode(b).unwrap()).collect();
CboRoaringBitmapCodec::merge_into(small_data.as_slice(), &mut buffer).unwrap();
let bitmap = CboRoaringBitmapCodec::deserialize_from(&buffer).unwrap();
let expected = RoaringBitmap::from_sorted_iter(1..6);
assert_eq!(bitmap, expected);

let medium_data = vec![
RoaringBitmap::from_sorted_iter(1..4),
RoaringBitmap::from_sorted_iter(2..5),
RoaringBitmap::from_sorted_iter(4..8),
RoaringBitmap::from_sorted_iter(0..3),
RoaringBitmap::from_sorted_iter(7..23),
];

let medium_data: Vec<_> =
medium_data.iter().map(|b| CboRoaringBitmapCodec::bytes_encode(b).unwrap()).collect();
buffer.clear();
CboRoaringBitmapCodec::merge_into(medium_data.as_slice(), &mut buffer).unwrap();

let bitmap = CboRoaringBitmapCodec::deserialize_from(&buffer).unwrap();
let expected = RoaringBitmap::from_sorted_iter(0..23);
assert_eq!(bitmap, expected);
}
}
3 changes: 1 addition & 2 deletions milli/src/index.rs
@@ -93,8 +93,7 @@ pub struct Index {
/// Maps the facet field id, level and the number with the docids that corresponds to it.
pub facet_id_f64_docids: Database<FacetLevelValueF64Codec, CboRoaringBitmapCodec>,
/// Maps the facet field id and the string with the original string and docids that corresponds to it.
pub facet_id_string_docids:
Database<FacetStringLevelZeroCodec, FacetStringLevelZeroValueCodec<CboRoaringBitmapCodec>>,
pub facet_id_string_docids: Database<FacetStringLevelZeroCodec, FacetStringLevelZeroValueCodec>,

/// Maps the document id, the facet field id and the numbers.
pub field_id_docid_facet_f64s: Database<FieldDocIdFacetF64Codec, Unit>,
4 changes: 0 additions & 4 deletions milli/src/lib.rs
@@ -13,11 +13,9 @@ mod search;
pub mod tree_level;
pub mod update;

use std::borrow::Cow;
use std::collections::{BTreeMap, HashMap};
use std::convert::{TryFrom, TryInto};
use std::hash::BuildHasherDefault;
use std::result::Result as StdResult;

use fxhash::{FxHasher32, FxHasher64};
pub use grenad::CompressionType;
@@ -54,8 +52,6 @@ pub type FieldId = u16;
pub type Position = u32;
pub type FieldDistribution = BTreeMap<String, u64>;

type MergeFn<E> = for<'a> fn(&[u8], &[Cow<'a, [u8]>]) -> StdResult<Vec<u8>, E>;

/// Transform a raw obkv store into a JSON Object.
pub fn obkv_to_json(
displayed_fields: &[FieldId],
4 changes: 2 additions & 2 deletions milli/src/proximity.rs
@@ -2,8 +2,8 @@ use std::cmp;

use crate::{Attribute, Position};

const ONE_ATTRIBUTE: u32 = 1000;
const MAX_DISTANCE: u32 = 8;
pub const ONE_ATTRIBUTE: u32 = 1000;
pub const MAX_DISTANCE: u32 = 8;

pub fn index_proximity(lhs: u32, rhs: u32) -> u32 {
if lhs <= rhs {
