From 57e60009952e8f794e3fd1334832bb3cb18cf640 Mon Sep 17 00:00:00 2001 From: yangzq50 <58433399+yangzq50@users.noreply.github.com> Date: Thu, 19 Dec 2024 19:23:49 +0800 Subject: [PATCH] Update SecondaryIndex (#2391) ### What problem does this PR solve? Support memory_quota for SecondaryIndex Issue link:#1563 ### Type of change - [x] New Feature (non-breaking change which adds functionality) - [x] Refactoring --- .../meta/entry/segment_index_entry.cpp | 6 +- .../secondary_index_in_mem.cpp | 79 ++++++++++++++----- .../secondary_index_in_mem.cppm | 32 +++++++- 3 files changed, 91 insertions(+), 26 deletions(-) diff --git a/src/storage/meta/entry/segment_index_entry.cpp b/src/storage/meta/entry/segment_index_entry.cpp index cbc75adc5b..1e5f2b48e9 100644 --- a/src/storage/meta/entry/segment_index_entry.cpp +++ b/src/storage/meta/entry/segment_index_entry.cpp @@ -243,7 +243,7 @@ void SegmentIndexEntry::MemIndexInsert(SharedPtr block_entry, case IndexType::kSecondary: { if (memory_secondary_index_.get() == nullptr) { std::unique_lock lck(rw_locker_); - memory_secondary_index_ = SecondaryIndexInMem::NewSecondaryIndexInMem(column_def, begin_row_id); + memory_secondary_index_ = SecondaryIndexInMem::NewSecondaryIndexInMem(column_def, this, begin_row_id); } BlockColumnEntry *block_column_entry = block_entry->GetColumnBlockEntry(column_idx); memory_secondary_index_->InsertBlockData(block_offset, block_column_entry, buffer_manager, row_offset, row_count); @@ -506,7 +506,7 @@ void SegmentIndexEntry::PopulateEntirely(const SegmentEntry *segment_entry, Txn break; } case IndexType::kSecondary: { - memory_secondary_index_ = SecondaryIndexInMem::NewSecondaryIndexInMem(column_def, base_row_id); + memory_secondary_index_ = SecondaryIndexInMem::NewSecondaryIndexInMem(column_def, this, base_row_id); u64 column_id = column_def->id(); SizeT column_idx = table_entry->GetColumnIdxByID(column_id); auto block_entry_iter = BlockEntryIter(segment_entry); @@ -989,6 +989,8 @@ BaseMemIndex *SegmentIndexEntry::GetMemIndex() const { return static_cast(memory_ivf_index_.get()); } else if (memory_indexer_.get() != nullptr) { return static_cast(memory_indexer_.get()); + } else if (memory_secondary_index_.get() != nullptr) { + return static_cast(memory_secondary_index_.get()); } else if (memory_bmp_index_.get() != nullptr) { return static_cast(memory_bmp_index_.get()); } diff --git a/src/storage/secondary_index/secondary_index_in_mem.cpp b/src/storage/secondary_index/secondary_index_in_mem.cpp index e66018a4ca..f863229040 100644 --- a/src/storage/secondary_index/secondary_index_in_mem.cpp +++ b/src/storage/secondary_index/secondary_index_in_mem.cpp @@ -15,6 +15,7 @@ module; #include +#include #include module secondary_index_in_mem; @@ -31,33 +32,54 @@ import infinity_exception; import secondary_index_data; import chunk_index_entry; import segment_index_entry; +import table_index_entry; import buffer_handle; import logger; +import base_memindex; +import memindex_tracer; namespace infinity { +constexpr u32 map_memory_bloat_factor = 3; + template class SecondaryIndexInMemT final : public SecondaryIndexInMem { using KeyType = ConvertToOrderedType; const RowID begin_row_id_; - const u32 max_size_; mutable std::shared_mutex map_mutex_; MultiMap in_mem_secondary_index_; +protected: + u32 GetRowCountNoLock() const override { return in_mem_secondary_index_.size(); } + u32 MemoryCostOfEachRow() const override { return map_memory_bloat_factor * (sizeof(KeyType) + sizeof(u32)); } + u32 MemoryCostOfThis() const override { return sizeof(*this); } + public: - explicit SecondaryIndexInMemT(const RowID begin_row_id, const u32 max_size) : begin_row_id_(begin_row_id), max_size_(max_size) {} - u32 GetRowCount() const override { return in_mem_secondary_index_.size(); } + SecondaryIndexInMemT(SegmentIndexEntry *segment_index_entry, const RowID begin_row_id) + : SecondaryIndexInMem(segment_index_entry), begin_row_id_(begin_row_id) { + IncreaseMemoryUsageBase(MemoryCostOfThis()); + } + ~SecondaryIndexInMemT() override { + DecreaseMemoryUsageBase(MemoryCostOfThis() + GetRowCount() * MemoryCostOfEachRow()); + } + u32 GetRowCount() const override { + std::shared_lock lock(map_mutex_); + return in_mem_secondary_index_.size(); + } void InsertBlockData(const SegmentOffset block_offset, BlockColumnEntry *block_column_entry, BufferManager *buffer_manager, const u32 row_offset, const u32 row_count) override { MemIndexInserterIter iter(block_offset, block_column_entry, buffer_manager, row_offset, row_count); - InsertInner(iter); + const auto inserted_rows = InsertInner(iter); + assert(inserted_rows == row_count); + IncreaseMemoryUsageBase(inserted_rows * MemoryCostOfEachRow()); } SharedPtr Dump(SegmentIndexEntry *segment_index_entry, BufferManager *buffer_mgr) const override { + assert(segment_index_entry == segment_index_entry_); std::shared_lock lock(map_mutex_); - u32 row_count = GetRowCount(); + const u32 row_count = GetRowCountNoLock(); auto new_chunk_index_entry = segment_index_entry->CreateSecondaryIndexChunkIndexEntry(begin_row_id_, row_count, buffer_mgr); BufferHandle handle = new_chunk_index_entry->GetIndex(); auto data_ptr = static_cast(handle.GetDataMut()); @@ -70,7 +92,8 @@ class SecondaryIndexInMemT final : public SecondaryIndexInMem { } private: - void InsertInner(auto &iter) { + u32 InsertInner(auto &iter) { + u32 inserted_count = 0; std::unique_lock lock(map_mutex_); while (true) { auto opt = iter.Next(); @@ -87,7 +110,9 @@ class SecondaryIndexInMemT final : public SecondaryIndexInMem { const KeyType key = ConvertToOrderedKeyValue(*v_ptr); in_mem_secondary_index_.emplace(key, offset); } + ++inserted_count; } + return inserted_count; } Pair RangeQueryInner(const u32 segment_row_count, const KeyType b, const KeyType e) const { @@ -107,44 +132,58 @@ class SecondaryIndexInMemT final : public SecondaryIndexInMem { } }; -SharedPtr SecondaryIndexInMem::NewSecondaryIndexInMem(const SharedPtr &column_def, RowID begin_row_id, u32 max_size) { +MemIndexTracerInfo SecondaryIndexInMem::GetInfo() const { + auto *table_index_entry = segment_index_entry_->table_index_entry(); + SharedPtr index_name = table_index_entry->GetIndexName(); + auto *table_entry = table_index_entry->table_index_meta()->GetTableEntry(); + SharedPtr table_name = table_entry->GetTableName(); + SharedPtr db_name = table_entry->GetDBName(); + const auto row_cnt = GetRowCount(); + const auto mem = MemoryCostOfThis() + row_cnt * MemoryCostOfEachRow(); + return MemIndexTracerInfo(std::move(index_name), std::move(table_name), std::move(db_name), mem, row_cnt); +} + +TableIndexEntry *SecondaryIndexInMem::table_index_entry() const { return segment_index_entry_->table_index_entry(); } + +SharedPtr SecondaryIndexInMem::NewSecondaryIndexInMem(const SharedPtr &column_def, + SegmentIndexEntry *segment_index_entry, + RowID begin_row_id) { if (!column_def->type()->CanBuildSecondaryIndex()) { - String error_message = "Column type can't build secondary index"; - UnrecoverableError(error_message); + UnrecoverableError("Column type can't build secondary index"); } switch (column_def->type()->type()) { case LogicalType::kTinyInt: { - return MakeShared>(begin_row_id, max_size); + return MakeShared >(segment_index_entry, begin_row_id); } case LogicalType::kSmallInt: { - return MakeShared>(begin_row_id, max_size); + return MakeShared >(segment_index_entry, begin_row_id); } case LogicalType::kInteger: { - return MakeShared>(begin_row_id, max_size); + return MakeShared >(segment_index_entry, begin_row_id); } case LogicalType::kBigInt: { - return MakeShared>(begin_row_id, max_size); + return MakeShared >(segment_index_entry, begin_row_id); } case LogicalType::kFloat: { - return MakeShared>(begin_row_id, max_size); + return MakeShared >(segment_index_entry, begin_row_id); } case LogicalType::kDouble: { - return MakeShared>(begin_row_id, max_size); + return MakeShared >(segment_index_entry, begin_row_id); } case LogicalType::kDate: { - return MakeShared>(begin_row_id, max_size); + return MakeShared >(segment_index_entry, begin_row_id); } case LogicalType::kTime: { - return MakeShared>(begin_row_id, max_size); + return MakeShared >(segment_index_entry, begin_row_id); } case LogicalType::kDateTime: { - return MakeShared>(begin_row_id, max_size); + return MakeShared >(segment_index_entry, begin_row_id); } case LogicalType::kTimestamp: { - return MakeShared>(begin_row_id, max_size); + return MakeShared >(segment_index_entry, begin_row_id); } case LogicalType::kVarchar: { - return MakeShared>(begin_row_id, max_size); + return MakeShared >(segment_index_entry, begin_row_id); } default: { return nullptr; diff --git a/src/storage/secondary_index/secondary_index_in_mem.cppm b/src/storage/secondary_index/secondary_index_in_mem.cppm index fb7f0fcc7a..2f01699259 100644 --- a/src/storage/secondary_index/secondary_index_in_mem.cppm +++ b/src/storage/secondary_index/secondary_index_in_mem.cppm @@ -18,29 +18,53 @@ export module secondary_index_in_mem; import stl; import roaring_bitmap; +import internal_types; +import column_def; +import table_index_entry; +import base_memindex; +import memindex_tracer; namespace infinity { -struct RowID; struct BlockColumnEntry; class BufferManager; -class ColumnDef; struct ChunkIndexEntry; struct SegmentIndexEntry; -export class SecondaryIndexInMem { +export class SecondaryIndexInMem : public BaseMemIndex { +protected: + SegmentIndexEntry *segment_index_entry_ = nullptr; + + explicit SecondaryIndexInMem(SegmentIndexEntry *segment_index_entry) : segment_index_entry_(segment_index_entry) {} + + virtual u32 GetRowCountNoLock() const = 0; + + virtual u32 MemoryCostOfEachRow() const = 0; + + virtual u32 MemoryCostOfThis() const = 0; + public: virtual ~SecondaryIndexInMem() = default; + + MemIndexTracerInfo GetInfo() const override; + + TableIndexEntry *table_index_entry() const override; + virtual u32 GetRowCount() const = 0; + virtual void InsertBlockData(SegmentOffset block_offset, BlockColumnEntry *block_column_entry, BufferManager *buffer_manager, u32 row_offset, u32 row_count) = 0; + virtual SharedPtr Dump(SegmentIndexEntry *segment_index_entry, BufferManager *buffer_mgr) const = 0; + virtual Pair RangeQuery(const void *input) const = 0; - static SharedPtr NewSecondaryIndexInMem(const SharedPtr &column_def, RowID begin_row_id, u32 max_size = 5 << 20); + static SharedPtr NewSecondaryIndexInMem(const SharedPtr &column_def, + SegmentIndexEntry *segment_index_entry, + RowID begin_row_id); }; } // namespace infinity \ No newline at end of file