Skip to content

Commit

Permalink
Dump fulltext index (#2336)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Dump `MemoryIndexer` when the memory usage of the index exceeds the
threshold.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] New Feature (non-breaking change which adds functionality)
- [x] Test cases

---------

Co-authored-by: writinwaters <[email protected]>
  • Loading branch information
vsian and writinwaters authored Dec 7, 2024
1 parent f1c2020 commit 25797ef
Show file tree
Hide file tree
Showing 22 changed files with 341 additions and 55 deletions.
2 changes: 2 additions & 0 deletions conf/pytest_parallel_infinity_conf.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ log_level = "trace"

[storage]
persistence_dir = "/var/infinity/persistence"
compact_interval = "10s"
cleanup_interval = "0s"

[buffer]
buffer_manager_size = "8GB"
Expand Down
110 changes: 109 additions & 1 deletion python/restart_test/test_memidx.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,117 @@ def check():

part3()

def test_mem_ivf_recover(self, infinity_runner : InfinityRunner):
def test_mem_indexer(self, infinity_runner : InfinityRunner):
    """Restart test for the fulltext MemoryIndexer dump path.

    Runs the server three times under different configs:
      part1 (config1): build a table with a fulltext index and insert enough
          rows that the mem index exceeds its threshold and is dumped.
      part2 (config2): after restart, verify row count and fulltext search
          recover correctly, then insert more rows to trigger a 2nd dump.
      part3 (config3): after another restart, verify recovery again (delta
          checkpoint + dump-index WAL + mem-index replay), then drop the
          table, which triggers a 3rd dump.
    """
    config1 = "test/data/config/restart_test/test_memidx/1.toml"
    config2 = "test/data/config/restart_test/test_memidx/2.toml"
    config3 = "test/data/config/restart_test/test_memidx/3.toml"
    uri = common_values.TEST_LOCAL_HOST
    # Start from a clean data directory so earlier tests cannot interfere.
    infinity_runner.clear()

    decorator1 = infinity_runner_decorator_factory(config1, uri, infinity_runner)

    @decorator1
    def part1(infinity_obj):
        # Create the table and a fulltext index on the varchar column.
        db_obj = infinity_obj.get_database("default_db")
        table_obj = db_obj.create_table(
            "test_mem_indexer",
            {"c1" : {"type" : "int"}, "c2": {"type": "varchar"}},
        )
        res = table_obj.create_index(
            "idx1",
            index.IndexInfo(
                "c2",
                index.IndexType.FullText,
            ),
        )
        assert res.error_code == infinity.ErrorCode.OK

        table_obj.insert([
            {"c1" : 1, "c2" : "this is a test text"},
            {"c1" : 2, "c2" : "this is not a test text"},
        ])
        # trigger the dump in 3rd record
        table_obj.insert([
            {"c1" : 3, "c2" : "this is indeed a test text"},
        ])
        table_obj.insert([
            {"c1" : 4, "c2" : "this is definitely not a test text"},
            {"c1" : 5, "c2" : "this is nothing but a test text"},
        ])

    part1()

    # config1 can hold 2 rows of identical fulltext mem index before dump
    # 1. recover by dump-index WAL & mem-index replay
    decorator2 = infinity_runner_decorator_factory(config2, uri, infinity_runner)

    @decorator2
    def part2(infinity_obj):
        # Give the restarted server time to finish recovery before querying.
        time.sleep(5)
        db_obj = infinity_obj.get_database("default_db")
        table_obj = db_obj.get_table("test_mem_indexer")
        data_dict, data_type_dict = table_obj.output(["count(*)"]).to_result()
        # print(data_dict)
        assert data_dict["count(star)"] == [5]

        # Fulltext search must see both the dumped chunk and the replayed
        # mem index: rows 1-3 all contain "test text".
        data_dict, data_type_dict = (
            table_obj.output(["c1"])
            .match_text('c2', 'test text', 3)
            .to_result()
        )
        # print(data_dict["c1"])
        assert data_dict["c1"] == [1, 2, 3]

        data_dict, data_type_dict = table_obj.output(["count(*)"]).to_result()
        # print(data_dict)
        assert data_dict["count(star)"] == [5]

        # the 2nd dump
        table_obj.insert([
            {"c1" : 6, "c2" : "this is the exact opposite of a test text"},
        ])
        # Wait so the dump can happen before the next batch of inserts.
        time.sleep(5)
        table_obj.insert([
            {"c1" : 7, "c2" : "what is this?"},
            {"c1" : 8, "c2" : "this is what?"},
            {"c1" : 9, "c2" : "not a test text!"},
            {"c1" : 10, "c2" : "what a this?"},
            {"c1" : 11, "c2" : "this is you!"},
        ])

    part2()

    # 2. recover by delta ckp & dump-index WAL & mem-index replay
    decorator3 = infinity_runner_decorator_factory(config3, uri, infinity_runner)

    @decorator3
    def part3(infinity_obj):
        # Give the restarted server time to finish recovery before querying.
        time.sleep(5)
        db_obj = infinity_obj.get_database("default_db")
        table_obj = db_obj.get_table("test_mem_indexer")

        def check(rows):
            # Rows 7, 8 and 10 are the top-3 matches for "this what";
            # `rows` is the expected total row count at this point.
            data_dict, data_type_dict = (
                table_obj.output(["c1"])
                .match_text('c2', 'this what', 3)
                .to_result()
            )
            # print(data_dict["c1"])
            assert data_dict["c1"] == [7, 8, 10]

            data_dict, data_type_dict = table_obj.output(["count(*)"]).to_result()
            assert data_dict["count(star)"] == [rows]

        check(11)
        # One more insert after recovery must be visible immediately.
        table_obj.insert([
            {"c1" : 12, "c2" : "this is a text!"},
        ])
        check(12)

        # the 3rd dump
        db_obj.drop_table("test_mem_indexer")

    part3()

