Skip to content

Commit

Permalink
Update SecondaryIndex (#2391)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Support memory_quota for SecondaryIndex

Issue link:#1563

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
- [x] Refactoring
  • Loading branch information
yangzq50 authored Dec 19, 2024
1 parent 5a651bf commit 57e6000
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 26 deletions.
6 changes: 4 additions & 2 deletions src/storage/meta/entry/segment_index_entry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ void SegmentIndexEntry::MemIndexInsert(SharedPtr<BlockEntry> block_entry,
case IndexType::kSecondary: {
if (memory_secondary_index_.get() == nullptr) {
std::unique_lock<std::shared_mutex> lck(rw_locker_);
memory_secondary_index_ = SecondaryIndexInMem::NewSecondaryIndexInMem(column_def, begin_row_id);
memory_secondary_index_ = SecondaryIndexInMem::NewSecondaryIndexInMem(column_def, this, begin_row_id);
}
BlockColumnEntry *block_column_entry = block_entry->GetColumnBlockEntry(column_idx);
memory_secondary_index_->InsertBlockData(block_offset, block_column_entry, buffer_manager, row_offset, row_count);
Expand Down Expand Up @@ -506,7 +506,7 @@ void SegmentIndexEntry::PopulateEntirely(const SegmentEntry *segment_entry, Txn
break;
}
case IndexType::kSecondary: {
memory_secondary_index_ = SecondaryIndexInMem::NewSecondaryIndexInMem(column_def, base_row_id);
memory_secondary_index_ = SecondaryIndexInMem::NewSecondaryIndexInMem(column_def, this, base_row_id);
u64 column_id = column_def->id();
SizeT column_idx = table_entry->GetColumnIdxByID(column_id);
auto block_entry_iter = BlockEntryIter(segment_entry);
Expand Down Expand Up @@ -989,6 +989,8 @@ BaseMemIndex *SegmentIndexEntry::GetMemIndex() const {
return static_cast<BaseMemIndex *>(memory_ivf_index_.get());
} else if (memory_indexer_.get() != nullptr) {
return static_cast<BaseMemIndex *>(memory_indexer_.get());
} else if (memory_secondary_index_.get() != nullptr) {
return static_cast<BaseMemIndex *>(memory_secondary_index_.get());
} else if (memory_bmp_index_.get() != nullptr) {
return static_cast<BaseMemIndex *>(memory_bmp_index_.get());
}
Expand Down
79 changes: 59 additions & 20 deletions src/storage/secondary_index/secondary_index_in_mem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
module;

#include <bit>
#include <cassert>
#include <vector>

module secondary_index_in_mem;
Expand All @@ -31,33 +32,54 @@ import infinity_exception;
import secondary_index_data;
import chunk_index_entry;
import segment_index_entry;
import table_index_entry;
import buffer_handle;
import logger;
import base_memindex;
import memindex_tracer;

namespace infinity {

constexpr u32 map_memory_bloat_factor = 3;

template <typename RawValueType>
class SecondaryIndexInMemT final : public SecondaryIndexInMem {
using KeyType = ConvertToOrderedType<RawValueType>;
const RowID begin_row_id_;
const u32 max_size_;
mutable std::shared_mutex map_mutex_;
MultiMap<KeyType, u32> in_mem_secondary_index_;

protected:
u32 GetRowCountNoLock() const override { return in_mem_secondary_index_.size(); }
u32 MemoryCostOfEachRow() const override { return map_memory_bloat_factor * (sizeof(KeyType) + sizeof(u32)); }
u32 MemoryCostOfThis() const override { return sizeof(*this); }

public:
explicit SecondaryIndexInMemT(const RowID begin_row_id, const u32 max_size) : begin_row_id_(begin_row_id), max_size_(max_size) {}
u32 GetRowCount() const override { return in_mem_secondary_index_.size(); }
SecondaryIndexInMemT(SegmentIndexEntry *segment_index_entry, const RowID begin_row_id)
: SecondaryIndexInMem(segment_index_entry), begin_row_id_(begin_row_id) {
IncreaseMemoryUsageBase(MemoryCostOfThis());
}
~SecondaryIndexInMemT() override {
DecreaseMemoryUsageBase(MemoryCostOfThis() + GetRowCount() * MemoryCostOfEachRow());
}
u32 GetRowCount() const override {
std::shared_lock lock(map_mutex_);
return in_mem_secondary_index_.size();
}
void InsertBlockData(const SegmentOffset block_offset,
BlockColumnEntry *block_column_entry,
BufferManager *buffer_manager,
const u32 row_offset,
const u32 row_count) override {
MemIndexInserterIter<RawValueType> iter(block_offset, block_column_entry, buffer_manager, row_offset, row_count);
InsertInner(iter);
const auto inserted_rows = InsertInner(iter);
assert(inserted_rows == row_count);
IncreaseMemoryUsageBase(inserted_rows * MemoryCostOfEachRow());
}
SharedPtr<ChunkIndexEntry> Dump(SegmentIndexEntry *segment_index_entry, BufferManager *buffer_mgr) const override {
assert(segment_index_entry == segment_index_entry_);
std::shared_lock lock(map_mutex_);
u32 row_count = GetRowCount();
const u32 row_count = GetRowCountNoLock();
auto new_chunk_index_entry = segment_index_entry->CreateSecondaryIndexChunkIndexEntry(begin_row_id_, row_count, buffer_mgr);
BufferHandle handle = new_chunk_index_entry->GetIndex();
auto data_ptr = static_cast<SecondaryIndexData *>(handle.GetDataMut());
Expand All @@ -70,7 +92,8 @@ class SecondaryIndexInMemT final : public SecondaryIndexInMem {
}

private:
void InsertInner(auto &iter) {
u32 InsertInner(auto &iter) {
u32 inserted_count = 0;
std::unique_lock lock(map_mutex_);
while (true) {
auto opt = iter.Next();
Expand All @@ -87,7 +110,9 @@ class SecondaryIndexInMemT final : public SecondaryIndexInMem {
const KeyType key = ConvertToOrderedKeyValue(*v_ptr);
in_mem_secondary_index_.emplace(key, offset);
}
++inserted_count;
}
return inserted_count;
}

Pair<u32, Bitmask> RangeQueryInner(const u32 segment_row_count, const KeyType b, const KeyType e) const {
Expand All @@ -107,44 +132,58 @@ class SecondaryIndexInMemT final : public SecondaryIndexInMem {
}
};

SharedPtr<SecondaryIndexInMem> SecondaryIndexInMem::NewSecondaryIndexInMem(const SharedPtr<ColumnDef> &column_def, RowID begin_row_id, u32 max_size) {
MemIndexTracerInfo SecondaryIndexInMem::GetInfo() const {
auto *table_index_entry = segment_index_entry_->table_index_entry();
SharedPtr<String> index_name = table_index_entry->GetIndexName();
auto *table_entry = table_index_entry->table_index_meta()->GetTableEntry();
SharedPtr<String> table_name = table_entry->GetTableName();
SharedPtr<String> db_name = table_entry->GetDBName();
const auto row_cnt = GetRowCount();
const auto mem = MemoryCostOfThis() + row_cnt * MemoryCostOfEachRow();
return MemIndexTracerInfo(std::move(index_name), std::move(table_name), std::move(db_name), mem, row_cnt);
}

TableIndexEntry *SecondaryIndexInMem::table_index_entry() const { return segment_index_entry_->table_index_entry(); }

SharedPtr<SecondaryIndexInMem> SecondaryIndexInMem::NewSecondaryIndexInMem(const SharedPtr<ColumnDef> &column_def,
SegmentIndexEntry *segment_index_entry,
RowID begin_row_id) {
if (!column_def->type()->CanBuildSecondaryIndex()) {
String error_message = "Column type can't build secondary index";
UnrecoverableError(error_message);
UnrecoverableError("Column type can't build secondary index");
}
switch (column_def->type()->type()) {
case LogicalType::kTinyInt: {
return MakeShared<SecondaryIndexInMemT<TinyIntT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<TinyIntT> >(segment_index_entry, begin_row_id);
}
case LogicalType::kSmallInt: {
return MakeShared<SecondaryIndexInMemT<SmallIntT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<SmallIntT> >(segment_index_entry, begin_row_id);
}
case LogicalType::kInteger: {
return MakeShared<SecondaryIndexInMemT<IntegerT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<IntegerT> >(segment_index_entry, begin_row_id);
}
case LogicalType::kBigInt: {
return MakeShared<SecondaryIndexInMemT<BigIntT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<BigIntT> >(segment_index_entry, begin_row_id);
}
case LogicalType::kFloat: {
return MakeShared<SecondaryIndexInMemT<FloatT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<FloatT> >(segment_index_entry, begin_row_id);
}
case LogicalType::kDouble: {
return MakeShared<SecondaryIndexInMemT<DoubleT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<DoubleT> >(segment_index_entry, begin_row_id);
}
case LogicalType::kDate: {
return MakeShared<SecondaryIndexInMemT<DateT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<DateT> >(segment_index_entry, begin_row_id);
}
case LogicalType::kTime: {
return MakeShared<SecondaryIndexInMemT<TimeT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<TimeT> >(segment_index_entry, begin_row_id);
}
case LogicalType::kDateTime: {
return MakeShared<SecondaryIndexInMemT<DateTimeT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<DateTimeT> >(segment_index_entry, begin_row_id);
}
case LogicalType::kTimestamp: {
return MakeShared<SecondaryIndexInMemT<TimestampT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<TimestampT> >(segment_index_entry, begin_row_id);
}
case LogicalType::kVarchar: {
return MakeShared<SecondaryIndexInMemT<VarcharT>>(begin_row_id, max_size);
return MakeShared<SecondaryIndexInMemT<VarcharT> >(segment_index_entry, begin_row_id);
}
default: {
return nullptr;
Expand Down
32 changes: 28 additions & 4 deletions src/storage/secondary_index/secondary_index_in_mem.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -18,29 +18,53 @@ export module secondary_index_in_mem;

import stl;
import roaring_bitmap;
import internal_types;
import column_def;
import table_index_entry;
import base_memindex;
import memindex_tracer;

namespace infinity {

struct RowID;
struct BlockColumnEntry;
class BufferManager;
class ColumnDef;
struct ChunkIndexEntry;
struct SegmentIndexEntry;

export class SecondaryIndexInMem {
export class SecondaryIndexInMem : public BaseMemIndex {
protected:
SegmentIndexEntry *segment_index_entry_ = nullptr;

explicit SecondaryIndexInMem(SegmentIndexEntry *segment_index_entry) : segment_index_entry_(segment_index_entry) {}

virtual u32 GetRowCountNoLock() const = 0;

virtual u32 MemoryCostOfEachRow() const = 0;

virtual u32 MemoryCostOfThis() const = 0;

public:
virtual ~SecondaryIndexInMem() = default;

MemIndexTracerInfo GetInfo() const override;

TableIndexEntry *table_index_entry() const override;

virtual u32 GetRowCount() const = 0;

virtual void InsertBlockData(SegmentOffset block_offset,
BlockColumnEntry *block_column_entry,
BufferManager *buffer_manager,
u32 row_offset,
u32 row_count) = 0;

virtual SharedPtr<ChunkIndexEntry> Dump(SegmentIndexEntry *segment_index_entry, BufferManager *buffer_mgr) const = 0;

virtual Pair<u32, Bitmask> RangeQuery(const void *input) const = 0;

static SharedPtr<SecondaryIndexInMem> NewSecondaryIndexInMem(const SharedPtr<ColumnDef> &column_def, RowID begin_row_id, u32 max_size = 5 << 20);
static SharedPtr<SecondaryIndexInMem> NewSecondaryIndexInMem(const SharedPtr<ColumnDef> &column_def,
SegmentIndexEntry *segment_index_entry,
RowID begin_row_id);
};

} // namespace infinity

0 comments on commit 57e6000

Please sign in to comment.