diff --git a/conf/pytest_parallel_infinity_conf.toml b/conf/pytest_parallel_infinity_conf.toml index bc5c9822e2..d4d6fd1a23 100644 --- a/conf/pytest_parallel_infinity_conf.toml +++ b/conf/pytest_parallel_infinity_conf.toml @@ -16,6 +16,8 @@ log_level = "trace" [storage] persistence_dir = "/var/infinity/persistence" +compact_interval = "10s" +cleanup_interval = "0s" [buffer] buffer_manager_size = "8GB" diff --git a/python/restart_test/test_memidx.py b/python/restart_test/test_memidx.py index 18e6142060..0a3d519374 100644 --- a/python/restart_test/test_memidx.py +++ b/python/restart_test/test_memidx.py @@ -227,9 +227,117 @@ def check(): part3() - def test_mem_ivf_recover(self, infinity_runner : InfinityRunner): + def test_mem_indexer(self, infinity_runner : InfinityRunner): + config1 = "test/data/config/restart_test/test_memidx/1.toml" + config2 = "test/data/config/restart_test/test_memidx/2.toml" + config3 = "test/data/config/restart_test/test_memidx/3.toml" + uri = common_values.TEST_LOCAL_HOST infinity_runner.clear() + decorator1 = infinity_runner_decorator_factory(config1, uri, infinity_runner) + + @decorator1 + def part1(infinity_obj): + db_obj = infinity_obj.get_database("default_db") + table_obj = db_obj.create_table( + "test_mem_indexer", + {"c1" : {"type" : "int"}, "c2": {"type": "varchar"}}, + ) + res = table_obj.create_index( + "idx1", + index.IndexInfo( + "c2", + index.IndexType.FullText, + ), + ) + assert res.error_code == infinity.ErrorCode.OK + + table_obj.insert([ + {"c1" : 1, "c2" : "this is a test text"}, + {"c1" : 2, "c2" : "this is not a test text"}, + ]) + # trigger the dump in 3rd record + table_obj.insert([ + {"c1" : 3, "c2" : "this is indeed a test text"}, + ]) + table_obj.insert([ + {"c1" : 4, "c2" : "this is definitely not a test text"}, + {"c1" : 5, "c2" : "this is nothing but a test text"}, + ]) + + part1() + + # config1 can hold 2 rows of identical fulltext mem index before dump + # 1. recover by dumpindex wal & memindex recovery + decorator2 = infinity_runner_decorator_factory(config2, uri, infinity_runner) + + @decorator2 + def part2(infinity_obj): + time.sleep(5) + db_obj = infinity_obj.get_database("default_db") + table_obj = db_obj.get_table("test_mem_indexer") + data_dict, data_type_dict = table_obj.output(["count(*)"]).to_result() + # print(data_dict) + assert data_dict["count(star)"] == [5] + + data_dict, data_type_dict = ( + table_obj.output(["c1"]) + .match_text('c2', 'test text', 3) + .to_result() + ) + # print(data_dict["c1"]) + assert data_dict["c1"] == [1, 2, 3] + + data_dict, data_type_dict = table_obj.output(["count(*)"]).to_result() + # print(data_dict) + assert data_dict["count(star)"] == [5] + + # the 2nd dump + table_obj.insert([ + {"c1" : 6, "c2" : "this is the exact opposite of a test text"}, + ]) + time.sleep(5) + table_obj.insert([ + {"c1" : 7, "c2" : "what is this?"}, + {"c1" : 8, "c2" : "this is what?"}, + {"c1" : 9, "c2" : "not a test text!"}, + {"c1" : 10, "c2" : "what a this?"}, + {"c1" : 11, "c2" : "this is you!"}, + ]) + + part2() + + # 2. recover by delta ckp & dumpindex wal & memindex recovery + decorator3 = infinity_runner_decorator_factory(config3, uri, infinity_runner) + + @decorator3 + def part3(infinity_obj): + time.sleep(5) + db_obj = infinity_obj.get_database("default_db") + table_obj = db_obj.get_table("test_mem_indexer") + + def check(rows): + data_dict, data_type_dict = ( + table_obj.output(["c1"]) + .match_text('c2', 'this what', 3) + .to_result() + ) + # print(data_dict["c1"]) + assert data_dict["c1"] == [7, 8, 10] + + data_dict, data_type_dict = table_obj.output(["count(*)"]).to_result() + assert data_dict["count(star)"] == [rows] + + check(11) + table_obj.insert([ + {"c1" : 12, "c2" : "this is a text!"}, + ]) + check(12) + + # the 3rd dump + db_obj.drop_table("test_mem_indexer") + + part3() def test_optimize_from_different_database(self, infinity_runner: InfinityRunner): infinity_runner.clear() diff --git a/src/storage/invertedindex/column_index_reader.cppm b/src/storage/invertedindex/column_index_reader.cppm index d70f319ff7..70c0d82c04 100644 --- a/src/storage/invertedindex/column_index_reader.cppm +++ b/src/storage/invertedindex/column_index_reader.cppm @@ -22,7 +22,7 @@ import segment_posting; import index_segment_reader; import posting_iterator; import index_defines; -import memory_indexer; +// import memory_indexer; import internal_types; import segment_index_entry; import chunk_index_entry; @@ -32,6 +32,7 @@ namespace infinity { struct TableEntry; class TermDocIterator; class Txn; +class MemoryIndexer; export class ColumnIndexReader { public: diff --git a/src/storage/invertedindex/column_inverter.cpp b/src/storage/invertedindex/column_inverter.cpp index f85a449d73..5654dc8526 100644 --- a/src/storage/invertedindex/column_inverter.cpp +++ b/src/storage/invertedindex/column_inverter.cpp @@ -52,7 +52,7 @@ ColumnInverter::ColumnInverter(PostingWriterProvider posting_writer_provider, Ve void ColumnInverter::InitAnalyzer(const String &analyzer_name) { auto [analyzer, status] = AnalyzerPool::instance().GetAnalyzer(analyzer_name); - if(!status.ok()) { + if (!status.ok()) { Status status = Status::UnexpectedError(fmt::format("Invalid analyzer: {}", analyzer_name)); RecoverableError(status); } @@ -203,11 +203,13 @@ void ColumnInverter::Sort() { 16); } -void ColumnInverter::GeneratePosting() { +MemUsageChange ColumnInverter::GeneratePosting() { u32 last_term_num = std::numeric_limits::max(); u32 last_doc_id = INVALID_DOCID; StringRef last_term, term; SharedPtr posting = nullptr; + MemUsageChange ret{true, 0}; + Map modified_writers; // printf("GeneratePosting() begin begin_doc_id_ %u, doc_count_ %u, merged_ %u", begin_doc_id_, doc_count_, merged_); for (auto &i : positions_) { if (last_term_num != i.term_num_) { @@ -218,6 +220,9 @@ void ColumnInverter::GeneratePosting() { } term = GetTermFromNum(i.term_num_); posting = posting_writer_provider_(String(term.data())); + if (modified_writers.find(term) == modified_writers.end()) { + modified_writers[term] = posting.get(); + } // printf("\nswitched-term-%d-<%s>\n", i.term_num_, term.data()); if (last_term_num != (u32)(-1)) { assert(last_term_num < i.term_num_); @@ -242,6 +247,12 @@ void ColumnInverter::GeneratePosting() { // printf(" EndDocument3-%u\n", last_doc_id); } // printf("GeneratePosting() end begin_doc_id_ %u, doc_count_ %u, merged_ %u", begin_doc_id_, doc_count_, merged_); + for (auto kv : modified_writers) { + PostingWriter *writer = kv.second; + ret.Add(writer->GetSizeChange()); + } + LOG_TRACE(fmt::format("MemUsageChange : {}, {}", ret.is_add_, ret.mem_)); + return ret; } void ColumnInverter::SortForOfflineDump() { @@ -258,7 +269,7 @@ void ColumnInverter::SortForOfflineDump() { // ----------------------------------------------------------------------------------------------------------------------------+ // Data within each group -void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, UniquePtr& buf_writer) { +void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, UniquePtr &buf_writer) { // spill sort results for external merge sort // if (positions_.empty()) { // return; @@ -267,19 +278,19 @@ void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, Unique // size of this Run in bytes u32 data_size = 0; u64 data_size_pos = spill_file_tell; - buf_writer->Write((const char*)&data_size, sizeof(u32)); + buf_writer->Write((const char *)&data_size, sizeof(u32)); spill_file_tell += sizeof(u32); // number of tuples u32 num_of_tuples = positions_.size(); tuple_count += num_of_tuples; - buf_writer->Write((const char*)&num_of_tuples, sizeof(u32)); + buf_writer->Write((const char *)&num_of_tuples, sizeof(u32)); spill_file_tell += sizeof(u32); // start offset for next spill u64 next_start_offset = 0; u64 next_start_offset_pos = spill_file_tell; - buf_writer->Write((const char*)&next_start_offset, sizeof(u64)); + buf_writer->Write((const char *)&next_start_offset, sizeof(u64)); spill_file_tell += sizeof(u64); u64 data_start_offset = spill_file_tell; @@ -295,11 +306,11 @@ void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, Unique } record_length = term.size() + sizeof(docid_t) + sizeof(u32) + 1; - buf_writer->Write((const char*)&record_length, sizeof(u32)); + buf_writer->Write((const char *)&record_length, sizeof(u32)); buf_writer->Write(term.data(), term.size()); - buf_writer->Write((const char*)&str_null, sizeof(char)); - buf_writer->Write((const char*)&(i.doc_id_), sizeof(docid_t)); - buf_writer->Write((const char*)&(i.term_pos_), sizeof(u32)); + buf_writer->Write((const char *)&str_null, sizeof(char)); + buf_writer->Write((const char *)&(i.doc_id_), sizeof(docid_t)); + buf_writer->Write((const char *)&(i.term_pos_), sizeof(u32)); } buf_writer->Flush(); // update data size @@ -312,4 +323,4 @@ void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count, Unique fseek(spill_file, next_start_offset, SEEK_SET); } -} // namespace infinity \ No newline at end of file +} // namespace infinity diff --git a/src/storage/invertedindex/column_inverter.cppm b/src/storage/invertedindex/column_inverter.cppm index 253bdc46d2..377ac43887 100644 --- a/src/storage/invertedindex/column_inverter.cppm +++ b/src/storage/invertedindex/column_inverter.cppm @@ -28,6 +28,7 @@ import internal_types; import posting_writer; import vector_with_lock; import buf_writer; +import mem_usage_change; namespace infinity { @@ -52,7 +53,7 @@ public: void Sort(); - void GeneratePosting(); + MemUsageChange GeneratePosting(); u32 GetDocCount() { return doc_count_; } @@ -74,7 +75,7 @@ public: } }; - void SpillSortResults(FILE *spill_file, u64 &tuple_count, UniquePtr& buf_writer); + void SpillSortResults(FILE *spill_file, u64 &tuple_count, UniquePtr &buf_writer); private: using TermBuffer = Vector; diff --git a/src/storage/invertedindex/format/doc_list_encoder.cppm b/src/storage/invertedindex/format/doc_list_encoder.cppm index bf0f36d109..b7d2652c03 100644 --- a/src/storage/invertedindex/format/doc_list_encoder.cppm +++ b/src/storage/invertedindex/format/doc_list_encoder.cppm @@ -51,6 +51,8 @@ public: PostingByteSlice *GetDocListBuffer() { return &doc_list_buffer_; } + inline SizeT GetSizeInBytes() const { return doc_list_buffer_.GetSizeInBytes() + doc_skiplist_writer_->GetSizeInBytes(); } + private: void AddDocument(docid_t doc_id, docpayload_t doc_payload, tf_t tf, u32 doc_len); @@ -78,4 +80,4 @@ private: friend class InMemDocListDecoderTest; }; -} // namespace infinity \ No newline at end of file +} // namespace infinity diff --git a/src/storage/invertedindex/format/position_list_encoder.cppm b/src/storage/invertedindex/format/position_list_encoder.cppm index e35484aa42..781509ef78 100644 --- a/src/storage/invertedindex/format/position_list_encoder.cppm +++ b/src/storage/invertedindex/format/position_list_encoder.cppm @@ -18,8 +18,7 @@ namespace infinity { export class PositionListEncoder { public: - PositionListEncoder(const PostingFormatOption &format_option, - const PositionListFormat *pos_list_format = nullptr); + PositionListEncoder(const PostingFormatOption &format_option, const PositionListFormat *pos_list_format = nullptr); ~PositionListEncoder(); @@ -38,6 +37,8 @@ public: const PositionListFormat *GetPositionListFormat() const { return pos_list_format_; } + inline SizeT GetSizeInBytes() const { return pos_list_buffer_.GetSizeInBytes() + pos_skiplist_writer_->GetSizeInBytes(); } + private: void CreatePosSkipListWriter(); void AddPosSkipListItem(u32 total_pos_count, u32 compressed_pos_size, bool need_flush); @@ -45,10 +46,10 @@ private: private: PostingByteSlice pos_list_buffer_; - pos_t last_pos_in_cur_doc_; // 4byte - u32 total_pos_count_; // 4byte + pos_t last_pos_in_cur_doc_; // 4byte + u32 total_pos_count_; // 4byte PostingFormatOption format_option_; - bool is_own_format_; // 1byte + bool is_own_format_; // 1byte UniquePtr pos_skiplist_writer_; const PositionListFormat *pos_list_format_; }; diff --git a/src/storage/invertedindex/format/posting_buffer.cppm b/src/storage/invertedindex/format/posting_buffer.cppm index 38d23ac3e9..0dcc17fbaf 100644 --- a/src/storage/invertedindex/format/posting_buffer.cppm +++ b/src/storage/invertedindex/format/posting_buffer.cppm @@ -37,6 +37,8 @@ public: u8 Size() const { return size_; } + inline SizeT GetSizeInBytes() const { return capacity_ * posting_fields_->GetTotalSize(); } + u8 GetRowCount() const { return posting_fields_->GetSize(); } template diff --git a/src/storage/invertedindex/format/posting_byte_slice.cppm b/src/storage/invertedindex/format/posting_byte_slice.cppm index a0b42c7db4..b76bc696cd 100644 --- a/src/storage/invertedindex/format/posting_byte_slice.cppm +++ b/src/storage/invertedindex/format/posting_byte_slice.cppm @@ -57,6 +57,8 @@ public: SizeT EstimateDumpSize() const { return posting_writer_.GetSize(); } + inline SizeT GetSizeInBytes() const { return buffer_.GetSizeInBytes() + posting_writer_.GetSize(); } + protected: SizeT DoFlush(); @@ -71,4 +73,4 @@ inline void PostingByteSlice::PushBack(u8 row, T value) { buffer_.PushBack(row, value); } -} // namespace infinity \ No newline at end of file +} // namespace infinity diff --git a/src/storage/invertedindex/memory_indexer.cpp b/src/storage/invertedindex/memory_indexer.cpp index 184095bd62..ef5d9f7fd2 100644 --- a/src/storage/invertedindex/memory_indexer.cpp +++ b/src/storage/invertedindex/memory_indexer.cpp @@ -66,6 +66,7 @@ import utility; import persist_result_handler; import virtual_store; import local_file_handle; +import mem_usage_change; namespace infinity { constexpr int MAX_TUPLE_LENGTH = 1024; // we assume that analyzed term, together with docid/offset info, will never exceed such length @@ -76,10 +77,16 @@ bool MemoryIndexer::KeyComp::operator()(const String &lhs, const String &rhs) co MemoryIndexer::PostingTable::PostingTable() {} -MemoryIndexer::MemoryIndexer(const String &index_dir, const String &base_name, RowID base_row_id, optionflag_t flag, const String &analyzer) +MemoryIndexer::MemoryIndexer(const String &index_dir, + const String &base_name, + RowID base_row_id, + optionflag_t flag, + const String &analyzer, + SegmentIndexEntry *segment_index_entry) : index_dir_(index_dir), base_name_(base_name), base_row_id_(base_row_id), flag_(flag), posting_format_(PostingFormatOption(flag_)), analyzer_(analyzer), inverting_thread_pool_(infinity::InfinityContext::instance().GetFulltextInvertingThreadPool()), - commiting_thread_pool_(infinity::InfinityContext::instance().GetFulltextCommitingThreadPool()), ring_inverted_(15UL), ring_sorted_(13UL) { + commiting_thread_pool_(infinity::InfinityContext::instance().GetFulltextCommitingThreadPool()), ring_inverted_(15UL), ring_sorted_(13UL), + segment_index_entry_(segment_index_entry) { assert(std::filesystem::path(index_dir).is_absolute()); posting_table_ = MakeShared(); prepared_posting_ = MakeShared(posting_format_, column_lengths_); @@ -138,6 +145,8 @@ void MemoryIndexer::Insert(SharedPtr column_vector, u32 row_offset } inverting_thread_pool_.push(std::move(func)); } else { + // mem trace : the column_lengths_; + IncreaseMemoryUsage(sizeof(u32) * row_count); PostingWriterProvider provider = [this](const String &term) -> SharedPtr { return GetOrAddPosting(term); }; auto inverter = MakeShared(provider, column_lengths_); inverter->InitAnalyzer(this->analyzer_); @@ -221,6 +230,7 @@ SizeT MemoryIndexer::CommitSync(SizeT wait_if_empty_ms) { return 0; } + MemUsageChange mem_usage_change = {true, 0}; while (1) { this->ring_sorted_.GetBatch(inverters, wait_if_empty_ms); // num_merged = inverters.size(); @@ -228,7 +238,7 @@ SizeT MemoryIndexer::CommitSync(SizeT wait_if_empty_ms) { break; } for (auto &inverter : inverters) { - inverter->GeneratePosting(); + mem_usage_change.Add(inverter->GeneratePosting()); num_generated += inverter->GetMerged(); } } @@ -239,6 +249,11 @@ SizeT MemoryIndexer::CommitSync(SizeT wait_if_empty_ms) { cv_.notify_all(); } } + if (mem_usage_change.is_add_) { + IncreaseMemoryUsage(mem_usage_change.mem_); + } else { + DecreaseMemoryUsage(mem_usage_change.mem_); + } // LOG_INFO(fmt::format("MemoryIndexer::CommitSync sorted {} inverters, generated posting for {} inverters(merged to {}), inflight_tasks_ is {}", // num_sorted, @@ -376,6 +391,8 @@ SharedPtr MemoryIndexer::GetOrAddPosting(const String &term) { PostingPtr posting; bool found = posting_store.GetOrAdd(term, posting, prepared_posting_); if (!found) { + // mem trace : add term's size + IncreaseMemoryUsage(term.size()); prepared_posting_ = MakeShared(posting_format_, column_lengths_); } return posting; @@ -386,6 +403,40 @@ void MemoryIndexer::Reset() { posting_table_->store_.Clear(); } column_lengths_.Clear(); + DecreaseMemoryUsage(mem_used_); +} + +MemIndexTracerInfo MemoryIndexer::GetInfo() const { + auto *table_index_entry = segment_index_entry_->table_index_entry(); + SharedPtr index_name = table_index_entry->GetIndexName(); + auto *table_entry = table_index_entry->table_index_meta()->GetTableEntry(); + SharedPtr table_name = table_entry->GetTableName(); + SharedPtr db_name = table_entry->GetDBName(); + + return MemIndexTracerInfo(index_name, table_name, db_name, MemUsed(), doc_count_); +} + +TableIndexEntry *MemoryIndexer::table_index_entry() const { return segment_index_entry_->table_index_entry(); } + +SizeT MemoryIndexer::MemUsed() const { return mem_used_; } + +void MemoryIndexer::ApplyMemUseChange(MemUsageChange mem_change) { + if (mem_change.is_add_) { + IncreaseMemoryUsage(mem_change.mem_); + } else { + DecreaseMemoryUsage(mem_change.mem_); + } +} + +void MemoryIndexer::IncreaseMemoryUsage(SizeT mem) { + mem_used_ += mem; + BaseMemIndex::AddMemUsed(mem); +} + +void MemoryIndexer::DecreaseMemoryUsage(SizeT mem) { + assert(mem_used_ >= mem); + mem_used_ -= mem; + BaseMemIndex::DecreaseMemoryUsage(mem); } void MemoryIndexer::TupleListToIndexFile(UniquePtr> &merger) { @@ -546,4 +597,4 @@ void MemoryIndexer::PrepareSpillFile() { buf_writer_ = MakeUnique(spill_file_handle_, write_buf_size); } -} // namespace infinity \ No newline at end of file +} // namespace infinity diff --git a/src/storage/invertedindex/memory_indexer.cppm b/src/storage/invertedindex/memory_indexer.cppm index 79ffc8755b..58c9c3ab2e 100644 --- a/src/storage/invertedindex/memory_indexer.cppm +++ b/src/storage/invertedindex/memory_indexer.cppm @@ -34,11 +34,18 @@ import buf_writer; import posting_list_format; import external_sort_merger; import persistence_manager; +import base_memindex; +import memindex_tracer; +import segment_index_entry; +import table_index_entry; +import mem_usage_change; namespace infinity { -export class MemoryIndexer { +export class MemoryIndexer final : public BaseMemIndex { public: + void ApplyMemUseChange(MemUsageChange mem_change); + struct KeyComp { bool operator()(const String &lhs, const String &rhs) const; }; @@ -52,7 +59,12 @@ public: PostingTableStore store_; }; - MemoryIndexer(const String &index_dir, const String &base_name, RowID base_row_id, optionflag_t flag, const String &analyzer); + MemoryIndexer(const String &index_dir, + const String &base_name, + RowID base_row_id, + optionflag_t flag, + const String &analyzer, + SegmentIndexEntry *segment_index_entry); ~MemoryIndexer(); @@ -106,7 +118,19 @@ public: void Reset(); + MemIndexTracerInfo GetInfo() const override; + + TableIndexEntry *table_index_entry() const override; + + SizeT MemUsed() const; + private: + // call with write lock + void IncreaseMemoryUsage(SizeT mem); + + // call with write lock + void DecreaseMemoryUsage(SizeT mem); + // CommitOffline is for offline case. It spill a batch of ColumnInverter. Returns the size of the batch. SizeT CommitOffline(SizeT wait_if_empty_ms = 0); @@ -157,5 +181,8 @@ private: UniquePtr spill_buffer_{}; SizeT spill_buffer_size_{0}; UniquePtr buf_writer_; + + SegmentIndexEntry *segment_index_entry_{nullptr}; + SizeT mem_used_{0}; }; } // namespace infinity diff --git a/src/storage/invertedindex/posting_writer.cpp b/src/storage/invertedindex/posting_writer.cpp index cddcba5927..11577f2960 100644 --- a/src/storage/invertedindex/posting_writer.cpp +++ b/src/storage/invertedindex/posting_writer.cpp @@ -13,6 +13,7 @@ import posting_list_format; import index_defines; import term_meta; import vector_with_lock; +import mem_usage_change; module posting_writer; @@ -106,4 +107,16 @@ InMemPostingDecoder *PostingWriter::CreateInMemPostingDecoder() const { return posting_decoder; } -} // namespace infinity \ No newline at end of file + +MemUsageChange PostingWriter::GetSizeChange() { + SizeT size = doc_list_encoder_->GetSizeInBytes() + position_list_encoder_->GetSizeInBytes(); + SizeT last_size = last_size_; + last_size_ = size; + if (size >= last_size) { + return MemUsageChange{true, size - last_size}; + } else { + return MemUsageChange{false, last_size - size}; + } +} + +} // namespace infinity diff --git a/src/storage/invertedindex/posting_writer.cppm b/src/storage/invertedindex/posting_writer.cppm index 7d46ffbd01..6981099b08 100644 --- a/src/storage/invertedindex/posting_writer.cppm +++ b/src/storage/invertedindex/posting_writer.cppm @@ -13,6 +13,7 @@ import posting_list_format; import index_defines; import term_meta; import vector_with_lock; +import mem_usage_change; namespace infinity { export class PostingWriter { @@ -47,7 +48,11 @@ public: u32 GetDocColumnLength(docid_t doc_id) { return column_lengths_.Get(doc_id); } + MemUsageChange GetSizeChange(); + private: + // for memory tracing + SizeT last_size_{0}; const PostingFormat &posting_format_; DocListEncoder *doc_list_encoder_{nullptr}; PositionListEncoder *position_list_encoder_{nullptr}; @@ -57,4 +62,4 @@ private: export using PostingWriterProvider = std::function(const String &)>; -} // namespace infinity \ No newline at end of file +} // namespace infinity diff --git a/src/storage/knn_index/knn_ivf/ivf_index_data_in_mem.cpp b/src/storage/knn_index/knn_ivf/ivf_index_data_in_mem.cpp index 2d23faadce..40c8353af4 100644 --- a/src/storage/knn_index/knn_ivf/ivf_index_data_in_mem.cpp +++ b/src/storage/knn_index/knn_ivf/ivf_index_data_in_mem.cpp @@ -110,6 +110,8 @@ class IVFIndexInMemT final : public IVFIndexInMem { } } + ~IVFIndexInMemT() { BaseMemIndex::DecreaseMemoryUsage(MemoryUsed()); } + MemIndexTracerInfo GetInfo() const override { auto *table_index_entry = segment_index_entry_->table_index_entry(); SharedPtr index_name = table_index_entry->GetIndexName(); diff --git a/src/storage/meta/entry/segment_index_entry.cpp b/src/storage/meta/entry/segment_index_entry.cpp index 51ff72a5fd..6ff3424ea0 100644 --- a/src/storage/meta/entry/segment_index_entry.cpp +++ b/src/storage/meta/entry/segment_index_entry.cpp @@ -67,6 +67,7 @@ import hnsw_util; import wal_entry; import infinity_context; import defer_op; +import memory_indexer; namespace infinity { @@ -208,7 +209,8 @@ void SegmentIndexEntry::MemIndexInsert(SharedPtr block_entry, { std::unique_lock lck(rw_locker_); String full_path = Path(InfinityContext::instance().config()->DataDir()) / *table_index_entry_->index_dir(); - memory_indexer_ = MakeUnique(full_path, base_name, begin_row_id, index_fulltext->flag_, index_fulltext->analyzer_); + memory_indexer_ = + MakeUnique(full_path, base_name, begin_row_id, index_fulltext->flag_, index_fulltext->analyzer_, this); } table_index_entry_->UpdateFulltextSegmentTs(commit_ts); } else { @@ -459,7 +461,7 @@ void SegmentIndexEntry::PopulateEntirely(const SegmentEntry *segment_entry, Txn const IndexFullText *index_fulltext = static_cast(index_base); String base_name = fmt::format("ft_{:016x}", base_row_id.ToUint64()); String full_path = Path(InfinityContext::instance().config()->DataDir()) / *table_index_entry_->index_dir(); - memory_indexer_ = MakeUnique(full_path, base_name, base_row_id, index_fulltext->flag_, index_fulltext->analyzer_); + memory_indexer_ = MakeUnique(full_path, base_name, base_row_id, index_fulltext->flag_, index_fulltext->analyzer_, this); u64 column_id = column_def->id(); SizeT column_idx = table_entry->GetColumnIdxByID(column_id); auto block_entry_iter = BlockEntryIter(segment_entry); @@ -979,11 +981,12 @@ ChunkIndexEntry *SegmentIndexEntry::RebuildChunkIndexEntries(TxnTableStore *txn_ } BaseMemIndex *SegmentIndexEntry::GetMemIndex() const { - // only support hnsw and ivf index now. if (memory_hnsw_index_.get() != nullptr) { return static_cast(memory_hnsw_index_.get()); } else if (memory_ivf_index_.get() != nullptr) { return static_cast(memory_ivf_index_.get()); + } else if (memory_indexer_.get() != nullptr) { + return static_cast(memory_indexer_.get()); } return nullptr; } @@ -1218,4 +1221,14 @@ void SegmentIndexEntry::ResetOptimizing() { optimizing_.compare_exchange_strong(expected, false); } +Pair SegmentIndexEntry::GetFulltextColumnLenInfo() { + std::shared_lock lock(rw_locker_); + if (ft_column_len_sum_ == 0 && memory_indexer_.get() != nullptr) { + return {memory_indexer_->GetColumnLengthSum(), memory_indexer_->GetDocCount()}; + } + return {ft_column_len_sum_, ft_column_len_cnt_}; +} + +void SegmentIndexEntry::SetMemoryIndexer(UniquePtr &&memory_indexer) { memory_indexer_ = std::move(memory_indexer); } + } // namespace infinity diff --git a/src/storage/meta/entry/segment_index_entry.cppm b/src/storage/meta/entry/segment_index_entry.cppm index 9e3bc057e8..cd9170a992 100644 --- a/src/storage/meta/entry/segment_index_entry.cppm +++ b/src/storage/meta/entry/segment_index_entry.cppm @@ -30,7 +30,6 @@ import index_base; import column_def; import cleanup_scanner; import chunk_index_entry; -import memory_indexer; import default_values; import statement_common; import txn; @@ -50,6 +49,7 @@ class SecondaryIndexInMem; class EMVBIndexInMem; class BMPIndexInMem; class BaseMemIndex; +class MemoryIndexer; export struct PopulateEntireConfig { bool prepare_; @@ -180,13 +180,8 @@ public: return {chunk_index_entries_, memory_emvb_index_}; } - Pair GetFulltextColumnLenInfo() { - std::shared_lock lock(rw_locker_); - if (ft_column_len_sum_ == 0 && memory_indexer_.get() != nullptr) { - return {memory_indexer_->GetColumnLengthSum(), memory_indexer_->GetDocCount()}; - } - return {ft_column_len_sum_, ft_column_len_cnt_}; - } + Pair GetFulltextColumnLenInfo(); + void UpdateFulltextColumnLenInfo(u64 column_len_sum, u32 column_len_cnt) { std::unique_lock lock(rw_locker_); ft_column_len_sum_ += column_len_sum; @@ -230,7 +225,7 @@ public: // only for unittest MemoryIndexer *GetMemoryIndexer() { return memory_indexer_.get(); } - void SetMemoryIndexer(UniquePtr &&memory_indexer) { memory_indexer_ = std::move(memory_indexer); } + void SetMemoryIndexer(UniquePtr &&memory_indexer); static SharedPtr CreateFakeEntry(const String &index_dir); ChunkID GetNextChunkID() { return next_chunk_id_++; } diff --git a/src/storage/tracer/base_memindex.cppm b/src/storage/tracer/base_memindex.cppm index 1a77a66e1a..f2ecabdd2c 100644 --- a/src/storage/tracer/base_memindex.cppm +++ b/src/storage/tracer/base_memindex.cppm @@ -35,6 +35,11 @@ protected: auto *memindex_tracer = InfinityContext::instance().storage()->memindex_tracer(); memindex_tracer->AddMemUsed(mem); } + + void DecreaseMemoryUsage(SizeT mem) { + auto *memindex_tracer = InfinityContext::instance().storage()->memindex_tracer(); + memindex_tracer->DecreaseMemUsed(mem); + } }; } // namespace infinity diff --git a/src/storage/tracer/mem_usage_change.cppm b/src/storage/tracer/mem_usage_change.cppm new file mode 100644 index 0000000000..9f844b2a92 --- /dev/null +++ b/src/storage/tracer/mem_usage_change.cppm @@ -0,0 +1,44 @@ +module; + +#include + +export module mem_usage_change; + +import stl; + +namespace infinity { + +export struct MemUsageChange { + bool is_add_{true}; + SizeT mem_{0}; + + void Add(const MemUsageChange &other) { + if (this->mem_ == 0) { + this->mem_ = other.mem_; + this->is_add_ = other.is_add_; + return; + } + + if (this->is_add_ == other.is_add_) { + this->mem_ += other.mem_; + } else { + if (other.mem_ > this->mem_) { + this->mem_ = other.mem_ - this->mem_; + this->is_add_ = !this->is_add_; + } else { + this->mem_ -= other.mem_; + } + } + } + + SizeT Apply(SizeT original) { + if (is_add_) { + return original + mem_; + } else { + assert(mem_ <= original); + return original - mem_; + } + } +}; + +} // namespace infinity diff --git a/src/storage/tracer/memindex_tracer.cppm b/src/storage/tracer/memindex_tracer.cppm index c91a8b851a..401799314b 100644 --- a/src/storage/tracer/memindex_tracer.cppm +++ b/src/storage/tracer/memindex_tracer.cppm @@ -85,7 +85,7 @@ protected: }; inline void MemIndexTracer::AddMemUsed(SizeT add) { - LOG_TRACE(fmt::format("Add mem used: {}, mem index limit: {}", add, index_memory_limit_)); + // LOG_TRACE(fmt::format("Add mem used: {}, mem index limit: {}", add, index_memory_limit_)); if (add == 0 || index_memory_limit_ == 0) { return; } diff --git a/src/unit_test/storage/invertedindex/column_index_merger.cpp b/src/unit_test/storage/invertedindex/column_index_merger.cpp index d08fd55ca6..93cc1122d1 100644 --- a/src/unit_test/storage/invertedindex/column_index_merger.cpp +++ b/src/unit_test/storage/invertedindex/column_index_merger.cpp @@ -94,7 +94,7 @@ void ColumnIndexMergerTest::CreateIndex(const Vector& paragraphs, column->AppendValue(v); } for (SizeT i = 0; i < chunk_names.size(); ++i) { - MemoryIndexer indexer(index_dir, chunk_names[i], base_row_ids[i], flag_, "standard"); + MemoryIndexer indexer(index_dir, chunk_names[i], base_row_ids[i], flag_, "standard", nullptr); indexer.Insert(column, row_offsets[i], row_counts[i]); indexer.Dump(); } diff --git a/src/unit_test/storage/invertedindex/memory_indexer.cpp b/src/unit_test/storage/invertedindex/memory_indexer.cpp index eb90698129..c5c069ff60 100644 --- a/src/unit_test/storage/invertedindex/memory_indexer.cpp +++ b/src/unit_test/storage/invertedindex/memory_indexer.cpp @@ -127,12 +127,12 @@ INSTANTIATE_TEST_SUITE_P(TestWithDifferentParams, TEST_P(MemoryIndexerTest, Insert) { // prepare fake segment index entry auto fake_segment_index_entry_1 = SegmentIndexEntry::CreateFakeEntry(GetFullDataDir()); - MemoryIndexer indexer1(GetFullDataDir(), "chunk1", RowID(0U, 0U), flag_, "standard"); + MemoryIndexer indexer1(GetFullDataDir(), "chunk1", RowID(0U, 0U), flag_, "standard", fake_segment_index_entry_1.get()); indexer1.Insert(column_, 0, 1); indexer1.Insert(column_, 1, 3); indexer1.Dump(); - auto indexer2 = MakeUnique(GetFullDataDir(), "chunk2", RowID(0U, 4U), flag_, "standard"); + auto indexer2 = MakeUnique(GetFullDataDir(), "chunk2", RowID(0U, 4U), flag_, "standard", fake_segment_index_entry_1.get()); indexer2->Insert(column_, 4, 1); while (indexer2->GetInflightTasks() > 0) { sleep(1); @@ -149,7 +149,7 @@ TEST_P(MemoryIndexerTest, Insert) { TEST_P(MemoryIndexerTest, test2) { auto fake_segment_index_entry_1 = SegmentIndexEntry::CreateFakeEntry(GetFullDataDir()); - MemoryIndexer indexer1(GetFullDataDir(), "chunk1", RowID(0U, 0U), flag_, "standard"); + MemoryIndexer indexer1(GetFullDataDir(), "chunk1", RowID(0U, 0U), flag_, "standard", fake_segment_index_entry_1.get()); indexer1.Insert(column_, 0, 2, true); indexer1.Insert(column_, 2, 2, true); indexer1.Insert(column_, 4, 1, true); @@ -165,7 +165,7 @@ TEST_P(MemoryIndexerTest, test2) { TEST_P(MemoryIndexerTest, test3) { auto fake_segment_index_entry_1 = SegmentIndexEntry::CreateFakeEntry(GetFullDataDir()); - MemoryIndexer indexer1(GetFullDataDir(), "chunk1", RowID(0U, 0U), flag_, "standard"); + MemoryIndexer indexer1(GetFullDataDir(), "chunk1", RowID(0U, 0U), flag_, "standard", fake_segment_index_entry_1.get()); indexer1.Insert(empty_column_, 0, 10, true); indexer1.Dump(true); fake_segment_index_entry_1->AddFtChunkIndexEntry("chunk1", RowID(0U, 0U).ToUint64(), 5U); @@ -182,7 +182,7 @@ TEST_P(MemoryIndexerTest, test3) { TEST_P(MemoryIndexerTest, test4) { auto fake_segment_index_entry_1 = SegmentIndexEntry::CreateFakeEntry(GetFullDataDir()); - MemoryIndexer indexer1(GetFullDataDir(), "chunk1", RowID(0U, 0U), flag_, "standard"); + MemoryIndexer indexer1(GetFullDataDir(), "chunk1", RowID(0U, 0U), flag_, "standard", fake_segment_index_entry_1.get()); indexer1.Insert(empty_column_, 0, 5, true); indexer1.Insert(column_, 0, 5, true); indexer1.Dump(true); @@ -201,7 +201,7 @@ TEST_P(MemoryIndexerTest, test4) { TEST_P(MemoryIndexerTest, SpillLoadTest) { auto fake_segment_index_entry_1 = SegmentIndexEntry::CreateFakeEntry(GetFullDataDir()); - auto indexer1 = MakeUnique(GetFullDataDir(), "chunk1", RowID(0U, 0U), flag_, "standard"); + auto indexer1 = MakeUnique(GetFullDataDir(), "chunk1", RowID(0U, 0U), flag_, "standard", fake_segment_index_entry_1.get()); indexer1->Insert(column_, 0, 2); indexer1->Insert(column_, 2, 2); indexer1->Insert(column_, 4, 1); @@ -211,7 +211,8 @@ TEST_P(MemoryIndexerTest, SpillLoadTest) { } indexer1->Dump(false, true); - UniquePtr loaded_indexer = MakeUnique(GetFullDataDir(), "chunk1", RowID(0U, 0U), flag_, "standard"); + UniquePtr loaded_indexer = + MakeUnique(GetFullDataDir(), "chunk1", RowID(0U, 0U), flag_, "standard", fake_segment_index_entry_1.get()); loaded_indexer->Load(); SegmentID segment_id = fake_segment_index_entry_1->segment_id(); @@ -251,7 +252,7 @@ TEST_P(MemoryIndexerTest, SeekPosition) { } auto fake_segment_index_entry_1 = SegmentIndexEntry::CreateFakeEntry(GetFullDataDir()); - MemoryIndexer indexer1(GetFullDataDir(), "chunk1", RowID(0U, 0U), flag_, "standard"); + MemoryIndexer indexer1(GetFullDataDir(), "chunk1", RowID(0U, 0U), flag_, "standard", fake_segment_index_entry_1.get()); indexer1.Insert(column, 0, 8192); while (indexer1.GetInflightTasks() > 0) { sleep(1); diff --git a/src/unit_test/storage/invertedindex/posting_merger.cpp b/src/unit_test/storage/invertedindex/posting_merger.cpp index 48a6922ae1..c9c1748fc4 100644 --- a/src/unit_test/storage/invertedindex/posting_merger.cpp +++ b/src/unit_test/storage/invertedindex/posting_merger.cpp @@ -75,12 +75,12 @@ void PostingMergerTest::CreateIndex() { } auto fake_segment_index_entry_1 = SegmentIndexEntry::CreateFakeEntry(GetFullDataDir()); - MemoryIndexer indexer1(GetFullDataDir(), "chunk1", RowID(0U, 0U), flag_, "standard"); + MemoryIndexer indexer1(GetFullDataDir(), "chunk1", RowID(0U, 0U), flag_, "standard", fake_segment_index_entry_1.get()); indexer1.Insert(column, 0, 1); indexer1.Dump(); fake_segment_index_entry_1->AddFtChunkIndexEntry("chunk1", RowID(0U, 0U).ToUint64(), 1U); - auto indexer2 = MakeUnique(GetFullDataDir(), "chunk2", RowID(0U, 1U), flag_, "standard"); + auto indexer2 = MakeUnique(GetFullDataDir(), "chunk2", RowID(0U, 1U), flag_, "standard", fake_segment_index_entry_1.get()); indexer2->Insert(column, 1, 1); indexer2->Dump(); } @@ -185,4 +185,4 @@ TEST_P(PostingMergerTest, Basic) { for (auto segment_term_posting : segment_term_postings) { delete segment_term_posting; } -} \ No newline at end of file +}