def test_optimize_from_different_database(self, infinity_runner: InfinityRunner):
infinity_runner.clear()
Expand Down
3 changes: 2 additions & 1 deletion src/storage/invertedindex/column_index_reader.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import segment_posting;
import index_segment_reader;
import posting_iterator;
import index_defines;
import memory_indexer;
// import memory_indexer;
import internal_types;
import segment_index_entry;
import chunk_index_entry;
Expand All @@ -32,6 +32,7 @@ namespace infinity {
struct TableEntry;
class TermDocIterator;
class Txn;
class MemoryIndexer;

export class ColumnIndexReader {
public:
Expand Down
33 changes: 22 additions & 11 deletions src/storage/invertedindex/column_inverter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ ColumnInverter::ColumnInverter(PostingWriterProvider posting_writer_provider, Ve

void ColumnInverter::InitAnalyzer(const String &analyzer_name) {
auto [analyzer, status] = AnalyzerPool::instance().GetAnalyzer(analyzer_name);
if(!status.ok()) {
if (!status.ok()) {
Status status = Status::UnexpectedError(fmt::format("Invalid analyzer: {}", analyzer_name));
RecoverableError(status);
}
Expand Down Expand Up @@ -203,11 +203,13 @@ void ColumnInverter::Sort() {
16);
}

void ColumnInverter::GeneratePosting() {
MemUsageChange ColumnInverter::GeneratePosting() {
u32 last_term_num = std::numeric_limits<u32>::max();
u32 last_doc_id = INVALID_DOCID;
StringRef last_term, term;
SharedPtr<PostingWriter> posting = nullptr;
MemUsageChange ret{true, 0};
Map<StringRef, PostingWriter *> modified_writers;
// printf("GeneratePosting() begin begin_doc_id_ %u, doc_count_ %u, merged_ %u", begin_doc_id_, doc_count_, merged_);
for (auto &i : positions_) {
if (last_term_num != i.term_num_) {
Expand All @@ -218,6 +220,9 @@ void ColumnInverter::GeneratePosting() {
}
term = GetTermFromNum(i.term_num_);
posting = posting_writer_provider_(String(term.data()));
if (modified_writers.find(term) == modified_writers.end()) {
modified_writers[term] = posting.get();
}
// printf("\nswitched-term-%d-<%s>\n", i.term_num_, term.data());
if (last_term_num != (u32)(-1)) {
assert(last_term_num < i.term_num_);
Expand All @@ -242,6 +247,12 @@ void ColumnInverter::GeneratePosting() {
// printf(" EndDocument3-%u\n", last_doc_id);
}
// printf("GeneratePosting() end begin_doc_id_ %u, doc_count_ %u, merged_ %u", begin_doc_id_, doc_count_, merged_);
for (auto kv : modified_writers) {
PostingWriter *writer = kv.second;
ret.Add(writer->GetSizeChange());
}
LOG_TRACE(fmt::format("MemUsageChange : {}, {}", ret.is_add_, ret.mem_));
return ret;
}

void ColumnInverter::SortForOfflineDump() {
Expand All @@ -258,7 +269,7 @@ void ColumnInverter::SortForOfflineDump() {
// ----------------------------------------------------------------------------------------------------------------------------+
// Data within each group

void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, UniquePtr<BufWriter>& buf_writer) {
void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, UniquePtr<BufWriter> &buf_writer) {
// spill sort results for external merge sort
// if (positions_.empty()) {
// return;
Expand All @@ -267,19 +278,19 @@ void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, Unique
// size of this Run in bytes
u32 data_size = 0;
u64 data_size_pos = spill_file_tell;
buf_writer->Write((const char*)&data_size, sizeof(u32));
buf_writer->Write((const char *)&data_size, sizeof(u32));
spill_file_tell += sizeof(u32);

// number of tuples
u32 num_of_tuples = positions_.size();
tuple_count += num_of_tuples;
buf_writer->Write((const char*)&num_of_tuples, sizeof(u32));
buf_writer->Write((const char *)&num_of_tuples, sizeof(u32));
spill_file_tell += sizeof(u32);

// start offset for next spill
u64 next_start_offset = 0;
u64 next_start_offset_pos = spill_file_tell;
buf_writer->Write((const char*)&next_start_offset, sizeof(u64));
buf_writer->Write((const char *)&next_start_offset, sizeof(u64));
spill_file_tell += sizeof(u64);

u64 data_start_offset = spill_file_tell;
Expand All @@ -295,11 +306,11 @@ void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, Unique
}
record_length = term.size() + sizeof(docid_t) + sizeof(u32) + 1;

buf_writer->Write((const char*)&record_length, sizeof(u32));
buf_writer->Write((const char *)&record_length, sizeof(u32));
buf_writer->Write(term.data(), term.size());
buf_writer->Write((const char*)&str_null, sizeof(char));
buf_writer->Write((const char*)&(i.doc_id_), sizeof(docid_t));
buf_writer->Write((const char*)&(i.term_pos_), sizeof(u32));
buf_writer->Write((const char *)&str_null, sizeof(char));
buf_writer->Write((const char *)&(i.doc_id_), sizeof(docid_t));
buf_writer->Write((const char *)&(i.term_pos_), sizeof(u32));
}
buf_writer->Flush();
// update data size
Expand All @@ -312,4 +323,4 @@ void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, Unique
fseek(spill_file, next_start_offset, SEEK_SET);
}

} // namespace infinity
} // namespace infinity
5 changes: 3 additions & 2 deletions src/storage/invertedindex/column_inverter.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import internal_types;
import posting_writer;
import vector_with_lock;
import buf_writer;
import mem_usage_change;

namespace infinity {

Expand All @@ -52,7 +53,7 @@ public:

void Sort();

void GeneratePosting();
MemUsageChange GeneratePosting();

u32 GetDocCount() { return doc_count_; }

Expand All @@ -74,7 +75,7 @@ public:
}
};

void SpillSortResults(FILE *spill_file, u64 &tuple_count, UniquePtr<BufWriter>& buf_writer);
void SpillSortResults(FILE *spill_file, u64 &tuple_count, UniquePtr<BufWriter> &buf_writer);

private:
using TermBuffer = Vector<char>;
Expand Down
4 changes: 3 additions & 1 deletion src/storage/invertedindex/format/doc_list_encoder.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ public:

PostingByteSlice *GetDocListBuffer() { return &doc_list_buffer_; }

inline SizeT GetSizeInBytes() const { return doc_list_buffer_.GetSizeInBytes() + doc_skiplist_writer_->GetSizeInBytes(); }

private:
void AddDocument(docid_t doc_id, docpayload_t doc_payload, tf_t tf, u32 doc_len);

Expand Down Expand Up @@ -78,4 +80,4 @@ private:
friend class InMemDocListDecoderTest;
};

} // namespace infinity
} // namespace infinity
11 changes: 6 additions & 5 deletions src/storage/invertedindex/format/position_list_encoder.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ namespace infinity {

export class PositionListEncoder {
public:
PositionListEncoder(const PostingFormatOption &format_option,
const PositionListFormat *pos_list_format = nullptr);
PositionListEncoder(const PostingFormatOption &format_option, const PositionListFormat *pos_list_format = nullptr);

~PositionListEncoder();

Expand All @@ -38,17 +37,19 @@ public:

const PositionListFormat *GetPositionListFormat() const { return pos_list_format_; }

inline SizeT GetSizeInBytes() const { return pos_list_buffer_.GetSizeInBytes() + pos_skiplist_writer_->GetSizeInBytes(); }

private:
void CreatePosSkipListWriter();
void AddPosSkipListItem(u32 total_pos_count, u32 compressed_pos_size, bool need_flush);
void FlushPositionBuffer();

private:
PostingByteSlice pos_list_buffer_;
pos_t last_pos_in_cur_doc_; // 4byte
u32 total_pos_count_; // 4byte
pos_t last_pos_in_cur_doc_; // 4byte
u32 total_pos_count_; // 4byte
PostingFormatOption format_option_;
bool is_own_format_; // 1byte
bool is_own_format_; // 1byte
UniquePtr<SkipListWriter> pos_skiplist_writer_;
const PositionListFormat *pos_list_format_;
};
Expand Down
2 changes: 2 additions & 0 deletions src/storage/invertedindex/format/posting_buffer.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public:

u8 Size() const { return size_; }

inline SizeT GetSizeInBytes() const { return capacity_ * posting_fields_->GetTotalSize(); }

u8 GetRowCount() const { return posting_fields_->GetSize(); }

template <typename T>
Expand Down
4 changes: 3 additions & 1 deletion src/storage/invertedindex/format/posting_byte_slice.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ public:

SizeT EstimateDumpSize() const { return posting_writer_.GetSize(); }

inline SizeT GetSizeInBytes() const { return buffer_.GetSizeInBytes() + posting_writer_.GetSize(); }

protected:
SizeT DoFlush();

Expand All @@ -71,4 +73,4 @@ inline void PostingByteSlice::PushBack(u8 row, T value) {
buffer_.PushBack(row, value);
}

} // namespace infinity
} // namespace infinity
Loading

0 comments on commit 25797ef

Please sign in to comment.