Skip to content

Commit

Permalink
Fix parallel read/write (#1191)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

1. Fix: visibility check for entries created by a transaction that is still committing.
2. Fix: deadlock in `SegmentIndexEntry::MemIndexInsert`

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
  • Loading branch information
small-turtle-1 authored May 9, 2024
1 parent af24a07 commit 01e0435
Show file tree
Hide file tree
Showing 43 changed files with 244 additions and 185 deletions.
19 changes: 9 additions & 10 deletions src/executor/operator/physical_index_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,14 +452,13 @@ struct FilterResult {
}

template <typename ColumnValueType>
inline void
ExecuteSingleRangeT(const FilterIntervalRangeT<ColumnValueType> &interval_range, SegmentIndexEntry &index_entry, const TxnTimeStamp ts) {
inline void ExecuteSingleRangeT(const FilterIntervalRangeT<ColumnValueType> &interval_range, SegmentIndexEntry &index_entry, Txn *txn) {
Vector<UniquePtr<TrunkReader<ColumnValueType>>> trunk_readers;
Tuple<Vector<SharedPtr<ChunkIndexEntry>>, SharedPtr<SecondaryIndexInMem>> chunks_snapshot = index_entry.GetSecondaryIndexSnapshot();
const u32 segment_row_count = SegmentRowCount();
auto &[chunk_index_entries, memory_secondary_index] = chunks_snapshot;
for (const auto &chunk_index_entry : chunk_index_entries) {
if (chunk_index_entry->CheckVisible(ts)) {
if (chunk_index_entry->CheckVisible(txn)) {
trunk_readers.emplace_back(MakeUnique<TrunkReaderT<ColumnValueType>>(segment_row_count, chunk_index_entry));
}
}
Expand Down Expand Up @@ -492,7 +491,7 @@ struct FilterResult {
inline void ExecuteSingleRange(const HashMap<ColumnID, TableIndexEntry *> &column_index_map,
const FilterExecuteSingleRange &single_range,
SegmentID segment_id,
const TxnTimeStamp ts) {
Txn *txn) {
// step 1. check if range is empty
if (single_range.IsEmpty()) {
return SetEmptyResult();
Expand All @@ -504,7 +503,7 @@ struct FilterResult {
// step 3. search index
auto &interval_range_variant = single_range.GetIntervalRange();
std::visit(Overload{[&]<typename ColumnValueType>(const FilterIntervalRangeT<ColumnValueType> &interval_range) {
ExecuteSingleRangeT(interval_range, index_entry, ts);
ExecuteSingleRangeT(interval_range, index_entry, txn);
},
[](const std::monostate &empty) {
UnrecoverableError("FilterResult::ExecuteSingleRange(): class member interval_range_ not initialized!");
Expand Down Expand Up @@ -598,7 +597,7 @@ FilterResult SolveSecondaryIndexFilterInner(const Vector<FilterExecuteElem> &fil
const SegmentID segment_id,
const u32 segment_row_count,
const u32 segment_row_actual_count,
const TxnTimeStamp ts) {
Txn *txn) {
Vector<FilterResult> result_stack;
// execute filter_execute_command_ (Reverse Polish notation)
for (auto const &elem : filter_execute_command) {
Expand Down Expand Up @@ -628,7 +627,7 @@ FilterResult SolveSecondaryIndexFilterInner(const Vector<FilterExecuteElem> &fil
},
[&](const FilterExecuteSingleRange &single_range) {
result_stack.emplace_back(segment_row_count, segment_row_actual_count);
result_stack.back().ExecuteSingleRange(column_index_map, single_range, segment_id, ts);
result_stack.back().ExecuteSingleRange(column_index_map, single_range, segment_id, txn);
}},
elem);
}
Expand All @@ -644,13 +643,13 @@ std::variant<Vector<u32>, Bitmask> SolveSecondaryIndexFilter(const Vector<Filter
const SegmentID segment_id,
const u32 segment_row_count,
const u32 segment_row_actual_count,
const TxnTimeStamp ts) {
Txn *txn) {
if (filter_execute_command.empty()) {
// return all true
return std::variant<Vector<u32>, Bitmask>(std::in_place_type<Bitmask>);
}
auto result =
SolveSecondaryIndexFilterInner(filter_execute_command, column_index_map, segment_id, segment_row_count, segment_row_actual_count, ts);
SolveSecondaryIndexFilterInner(filter_execute_command, column_index_map, segment_id, segment_row_count, segment_row_actual_count, txn);
return std::move(result.selected_rows_);
}

Expand Down Expand Up @@ -709,7 +708,7 @@ void PhysicalIndexScan::ExecuteInternal(QueryContext *query_context, IndexScanOp
DeleteFilter delete_filter(segment_entry, begin_ts, segment_entry->row_count(begin_ts));
// output
const auto result =
SolveSecondaryIndexFilterInner(filter_execute_command_, column_index_map_, segment_id, segment_row_count, segment_row_actual_count, begin_ts);
SolveSecondaryIndexFilterInner(filter_execute_command_, column_index_map_, segment_id, segment_row_count, segment_row_actual_count, txn);
result.Output(output_data_blocks, segment_id, delete_filter);

LOG_TRACE(fmt::format("IndexScan: job number: {}, segment_ids.size(): {}, finished", next_idx, segment_ids.size()));
Expand Down
4 changes: 3 additions & 1 deletion src/executor/operator/physical_index_scan.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import bitmask;

namespace infinity {

class Txn;

// for int range filter, x > n is equivalent to x >= n + 1
// for float range filter, x > f is equivalent to x >= std::nextafter(f, INFINITY)
// we can use this to simplify the filter
Expand Down Expand Up @@ -110,6 +112,6 @@ export std::variant<Vector<u32>, Bitmask> SolveSecondaryIndexFilter(const Vector
const SegmentID segment_id,
const u32 segment_row_count,
const u32 segment_row_actual_count,
const TxnTimeStamp ts);
Txn *txn);

} // namespace infinity
9 changes: 5 additions & 4 deletions src/executor/operator/physical_knn_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,10 @@ SizeT PhysicalKnnScan::BlockEntryCount() const { return base_table_ref_->block_i

template <typename DataType, template <typename, typename> typename C>
void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperatorState *operator_state) {
TxnTimeStamp begin_ts = query_context->GetTxn()->BeginTS();
Txn *txn = query_context->GetTxn();
TxnTimeStamp begin_ts = txn->BeginTS();

if (!common_query_filter_->TryFinishBuild(begin_ts, query_context->GetTxn()->buffer_mgr())) {
if (!common_query_filter_->TryFinishBuild(txn)) {
// not ready, abort and wait for next time
return;
}
Expand Down Expand Up @@ -484,7 +485,7 @@ void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperat
if (block_entry == nullptr) {
UnrecoverableError(
fmt::format("Cannot find segment id: {}, block id: {}, index chunk is {}", segment_id, block_id, chunk_id));
}
} // this is for debug
}
merge_heap->Search(0, d_ptr.get(), row_ids.get(), result_n);
}
Expand All @@ -493,7 +494,7 @@ void PhysicalKnnScan::ExecuteInternal(QueryContext *query_context, KnnScanOperat
auto [chunk_index_entries, memory_index_entry] = segment_index_entry->GetHnswIndexSnapshot();
int i = 0;
for (auto &chunk_index_entry : chunk_index_entries) {
if (chunk_index_entry->CheckVisible(begin_ts)) {
if (chunk_index_entry->CheckVisible(txn)) {
BufferHandle index_handle = chunk_index_entry->GetIndex();
hnsw_search(index_handle, false, i++);
}
Expand Down
10 changes: 5 additions & 5 deletions src/executor/operator/physical_match.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ module;
module physical_match;

import stl;

import txn;
import query_context;
import operator_state;
import physical_operator;
Expand Down Expand Up @@ -517,9 +517,8 @@ bool PhysicalMatch::ExecuteInnerHomebrewed(QueryContext *query_context, Operator
auto execute_start_time = std::chrono::high_resolution_clock::now();
// 1. build QueryNode tree
// 1.1 populate column2analyzer
TransactionID txn_id = query_context->GetTxn()->TxnID();
TxnTimeStamp begin_ts = query_context->GetTxn()->BeginTS();
QueryBuilder query_builder(txn_id, begin_ts, base_table_ref_);
Txn *txn = query_context->GetTxn();
QueryBuilder query_builder(txn, base_table_ref_);
auto finish_init_query_builder_time = std::chrono::high_resolution_clock::now();
TimeDurationType query_builder_init_duration = finish_init_query_builder_time - execute_start_time;
LOG_TRACE(fmt::format("PhysicalMatch Part 0.1: Init QueryBuilder time: {} ms", query_builder_init_duration.count()));
Expand Down Expand Up @@ -805,7 +804,8 @@ bool PhysicalMatch::Execute(QueryContext *query_context, OperatorState *operator
auto start_time = std::chrono::high_resolution_clock::now();
assert(common_query_filter_);
{
bool try_result = common_query_filter_->TryFinishBuild(query_context->GetTxn()->BeginTS(), query_context->GetTxn()->buffer_mgr());
Txn *txn = query_context->GetTxn();
bool try_result = common_query_filter_->TryFinishBuild(txn);
auto finish_filter_time = std::chrono::high_resolution_clock::now();
std::chrono::duration<float, std::milli> filter_duration = finish_filter_time - start_time;
LOG_TRACE(fmt::format("PhysicalMatch Prepare: Filter time: {} ms", filter_duration.count()));
Expand Down
6 changes: 3 additions & 3 deletions src/planner/bound/base_table_ref.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ export module base_table_ref;
import stl;
import table_ref;
import table_entry;

import txn;
import table_function;
import block_index;
import internal_types;
Expand All @@ -48,8 +48,8 @@ public:
explicit BaseTableRef(TableEntry *table_entry, SharedPtr<BlockIndex> block_index)
: TableRef(TableRefType::kTable, ""), table_entry_ptr_(table_entry), block_index_(std::move(block_index)) {}

// Builds a BaseTableRef for `table_entry`: obtains a block index from
// table_entry->GetBlockIndex() and wraps both in a new shared BaseTableRef.
// NOTE(review): this is a rendered diff without +/- markers. The TxnTimeStamp
// signature and the GetBlockIndex(ts) call appear to be the removed lines, and
// the Txn* signature and GetBlockIndex(txn) call their replacements — confirm
// against the repository.
static SharedPtr<BaseTableRef> FakeTableRef(TableEntry *table_entry, TxnTimeStamp ts) {
SharedPtr<BlockIndex> block_index = table_entry->GetBlockIndex(ts);
static SharedPtr<BaseTableRef> FakeTableRef(TableEntry *table_entry, Txn *txn) {
SharedPtr<BlockIndex> block_index = table_entry->GetBlockIndex(txn);
// Ownership of the block index is moved into the new table reference.
return MakeShared<BaseTableRef>(table_entry, std::move(block_index));
}

Expand Down
6 changes: 3 additions & 3 deletions src/planner/query_binder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ import data_type;
import logical_type;
import base_entry;
import view_entry;
import txn;

namespace infinity {

Expand Down Expand Up @@ -425,10 +426,9 @@ SharedPtr<BaseTableRef> QueryBinder::BuildBaseTable(QueryContext *query_context,
columns.emplace_back(idx);
}

// TransactionID txn_id = query_context->GetTxn()->TxnID();
TxnTimeStamp begin_ts = query_context->GetTxn()->BeginTS();
Txn *txn = query_context->GetTxn();

SharedPtr<BlockIndex> block_index = table_entry->GetBlockIndex(begin_ts);
SharedPtr<BlockIndex> block_index = table_entry->GetBlockIndex(txn);

u64 table_index = bind_context_ptr_->GenerateTableIndex();
auto table_ref = MakeShared<BaseTableRef>(table_entry, std::move(columns), block_index, alias, table_index, names_ptr, types_ptr);
Expand Down
2 changes: 1 addition & 1 deletion src/storage/bg_task/compact_segments_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ void CompactSegmentsTask::CompactSegments(CompactSegmentsTaskState &state) {
}

auto new_segment = CompactSegmentsToOne(state, to_compact_segments);
block_index->Insert(new_segment.get(), UNCOMMIT_TS, false);
block_index->Insert(new_segment.get(), txn_);

segment_data.emplace_back(new_segment, std::move(to_compact_segments));
old_segments.insert(old_segments.end(), to_compact_segments.begin(), to_compact_segments.end());
Expand Down
24 changes: 14 additions & 10 deletions src/storage/common/block_index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,36 @@

module;

#include <vector>

module block_index;

import stl;
import segment_entry;
import global_block_id;
import block_iter;
import segment_iter;
import txn;

namespace infinity {

void BlockIndex::Insert(SegmentEntry *segment_entry, TxnTimeStamp timestamp, bool check_ts) {
if (!check_ts || segment_entry->CheckVisible(timestamp)) {
void BlockIndex::Insert(SegmentEntry *segment_entry, Txn *txn) {
if (segment_entry->CheckVisible(txn)) {
u32 segment_id = segment_entry->segment_id();
segments_.emplace_back(segment_entry);
segment_index_.emplace(segment_id, segment_entry);
BlocksInfo blocks_info;

auto block_entry_iter = BlockEntryIter(segment_entry);
for (auto *block_entry = block_entry_iter.Next(); block_entry != nullptr; block_entry = block_entry_iter.Next()) {
if (timestamp >= block_entry->min_row_ts()) {
blocks_info.block_map_.emplace(block_entry->block_id(), block_entry);
global_blocks_.emplace_back(GlobalBlockID{segment_id, block_entry->block_id()});
{
auto block_guard = segment_entry->GetBlocksGuard();
for (const auto &block_entry : block_guard.block_entries_) {
if (block_entry->CheckVisible(txn)) {
blocks_info.block_map_.emplace(block_entry->block_id(), block_entry.get());
global_blocks_.emplace_back(GlobalBlockID{segment_id, block_entry->block_id()});
}
}
}
blocks_info.segment_offset_ = segment_entry->row_count(timestamp);
// blocks_info.segment_offset_ = segment_entry->row_count(); // use false row count to pass benchmark
TxnTimeStamp begin_ts = txn->BeginTS();
blocks_info.segment_offset_ = segment_entry->row_count(begin_ts);

segment_block_index_.emplace(segment_id, std::move(blocks_info));
}
Expand Down
3 changes: 2 additions & 1 deletion src/storage/common/block_index.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace infinity {

struct BlockEntry;
struct SegmentEntry;
class Txn;

export struct BlockIndex {
private:
Expand All @@ -32,7 +33,7 @@ private:
};

public:
void Insert(SegmentEntry *segment_entry, TxnTimeStamp timestamp, bool check_ts = true);
void Insert(SegmentEntry *segment_entry, Txn *txn);

void Reserve(SizeT n);

Expand Down
6 changes: 4 additions & 2 deletions src/storage/invertedindex/column_index_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,9 @@ void TableIndexReaderCache::UpdateKnownUpdateTs(TxnTimeStamp ts, std::shared_mut
last_known_update_ts_ = std::max(last_known_update_ts_, ts);
}

IndexReader TableIndexReaderCache::GetIndexReader(TransactionID txn_id, TxnTimeStamp begin_ts, TableEntry *self_table_entry_ptr) {
IndexReader TableIndexReaderCache::GetIndexReader(Txn *txn, TableEntry *self_table_entry_ptr) {
TxnTimeStamp begin_ts = txn->BeginTS();
TransactionID txn_id = txn->TxnID();
IndexReader result;
result.session_pool_ = MakeShared<MemoryPool>();
std::scoped_lock lock(mutex_);
Expand Down Expand Up @@ -176,7 +178,7 @@ IndexReader TableIndexReaderCache::GetIndexReader(TransactionID txn_id, TxnTimeS
optionflag_t flag = index_full_text->flag_;
String index_dir = *(table_index_entry->index_dir());
Map<SegmentID, SharedPtr<SegmentIndexEntry>> index_by_segment =
table_index_entry->GetIndexBySegmentSnapshot(self_table_entry_ptr, begin_ts);
table_index_entry->GetIndexBySegmentSnapshot(self_table_entry_ptr, txn);
column_index_reader->Open(flag, std::move(index_dir), std::move(index_by_segment));
(*result.column_index_readers_)[column_id] = std::move(column_index_reader);
}
Expand Down
3 changes: 2 additions & 1 deletion src/storage/invertedindex/column_index_reader.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ export module column_index_reader;
namespace infinity {
struct TableEntry;
class BlockMaxTermDocIterator;
class Txn;

export class ColumnIndexReader {
public:
Expand Down Expand Up @@ -75,7 +76,7 @@ export class TableIndexReaderCache {
public:
void UpdateKnownUpdateTs(TxnTimeStamp ts, std::shared_mutex &segment_update_ts_mutex, TxnTimeStamp &segment_update_ts);

IndexReader GetIndexReader(TransactionID txn_id, TxnTimeStamp begin_ts, TableEntry *table_entry_ptr);
IndexReader GetIndexReader(Txn *txn, TableEntry *table_entry_ptr);

private:
std::mutex mutex_;
Expand Down
5 changes: 2 additions & 3 deletions src/storage/invertedindex/search/query_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@ import third_party;

namespace infinity {

QueryBuilder::QueryBuilder(TransactionID txn_id, TxnTimeStamp begin_ts, SharedPtr<BaseTableRef> &base_table_ref)
: txn_id_(txn_id), begin_ts_(begin_ts), table_entry_(base_table_ref->table_entry_ptr_),
index_reader_(table_entry_->GetFullTextIndexReader(txn_id_, begin_ts_)) {
QueryBuilder::QueryBuilder(Txn *txn, SharedPtr<BaseTableRef> &base_table_ref)
: table_entry_(base_table_ref->table_entry_ptr_), index_reader_(table_entry_->GetFullTextIndexReader(txn)) {
u64 total_row_count = 0;
for (SegmentEntry *segment_entry : base_table_ref->block_index_->segments_) {
total_row_count += segment_entry->row_count();
Expand Down
5 changes: 2 additions & 3 deletions src/storage/invertedindex/search/query_builder.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import base_table_ref;

namespace infinity {

class Txn;
struct QueryNode;
export struct FullTextQueryContext {
UniquePtr<QueryNode> query_tree_;
Expand All @@ -37,7 +38,7 @@ class EarlyTerminateIterator;

export class QueryBuilder {
public:
QueryBuilder(TransactionID txn_id, TxnTimeStamp begin_ts, SharedPtr<BaseTableRef> &base_table_ref);
QueryBuilder(Txn *txn, SharedPtr<BaseTableRef> &base_table_ref);

~QueryBuilder();

Expand All @@ -50,8 +51,6 @@ public:
inline float Score(RowID doc_id) { return scorer_.Score(doc_id); }

private:
TransactionID txn_id_{};
TxnTimeStamp begin_ts_{};
TableEntry *table_entry_{nullptr};
IndexReader index_reader_;
Scorer scorer_;
Expand Down
16 changes: 10 additions & 6 deletions src/storage/meta/catalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,17 @@ Tuple<SharedPtr<TableEntry>, Status> Catalog::DropTableByName(const String &db_n
return db_entry->DropTable(table_name, conflict_type, txn_id, begin_ts, txn_mgr);
}

// Looks up database `db_name` and forwards `output_table_array` to the
// database entry's GetTablesDetail(), returning its Status.
// If the database lookup fails, an error is logged and the failing Status is
// returned without calling GetTablesDetail().
// NOTE(review): this is a rendered diff without +/- markers. The signature
// taking (txn_id, begin_ts) and the matching GetTablesDetail call appear to be
// the removed lines; the Txn* versions appear to be their replacements —
// confirm against the repository.
Status Catalog::GetTables(const String &db_name, Vector<TableDetail> &output_table_array, TransactionID txn_id, TxnTimeStamp begin_ts) {
Status Catalog::GetTables(const String &db_name, Vector<TableDetail> &output_table_array, Txn *txn) {
// The database lookup below still takes the id and begin timestamp, so both are read from txn.
TransactionID txn_id = txn->TxnID();
TxnTimeStamp begin_ts = txn->BeginTS();
// Check the db entries
auto [db_entry, status] = this->GetDatabase(db_name, txn_id, begin_ts);
if (!status.ok()) {
// Lookup failed: log and propagate the status to the caller.
LOG_ERROR(fmt::format("Database: {} is invalid.", db_name));
return status;
}
return db_entry->GetTablesDetail(txn_id, begin_ts, output_table_array);
return db_entry->GetTablesDetail(txn, output_table_array);
}

Tuple<TableEntry *, Status> Catalog::GetTableByName(const String &db_name, const String &table_name, TransactionID txn_id, TxnTimeStamp begin_ts) {
Expand All @@ -229,16 +231,17 @@ Tuple<TableEntry *, Status> Catalog::GetTableByName(const String &db_name, const
return db_entry->GetTableCollection(table_name, txn_id, begin_ts);
}

// Looks up database `db_name` and returns the result of the database entry's
// GetTableInfo() for `table_name`.
// If the database lookup fails, an error is logged and {nullptr, status} is
// returned.
// NOTE(review): this is a rendered diff without +/- markers. The two-line
// signature taking (txn_id, begin_ts) and the matching GetTableInfo call
// appear to be the removed lines; the Txn* versions appear to be their
// replacements — confirm against the repository.
Tuple<SharedPtr<TableInfo>, Status>
Catalog::GetTableInfo(const String &db_name, const String &table_name, TransactionID txn_id, TxnTimeStamp begin_ts) {
Tuple<SharedPtr<TableInfo>, Status> Catalog::GetTableInfo(const String &db_name, const String &table_name, Txn *txn) {
// The database lookup below still takes the id and begin timestamp, so both are read from txn.
TransactionID txn_id = txn->TxnID();
TxnTimeStamp begin_ts = txn->BeginTS();
auto [db_entry, status] = this->GetDatabase(db_name, txn_id, begin_ts);
if (!status.ok()) {
// Lookup failed: log and return a null TableInfo together with the failing status.
LOG_ERROR(fmt::format("Database: {} is invalid.", db_name));
return {nullptr, status};
}

return db_entry->GetTableInfo(table_name, txn_id, begin_ts);
return db_entry->GetTableInfo(table_name, txn);
}

Status Catalog::RemoveTableEntry(TableEntry *table_entry, TransactionID txn_id) {
Expand Down Expand Up @@ -657,7 +660,8 @@ void Catalog::LoadFromEntryDelta(TxnTimeStamp max_commit_ts, BufferManager *buff
commit_ts,
check_point_ts,
check_point_row_count,
buffer_mgr);
buffer_mgr,
txn_id);

if (merge_flag == MergeFlag::kNew) {
if (!block_filter_binary_data.empty()) {
Expand Down
Loading

0 comments on commit 01e0435

Please sign in to comment.