Skip to content

Commit

Permalink
increase min memory to 15MB for indexing (#2176)
Browse files Browse the repository at this point in the history
With tantivy 0.20 the minimum memory consumption per SegmentWriter increased to
12MB. 7MB are for the different fast field collectors types (they could be
lazily created). Increase the minimum memory from 3MB to 15MB.

Change memory variable naming from arena to budget.

closes #2156
  • Loading branch information
PSeitz authored Sep 13, 2023
1 parent 03fcdce commit 2d73903
Show file tree
Hide file tree
Showing 19 changed files with 84 additions and 62 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ members = ["query-grammar", "bitpacker", "common", "ownedbytes", "stacker", "sst
[[test]]
name = "failpoints"
path = "tests/failpoints/mod.rs"
required-features = ["fail/failpoints"]
required-features = ["failpoints"]

[[bench]]
name = "analyzer"
Expand Down
2 changes: 1 addition & 1 deletion src/collector/count_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::{DocId, Score, SegmentOrdinal, SegmentReader};
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
///
/// let mut index_writer = index.writer(3_000_000).unwrap();
/// let mut index_writer = index.writer(15_000_000).unwrap();
/// index_writer.add_document(doc!(title => "The Name of the Wind")).unwrap();
/// index_writer.add_document(doc!(title => "The Diary of Muadib")).unwrap();
/// index_writer.add_document(doc!(title => "A Dairy Cow")).unwrap();
Expand Down
2 changes: 1 addition & 1 deletion src/collector/facet_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ fn facet_depth(facet_bytes: &[u8]) -> usize {
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
/// {
/// let mut index_writer = index.writer(3_000_000)?;
/// let mut index_writer = index.writer(15_000_000)?;
/// // a document can be associated with any number of facets
/// index_writer.add_document(doc!(
/// title => "The Name of the Wind",
Expand Down
6 changes: 3 additions & 3 deletions src/collector/histogram_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ mod tests {
let val_field = schema_builder.add_i64_field("val_field", FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut writer = index.writer_with_num_threads(1, 4_000_000)?;
let mut writer = index.writer_for_tests()?;
writer.add_document(doc!(val_field=>12i64))?;
writer.add_document(doc!(val_field=>-30i64))?;
writer.add_document(doc!(val_field=>-12i64))?;
Expand All @@ -255,7 +255,7 @@ mod tests {
let val_field = schema_builder.add_i64_field("val_field", FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut writer = index.writer_with_num_threads(1, 4_000_000)?;
let mut writer = index.writer_for_tests()?;
writer.add_document(doc!(val_field=>12i64))?;
writer.commit()?;
writer.add_document(doc!(val_field=>-30i64))?;
Expand All @@ -280,7 +280,7 @@ mod tests {
let date_field = schema_builder.add_date_field("date_field", FAST);
let schema = schema_builder.build();
let index = Index::create_in_ram(schema);
let mut writer = index.writer_with_num_threads(1, 4_000_000)?;
let mut writer = index.writer_for_tests()?;
writer.add_document(doc!(date_field=>DateTime::from_primitive(Date::from_calendar_date(1982, Month::September, 17)?.with_hms(0, 0, 0)?)))?;
writer.add_document(
doc!(date_field=>DateTime::from_primitive(Date::from_calendar_date(1986, Month::March, 9)?.with_hms(0, 0, 0)?)),
Expand Down
2 changes: 1 addition & 1 deletion src/collector/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
//! # let title = schema_builder.add_text_field("title", TEXT);
//! # let schema = schema_builder.build();
//! # let index = Index::create_in_ram(schema);
//! # let mut index_writer = index.writer(3_000_000)?;
//! # let mut index_writer = index.writer(15_000_000)?;
//! # index_writer.add_document(doc!(
//! # title => "The Name of the Wind",
//! # ))?;
Expand Down
2 changes: 1 addition & 1 deletion src/collector/multi_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ impl<TFruit: Fruit> FruitHandle<TFruit> {
/// let title = schema_builder.add_text_field("title", TEXT);
/// let schema = schema_builder.build();
/// let index = Index::create_in_ram(schema);
/// let mut index_writer = index.writer(3_000_000)?;
/// let mut index_writer = index.writer(15_000_000)?;
/// index_writer.add_document(doc!(title => "The Name of the Wind"))?;
/// index_writer.add_document(doc!(title => "The Diary of Muadib"))?;
/// index_writer.add_document(doc!(title => "A Dairy Cow"))?;
Expand Down
22 changes: 11 additions & 11 deletions src/core/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use crate::directory::error::OpenReadError;
use crate::directory::MmapDirectory;
use crate::directory::{Directory, ManagedDirectory, RamDirectory, INDEX_WRITER_LOCK};
use crate::error::{DataCorruption, TantivyError};
use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_ARENA_NUM_BYTES_MIN};
use crate::indexer::index_writer::{MAX_NUM_THREAD, MEMORY_BUDGET_NUM_BYTES_MIN};
use crate::indexer::segment_updater::save_metas;
use crate::reader::{IndexReader, IndexReaderBuilder};
use crate::schema::{Field, FieldType, Schema};
Expand Down Expand Up @@ -523,9 +523,9 @@ impl Index {
/// - `num_threads` defines the number of indexing workers that
/// should work at the same time.
///
/// - `overall_memory_arena_in_bytes` sets the amount of memory
/// - `overall_memory_budget_in_bytes` sets the amount of memory
/// allocated for all indexing thread.
/// Each thread will receive a budget of `overall_memory_arena_in_bytes / num_threads`.
/// Each thread will receive a budget of `overall_memory_budget_in_bytes / num_threads`.
///
/// # Errors
/// If the lockfile already exists, returns `Error::DirectoryLockBusy` or an `Error::IoError`.
Expand All @@ -534,7 +534,7 @@ impl Index {
pub fn writer_with_num_threads(
&self,
num_threads: usize,
overall_memory_arena_in_bytes: usize,
overall_memory_budget_in_bytes: usize,
) -> crate::Result<IndexWriter> {
let directory_lock = self
.directory
Expand All @@ -550,7 +550,7 @@ impl Index {
),
)
})?;
let memory_arena_in_bytes_per_thread = overall_memory_arena_in_bytes / num_threads;
let memory_arena_in_bytes_per_thread = overall_memory_budget_in_bytes / num_threads;
IndexWriter::new(
self,
num_threads,
Expand All @@ -561,7 +561,7 @@ impl Index {

/// Helper to create an index writer for tests.
///
/// That index writer only simply has a single thread and a memory arena of 10 MB.
/// That index writer only simply has a single thread and a memory budget of 15 MB.
/// Using a single thread gives us a deterministic allocation of DocId.
#[cfg(test)]
pub fn writer_for_tests(&self) -> crate::Result<IndexWriter> {
Expand All @@ -579,13 +579,13 @@ impl Index {
/// If the lockfile already exists, returns `Error::FileAlreadyExists`.
/// If the memory arena per thread is too small or too big, returns
/// `TantivyError::InvalidArgument`
pub fn writer(&self, memory_arena_num_bytes: usize) -> crate::Result<IndexWriter> {
pub fn writer(&self, memory_budget_in_bytes: usize) -> crate::Result<IndexWriter> {
let mut num_threads = std::cmp::min(num_cpus::get(), MAX_NUM_THREAD);
let memory_arena_num_bytes_per_thread = memory_arena_num_bytes / num_threads;
if memory_arena_num_bytes_per_thread < MEMORY_ARENA_NUM_BYTES_MIN {
num_threads = (memory_arena_num_bytes / MEMORY_ARENA_NUM_BYTES_MIN).max(1);
let memory_budget_num_bytes_per_thread = memory_budget_in_bytes / num_threads;
if memory_budget_num_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
num_threads = (memory_budget_in_bytes / MEMORY_BUDGET_NUM_BYTES_MIN).max(1);
}
self.writer_with_num_threads(num_threads, memory_arena_num_bytes)
self.writer_with_num_threads(num_threads, memory_budget_in_bytes)
}

/// Accessor to the index settings
Expand Down
3 changes: 2 additions & 1 deletion src/functional_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::HashSet;

use rand::{thread_rng, Rng};

use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
use crate::schema::*;
use crate::{doc, schema, Index, IndexSettings, IndexSortByField, Order, Searcher};

Expand Down Expand Up @@ -30,7 +31,7 @@ fn test_functional_store() -> crate::Result<()> {

let mut rng = thread_rng();

let mut index_writer = index.writer_with_num_threads(3, 12_000_000)?;
let mut index_writer = index.writer_with_num_threads(3, MEMORY_BUDGET_NUM_BYTES_MIN)?;

let mut doc_set: Vec<u64> = Vec::new();

Expand Down
73 changes: 44 additions & 29 deletions src/indexer/index_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ use crate::{FutureResult, Opstamp};
// in the `memory_arena` goes below MARGIN_IN_BYTES.
pub const MARGIN_IN_BYTES: usize = 1_000_000;

// We impose the memory per thread to be at least 3 MB.
pub const MEMORY_ARENA_NUM_BYTES_MIN: usize = ((MARGIN_IN_BYTES as u32) * 3u32) as usize;
pub const MEMORY_ARENA_NUM_BYTES_MAX: usize = u32::MAX as usize - MARGIN_IN_BYTES;
// We impose the memory per thread to be at least 15 MB, as the baseline consumption is 12MB.
pub const MEMORY_BUDGET_NUM_BYTES_MIN: usize = ((MARGIN_IN_BYTES as u32) * 15u32) as usize;
pub const MEMORY_BUDGET_NUM_BYTES_MAX: usize = u32::MAX as usize - MARGIN_IN_BYTES;

// We impose the number of index writer threads to be at most this.
pub const MAX_NUM_THREAD: usize = 8;
Expand Down Expand Up @@ -57,7 +57,8 @@ pub struct IndexWriter {

index: Index,

memory_arena_in_bytes_per_thread: usize,
// The memory budget per thread, after which a commit is triggered.
memory_budget_in_bytes_per_thread: usize,

workers_join_handle: Vec<JoinHandle<crate::Result<()>>>,

Expand Down Expand Up @@ -264,19 +265,19 @@ impl IndexWriter {
pub(crate) fn new(
index: &Index,
num_threads: usize,
memory_arena_in_bytes_per_thread: usize,
memory_budget_in_bytes_per_thread: usize,
directory_lock: DirectoryLock,
) -> crate::Result<IndexWriter> {
if memory_arena_in_bytes_per_thread < MEMORY_ARENA_NUM_BYTES_MIN {
if memory_budget_in_bytes_per_thread < MEMORY_BUDGET_NUM_BYTES_MIN {
let err_msg = format!(
"The memory arena in bytes per thread needs to be at least \
{MEMORY_ARENA_NUM_BYTES_MIN}."
{MEMORY_BUDGET_NUM_BYTES_MIN}."
);
return Err(TantivyError::InvalidArgument(err_msg));
}
if memory_arena_in_bytes_per_thread >= MEMORY_ARENA_NUM_BYTES_MAX {
if memory_budget_in_bytes_per_thread >= MEMORY_BUDGET_NUM_BYTES_MAX {
let err_msg = format!(
"The memory arena in bytes per thread cannot exceed {MEMORY_ARENA_NUM_BYTES_MAX}"
"The memory arena in bytes per thread cannot exceed {MEMORY_BUDGET_NUM_BYTES_MAX}"
);
return Err(TantivyError::InvalidArgument(err_msg));
}
Expand All @@ -295,7 +296,7 @@ impl IndexWriter {
let mut index_writer = IndexWriter {
_directory_lock: Some(directory_lock),

memory_arena_in_bytes_per_thread,
memory_budget_in_bytes_per_thread,
index: index.clone(),
index_writer_status: IndexWriterStatus::from(document_receiver),
operation_sender: document_sender,
Expand Down Expand Up @@ -396,7 +397,7 @@ impl IndexWriter {

let mut delete_cursor = self.delete_queue.cursor();

let mem_budget = self.memory_arena_in_bytes_per_thread;
let mem_budget = self.memory_budget_in_bytes_per_thread;
let index = self.index.clone();
let join_handle: JoinHandle<crate::Result<()>> = thread::Builder::new()
.name(format!("thrd-tantivy-index{}", self.worker_id))
Expand Down Expand Up @@ -554,7 +555,7 @@ impl IndexWriter {
let new_index_writer: IndexWriter = IndexWriter::new(
&self.index,
self.num_threads,
self.memory_arena_in_bytes_per_thread,
self.memory_budget_in_bytes_per_thread,
directory_lock,
)?;

Expand Down Expand Up @@ -810,6 +811,7 @@ mod tests {
use crate::collector::TopDocs;
use crate::directory::error::LockError;
use crate::error::*;
use crate::indexer::index_writer::MEMORY_BUDGET_NUM_BYTES_MIN;
use crate::indexer::NoMergePolicy;
use crate::query::{BooleanQuery, Occur, Query, QueryParser, TermQuery};
use crate::schema::{
Expand Down Expand Up @@ -941,7 +943,7 @@ mod tests {
fn test_empty_operations_group() {
let schema_builder = schema::Schema::builder();
let index = Index::create_in_ram(schema_builder.build());
let index_writer = index.writer(3_000_000).unwrap();
let index_writer = index.writer_for_tests().unwrap();
let operations1 = vec![];
let batch_opstamp1 = index_writer.run(operations1).unwrap();
assert_eq!(batch_opstamp1, 0u64);
Expand All @@ -954,8 +956,8 @@ mod tests {
fn test_lockfile_stops_duplicates() {
let schema_builder = schema::Schema::builder();
let index = Index::create_in_ram(schema_builder.build());
let _index_writer = index.writer(3_000_000).unwrap();
match index.writer(3_000_000) {
let _index_writer = index.writer_for_tests().unwrap();
match index.writer_for_tests() {
Err(TantivyError::LockFailure(LockError::LockBusy, _)) => {}
_ => panic!("Expected a `LockFailure` error"),
}
Expand All @@ -979,7 +981,7 @@ mod tests {
fn test_set_merge_policy() {
let schema_builder = schema::Schema::builder();
let index = Index::create_in_ram(schema_builder.build());
let index_writer = index.writer(3_000_000).unwrap();
let index_writer = index.writer_for_tests().unwrap();
assert_eq!(
format!("{:?}", index_writer.get_merge_policy()),
"LogMergePolicy { min_num_segments: 8, max_docs_before_merge: 10000000, \
Expand All @@ -998,11 +1000,11 @@ mod tests {
let schema_builder = schema::Schema::builder();
let index = Index::create_in_ram(schema_builder.build());
{
let _index_writer = index.writer(3_000_000).unwrap();
let _index_writer = index.writer_for_tests().unwrap();
// the lock should be released when the
// index_writer leaves the scope.
}
let _index_writer_two = index.writer(3_000_000).unwrap();
let _index_writer_two = index.writer_for_tests().unwrap();
}

#[test]
Expand All @@ -1022,7 +1024,7 @@ mod tests {

{
// writing the segment
let mut index_writer = index.writer(3_000_000)?;
let mut index_writer = index.writer_for_tests()?;
index_writer.add_document(doc!(text_field=>"a"))?;
index_writer.rollback()?;
assert_eq!(index_writer.commit_opstamp(), 0u64);
Expand Down Expand Up @@ -1054,7 +1056,7 @@ mod tests {
reader.searcher().doc_freq(&term_a).unwrap()
};
// writing the segment
let mut index_writer = index.writer(12_000_000).unwrap();
let mut index_writer = index.writer_for_tests().unwrap();
index_writer.add_document(doc!(text_field=>"a"))?;
index_writer.commit()?;
// this should create 1 segment
Expand Down Expand Up @@ -1094,7 +1096,7 @@ mod tests {
reader.searcher().doc_freq(&term_a).unwrap()
};
// writing the segment
let mut index_writer = index.writer(12_000_000).unwrap();
let mut index_writer = index.writer_for_tests().unwrap();
index_writer.add_document(doc!(text_field=>"a"))?;
index_writer.commit()?;
index_writer.add_document(doc!(text_field=>"a"))?;
Expand Down Expand Up @@ -1140,7 +1142,7 @@ mod tests {
reader.searcher().doc_freq(&term_a).unwrap()
};
// writing the segment
let mut index_writer = index.writer(12_000_000).unwrap();
let mut index_writer = index.writer(MEMORY_BUDGET_NUM_BYTES_MIN).unwrap();
// create 8 segments with 100 tiny docs
for _doc in 0..100 {
index_writer.add_document(doc!(text_field=>"a"))?;
Expand Down Expand Up @@ -1196,7 +1198,8 @@ mod tests {

{
// writing the segment
let mut index_writer = index.writer_with_num_threads(4, 12_000_000)?;
let mut index_writer =
index.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)?;
// create 8 segments with 100 tiny docs
for _doc in 0..100 {
index_writer.add_document(doc!(text_field => "a"))?;
Expand Down Expand Up @@ -1245,7 +1248,9 @@ mod tests {
let term = Term::from_field_text(text_field, s);
searcher.doc_freq(&term).unwrap()
};
let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
let mut index_writer = index
.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
.unwrap();

let add_tstamp = index_writer.add_document(doc!(text_field => "a")).unwrap();
let commit_tstamp = index_writer.commit().unwrap();
Expand All @@ -1262,7 +1267,9 @@ mod tests {
let mut schema_builder = schema::Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
let mut index_writer = index
.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
.unwrap();

let add_tstamp = index_writer.add_document(doc!(text_field => "a")).unwrap();

Expand Down Expand Up @@ -1311,7 +1318,9 @@ mod tests {
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
// writing the segment
let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
let mut index_writer = index
.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
.unwrap();
let res = index_writer.delete_all_documents();
assert!(res.is_ok());

Expand All @@ -1338,7 +1347,9 @@ mod tests {
let mut schema_builder = schema::Schema::builder();
let text_field = schema_builder.add_text_field("text", TEXT);
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
let mut index_writer = index
.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
.unwrap();

// add one simple doc
assert!(index_writer.add_document(doc!(text_field => "a")).is_ok());
Expand Down Expand Up @@ -1371,7 +1382,9 @@ mod tests {
fn test_delete_all_documents_empty_index() {
let schema_builder = schema::Schema::builder();
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
let mut index_writer = index
.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
.unwrap();
let clear = index_writer.delete_all_documents();
let commit = index_writer.commit();
assert!(clear.is_ok());
Expand All @@ -1382,7 +1395,9 @@ mod tests {
fn test_delete_all_documents_index_twice() {
let schema_builder = schema::Schema::builder();
let index = Index::create_in_ram(schema_builder.build());
let mut index_writer = index.writer_with_num_threads(4, 12_000_000).unwrap();
let mut index_writer = index
.writer_with_num_threads(4, MEMORY_BUDGET_NUM_BYTES_MIN * 4)
.unwrap();
let clear = index_writer.delete_all_documents();
let commit = index_writer.commit();
assert!(clear.is_ok());
Expand Down
Loading

0 comments on commit 2d73903

Please sign in to comment.