Skip to content

Commit

Permalink
Add result cache2 (#2062)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Add an LRU to the result cache manager.
Add configuration options for the result cache (`result_cache_mode`, `cache_result_num`).
Add slt test cases for cached fulltext and paged fulltext search.

### Type of change

- [x] New Feature (non-breaking change which adds functionality)
- [x] Test cases
  • Loading branch information
small-turtle-1 authored Oct 22, 2024
1 parent c61641c commit 6582c3f
Show file tree
Hide file tree
Showing 97 changed files with 2,164 additions and 267 deletions.
1 change: 1 addition & 0 deletions conf/pytest_parallel_infinity_conf.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ persistence_dir = "/var/infinity/persistence"
[buffer]
buffer_manager_size = "8GB"
temp_dir = "/var/infinity/tmp"
result_cache_mode = "on"

[wal]
wal_dir = "/var/infinity/wal"
Expand Down
1 change: 1 addition & 0 deletions conf/pytest_parallel_infinity_vfs_off.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ data_dir = "/var/infinity/data"
[buffer]
buffer_manager_size = "8GB"
temp_dir = "/var/infinity/tmp"
result_cache_mode = "on"

[wal]
wal_dir = "/var/infinity/wal"
Expand Down
5 changes: 5 additions & 0 deletions src/common/default_values.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,9 @@ export {

constexpr SizeT INSERT_BATCH_ROW_LIMIT = 8192;

// default result cache parameter
// Default value for the "result_cache_mode" option; the cache is off unless a config turns it on.
constexpr std::string_view DEFAULT_RESULT_CACHE_MODE = "off";
// Default value for the "cache_result_num" option.
// NOTE(review): presumably the maximum number of results the cache keeps — confirm against the result cache manager.
constexpr SizeT DEFAULT_CACHE_RESULT_NUM = 10000;

// default persistence parameter
constexpr std::string_view DEFAULT_PERSISTENCE_DIR = "/var/infinity/persistence"; // Empty means disabled
constexpr std::string_view DEFAULT_PERSISTENCE_OBJECT_SIZE_LIMIT_STR = "128MB"; // 128MB
Expand Down Expand Up @@ -257,6 +260,8 @@ export {
constexpr std::string_view LRU_NUM_OPTION_NAME = "lru_num";
constexpr std::string_view TEMP_DIR_OPTION_NAME = "temp_dir";
constexpr std::string_view MEMINDEX_MEMORY_QUOTA_OPTION_NAME = "memindex_memory_quota";
// Config keys for the result cache; the sample configs set result_cache_mode under the [buffer] section.
constexpr std::string_view RESULT_CACHE_MODE_OPTION_NAME = "result_cache_mode";
constexpr std::string_view CACHE_RESULT_NUM_OPTION_NAME = "cache_result_num";

constexpr std::string_view WAL_DIR_OPTION_NAME = "wal_dir";
constexpr std::string_view WAL_COMPACT_THRESHOLD_OPTION_NAME = "wal_compact_threshold";
Expand Down
54 changes: 52 additions & 2 deletions src/executor/explain_physical_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -820,7 +820,7 @@ void ExplainPhysicalPlan::Explain(const PhysicalKnnScan *knn_scan_node, SharedPt
result->emplace_back(MakeShared<String>(table_name));

// Table index
String table_index = String(intent_size, ' ') + " - table index: #" + std::to_string(knn_scan_node->knn_table_index_);
String table_index = String(intent_size, ' ') + " - table index: #" + std::to_string(knn_scan_node->table_index());
result->emplace_back(MakeShared<String>(table_index));

KnnExpression *knn_expr_raw = knn_scan_node->knn_expression_.get();
Expand Down Expand Up @@ -2368,7 +2368,7 @@ void ExplainPhysicalPlan::Explain(const PhysicalMergeKnn *merge_knn_node,
result->emplace_back(MakeShared<String>(explain_header_str));

// Table index
String table_index = String(intent_size, ' ') + " - table index: #" + std::to_string(merge_knn_node->knn_table_index());
String table_index = String(intent_size, ' ') + " - table index: #" + std::to_string(merge_knn_node->table_index());
result->emplace_back(MakeShared<String>(table_index));

// Output columns
Expand Down Expand Up @@ -2600,6 +2600,56 @@ void ExplainPhysicalPlan::Explain(const PhysicalMergeMatchTensor *merge_match_te
}
}

void ExplainPhysicalPlan::Explain(const PhysicalMergeMatchSparse *merge_match_sparse_node,
SharedPtr<Vector<SharedPtr<String>>> &result,
i64 intent_size) {
String explain_header_str;
if (intent_size != 0) {
explain_header_str = String(intent_size - 2, ' ') + "-> MERGE MatchSparse ";
} else {
explain_header_str = "MERGE MatchSparse ";
}
explain_header_str += "(" + std::to_string(merge_match_sparse_node->node_id()) + ")";
result->emplace_back(MakeShared<String>(explain_header_str));

// Table alias and name
String table_name = String(intent_size, ' ') + " - table name: " + merge_match_sparse_node->TableAlias() + "(";

table_name += *merge_match_sparse_node->table_collection_ptr()->GetDBName() + ".";
table_name += *merge_match_sparse_node->table_collection_ptr()->GetTableName() + ")";
result->emplace_back(MakeShared<String>(table_name));

// Table index
String table_index = String(intent_size, ' ') + " - table index: #" + std::to_string(merge_match_sparse_node->table_index());
result->emplace_back(MakeShared<String>(table_index));

String match_sparse_expression =
String(intent_size, ' ') + " - MatchSparse expression: " + merge_match_sparse_node->match_sparse_expr()->ToString();
result->emplace_back(MakeShared<String>(std::move(match_sparse_expression)));

String top_n_expression = String(intent_size, ' ') + " - Top N: " + std::to_string(merge_match_sparse_node->GetTopN());
result->emplace_back(MakeShared<String>(std::move(top_n_expression)));

// Output columns
String output_columns = String(intent_size, ' ') + " - output columns: [";
SizeT column_count = merge_match_sparse_node->GetOutputNames()->size();
if (column_count == 0) {
String error_message = "No column in PhysicalMergeMatchSparse node.";
UnrecoverableError(error_message);
}
for (SizeT idx = 0; idx < column_count - 1; ++idx) {
output_columns += merge_match_sparse_node->GetOutputNames()->at(idx) + ", ";
}
output_columns += merge_match_sparse_node->GetOutputNames()->back();
output_columns += "]";
result->emplace_back(MakeShared<String>(output_columns));

if (merge_match_sparse_node->left() == nullptr) {
String error_message = "PhysicalMergeMatchSparse should have child node!";
UnrecoverableError(error_message);
}
}

void ExplainPhysicalPlan::Explain(const PhysicalFusion *fusion_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size) {
String explain_header_str;
if (intent_size != 0) {
Expand Down
3 changes: 3 additions & 0 deletions src/executor/explain_physical_plan.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ import physical_merge_top;
import physical_merge_sort;
import physical_merge_knn;
import physical_merge_match_tensor;
import physical_merge_match_sparse;
import physical_match;
import physical_match_tensor_scan;
import physical_fusion;
Expand Down Expand Up @@ -176,6 +177,8 @@ public:

static void Explain(const PhysicalMergeMatchTensor *merge_match_tensor_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size = 0);

static void Explain(const PhysicalMergeMatchSparse *merge_match_sparse_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size = 0);

static void Explain(const PhysicalFusion *fusion_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size = 0);

static void Explain(const PhysicalMergeAggregate *fusion_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size = 0);
Expand Down
17 changes: 17 additions & 0 deletions src/executor/operator/physical_command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,23 @@ bool PhysicalCommand::Execute(QueryContext *query_context, OperatorState *operat
Config* config = query_context->global_config();
GlobalOptionIndex config_index = config->GetOptionIndex(set_command->var_name());
switch(config_index) {
// SET <result cache mode>: the value must be a string, and only "on" and "off" are accepted.
case GlobalOptionIndex::kResultCacheMode: {
// Reject non-string values before looking at the content.
if (set_command->value_type() != SetVarType::kString) {
Status status = Status::DataTypeMismatch("String", set_command->value_type_str());
RecoverableError(status);
}
if (set_command->value_str() == "on") {
config->SetCacheResult("on");
return true;
}
if (set_command->value_str() == "off") {
config->SetCacheResult("off");
return true;
}
// Any other string is an invalid value for this option.
// NOTE(review): the break is reached only if RecoverableError returns — presumably it throws; confirm.
Status status = Status::SetInvalidVarValue("cache result", "on, off");
RecoverableError(status);
break;
}
case GlobalOptionIndex::kLogLevel: {
if (set_command->value_type() != SetVarType::kString) {
Status status = Status::DataTypeMismatch("String", set_command->value_type_str());
Expand Down
9 changes: 8 additions & 1 deletion src/executor/operator/physical_fusion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ import mlas_matrix_multiply;
import physical_match_tensor_scan;
import physical_knn_scan;
import physical_merge_knn;
import physical_read_cache;

namespace infinity {

Expand Down Expand Up @@ -230,7 +231,8 @@ void PhysicalFusion::ExecuteRRFWeighted(const Map<u64, Vector<UniquePtr<DataBloc
child_op = right();
else
child_op = other_children_[i - 2].get();
switch (child_op->operator_type()) {
auto child_type = child_op->operator_type();
switch (child_type) {
case PhysicalOperatorType::kKnnScan: {
PhysicalKnnScan *phy_knn_scan = static_cast<PhysicalKnnScan *>(child_op);
min_heaps[i] = phy_knn_scan->IsKnnMinHeap();
Expand All @@ -249,6 +251,11 @@ void PhysicalFusion::ExecuteRRFWeighted(const Map<u64, Vector<UniquePtr<DataBloc
min_heaps[i] = true;
break;
}
case PhysicalOperatorType::kReadCache: {
PhysicalReadCache *phy_read_cache = static_cast<PhysicalReadCache *>(child_op);
min_heaps[i] = phy_read_cache->is_min_heap();
break;
}
default: {
String error_message = fmt::format("Cannot determine heap type of operator {}", int(child_op->operator_type()));
UnrecoverableError(error_message);
Expand Down
36 changes: 36 additions & 0 deletions src/executor/operator/physical_read_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,42 @@ import column_vector;

namespace infinity {

// Builds a read-cache operator with no child operators (left and right are nullptr).
//
// id             - operator id forwarded to PhysicalOperator.
// origin_type    - logical node type to record; translated to the matching physical
//                  operator type and stored in origin_type_.
// base_table_ref - table reference stored on the operator.
// cache_content  - cache content stored on the operator.
// column_map     - result column id -> cache column id.
// load_metas     - forwarded to PhysicalOperator.
// is_min_heap    - heap-order flag exposed through is_min_heap().
//
// Only Match, KnnScan, MatchSparseScan, MatchTensorScan and IndexScan are supported;
// any other origin_type reports UnrecoverableError("Not implemented").
PhysicalReadCache::PhysicalReadCache(u64 id,
                                     LogicalNodeType origin_type,
                                     SharedPtr<BaseTableRef> base_table_ref,
                                     SharedPtr<CacheContent> cache_content,
                                     Vector<SizeT> column_map,
                                     SharedPtr<Vector<LoadMeta>> load_metas,
                                     bool is_min_heap)
    // All by-value parameters are sinks: move them into place instead of copying.
    : PhysicalOperator(PhysicalOperatorType::kReadCache, nullptr, nullptr, id, std::move(load_metas)),
      base_table_ref_(std::move(base_table_ref)), cache_content_(std::move(cache_content)), column_map_(std::move(column_map)),
      is_min_heap_(is_min_heap) {
    switch (origin_type) {
        case LogicalNodeType::kMatch: {
            origin_type_ = PhysicalOperatorType::kMatch;
            break;
        }
        case LogicalNodeType::kKnnScan: {
            origin_type_ = PhysicalOperatorType::kKnnScan;
            break;
        }
        case LogicalNodeType::kMatchSparseScan: {
            origin_type_ = PhysicalOperatorType::kMatchSparseScan;
            break;
        }
        case LogicalNodeType::kMatchTensorScan: {
            origin_type_ = PhysicalOperatorType::kMatchTensorScan;
            break;
        }
        case LogicalNodeType::kIndexScan: {
            origin_type_ = PhysicalOperatorType::kIndexScan;
            break;
        }
        default: {
            UnrecoverableError("Not implemented");
        }
    }
}

bool PhysicalReadCache::Execute(QueryContext *query_context, OperatorState *operator_state) {
auto *read_cache_state = static_cast<ReadCacheState *>(operator_state);
Vector<UniquePtr<DataBlock>> &data_block_array = read_cache_state->data_block_array_;
Expand Down
18 changes: 15 additions & 3 deletions src/executor/operator/physical_read_cache.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,19 @@ import data_type;
import result_cache_manager;
import physical_operator_type;
import load_meta;
import logical_node_type;

namespace infinity {

export class PhysicalReadCache : public PhysicalOperator {
public:
PhysicalReadCache(u64 id,
LogicalNodeType origin_type,
SharedPtr<BaseTableRef> base_table_ref,
SharedPtr<CacheContent> cache_content,
Vector<SizeT> column_map,
SharedPtr<Vector<LoadMeta>> load_metas)
: PhysicalOperator(PhysicalOperatorType::kReadCache, nullptr, nullptr, id, load_metas), base_table_ref_(base_table_ref),
cache_content_(cache_content), column_map_(column_map) {}
SharedPtr<Vector<LoadMeta>> load_metas,
bool is_min_heap);

void Init() override {};

Expand All @@ -45,14 +46,25 @@ public:

SharedPtr<Vector<SharedPtr<DataType>>> GetOutputTypes() const override;

// Registers this operator's table reference under its table index.
// An entry already stored for that index is left untouched.
void FillingTableRefs(HashMap<SizeT, SharedPtr<BaseTableRef>> &table_refs) override {
    table_refs.emplace(base_table_ref_->table_index_, base_table_ref_);
}


// Non-owning pointer to the table reference held by this operator.
const BaseTableRef *base_table_ref() const { return base_table_ref_.get(); }

// Physical operator type corresponding to the LogicalNodeType passed to the constructor.
PhysicalOperatorType origin_type() const { return origin_type_; }

// Heap-order flag supplied at construction.
bool is_min_heap() const { return is_min_heap_; }

private:
SharedPtr<BaseTableRef> base_table_ref_;

SharedPtr<CacheContent> cache_content_;

PhysicalOperatorType origin_type_;
Vector<SizeT> column_map_; // result column id -> cache column id
bool is_min_heap_;
};

} // namespace infinity
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,12 @@ public:
PhysicalOperatorType type,
UniquePtr<PhysicalOperator> left,
UniquePtr<PhysicalOperator> right,
u64 table_index,
SharedPtr<BaseTableRef> base_table_ref,
SharedPtr<CommonQueryFilter> common_query_filter,
SharedPtr<Vector<LoadMeta>> load_metas)
: PhysicalScanBase(id, type, std::move(left), std::move(right), base_table_ref, load_metas), common_query_filter_(common_query_filter) {}
: PhysicalScanBase(id, type, std::move(left), std::move(right), table_index, base_table_ref, load_metas),
common_query_filter_(common_query_filter) {}

bool CalculateFilterBitmask(SegmentID segment_id, BlockID block_id, BlockOffset row_count, Bitmask &bitmask) const;

Expand Down
11 changes: 9 additions & 2 deletions src/executor/operator/physical_scan/physical_index_scan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import fast_rough_filter;
import roaring_bitmap;
import filter_value_type_classification;
import physical_scan_base;
import result_cache_manager;

namespace infinity {

Expand All @@ -49,8 +50,9 @@ PhysicalIndexScan::PhysicalIndexScan(const u64 id,
SharedPtr<Vector<LoadMeta>> load_metas,
SharedPtr<Vector<String>> output_names,
SharedPtr<Vector<SharedPtr<DataType>>> output_types,
const bool add_row_id)
: PhysicalScanBase(id, PhysicalOperatorType::kIndexScan, nullptr, nullptr, std::move(base_table_ref), std::move(load_metas)),
const bool add_row_id,
bool cache_result)
: PhysicalScanBase(id, PhysicalOperatorType::kIndexScan, nullptr, nullptr, 0, std::move(base_table_ref), std::move(load_metas), cache_result),
index_filter_(std::move(index_filter)), index_filter_evaluator_(std::move(index_filter_evaluator)),
fast_rough_filter_evaluator_(std::move(fast_rough_filter_evaluator)), output_names_(std::move(output_names)),
output_types_(std::move(output_types)), add_row_id_(add_row_id) {
Expand Down Expand Up @@ -182,6 +184,11 @@ void PhysicalIndexScan::ExecuteInternal(QueryContext *query_context, IndexScanOp
// Finished
index_scan_operator_state->SetComplete();
}

ResultCacheManager *cache_mgr = query_context->storage()->result_cache_manager();
if (cache_result_ && cache_mgr != nullptr) {
AddCache(query_context, cache_mgr, output_data_blocks);
}
};
// check FastRoughFilter
const auto &fast_rough_filter = *segment_entry->GetFastRoughFilter();
Expand Down
8 changes: 2 additions & 6 deletions src/executor/operator/physical_scan/physical_index_scan.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public:
SharedPtr<Vector<LoadMeta>> load_metas,
SharedPtr<Vector<String>> output_names,
SharedPtr<Vector<SharedPtr<DataType>>> output_types,
bool add_row_id = true);
bool add_row_id = true,
bool cache_result = false);

~PhysicalIndexScan() final = default;

Expand All @@ -78,11 +79,6 @@ public:

Vector<SharedPtr<Vector<GlobalBlockID>>> PlanBlockEntries(i64) const override;

// for InputLoad
void FillingTableRefs(HashMap<SizeT, SharedPtr<BaseTableRef>> &table_refs) override {
table_refs.insert({base_table_ref_->table_index_, base_table_ref_});
}

Vector<UniquePtr<Vector<SegmentID>>> PlanSegments(u32 parallel_count) const;

// Alias of the scanned table, returned by value.
inline String table_alias() const { return base_table_ref_->alias_; }
Expand Down
17 changes: 9 additions & 8 deletions src/executor/operator/physical_scan/physical_knn_scan.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,15 @@ public:
SharedPtr<Vector<SharedPtr<DataType>>> output_types,
u64 knn_table_index,
SharedPtr<Vector<LoadMeta>> load_metas)
: PhysicalFilterScanBase(id, PhysicalOperatorType::kKnnScan, nullptr, nullptr, base_table_ref, common_query_filter, load_metas),
knn_expression_(std::move(knn_expression)), output_names_(std::move(output_names)), output_types_(std::move(output_types)),
knn_table_index_(knn_table_index) {}
: PhysicalFilterScanBase(id,
PhysicalOperatorType::kKnnScan,
nullptr,
nullptr,
knn_table_index,
base_table_ref,
common_query_filter,
load_metas),
knn_expression_(std::move(knn_expression)), output_names_(std::move(output_names)), output_types_(std::move(output_types)) {}

~PhysicalKnnScan() override = default;

Expand All @@ -72,10 +78,6 @@ public:

SizeT TaskletCount() override;

void FillingTableRefs(HashMap<SizeT, SharedPtr<BaseTableRef>> &table_refs) override {
table_refs.insert({base_table_ref_->table_index_, base_table_ref_});
}

// Forwards to the KNN expression's IsKnnMinHeap().
inline bool IsKnnMinHeap() const { return knn_expression_->IsKnnMinHeap(); }

private:
Expand All @@ -91,7 +93,6 @@ public:

SharedPtr<Vector<String>> output_names_{};
SharedPtr<Vector<SharedPtr<DataType>>> output_types_{};
u64 knn_table_index_{};

Vector<Pair<u32, u32>> block_parallel_options_;
u32 block_column_entries_size_ = 0; // need this value because block_column_entries_ will be moved into KnnScanSharedData
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,15 @@ PhysicalMatchSparseScan::PhysicalMatchSparseScan(u64 id,
SharedPtr<MatchSparseExpression> match_sparse_expression,
const SharedPtr<CommonQueryFilter> &common_query_filter,
SharedPtr<Vector<LoadMeta>> load_metas)
: PhysicalFilterScanBase(id, PhysicalOperatorType::kMatchSparseScan, nullptr, nullptr, base_table_ref, common_query_filter, load_metas),
table_index_(table_index), match_sparse_expr_(std::move(match_sparse_expression)) {}
: PhysicalFilterScanBase(id,
PhysicalOperatorType::kMatchSparseScan,
nullptr,
nullptr,
table_index,
base_table_ref,
common_query_filter,
load_metas),
match_sparse_expr_(std::move(match_sparse_expression)) {}

// Stores the column index from the match-sparse expression's column binding
// in search_column_id_.
void PhysicalMatchSparseScan::Init() {
    const auto &column_binding = match_sparse_expr_->column_expr_->binding();
    search_column_id_ = column_binding.column_idx;
}

Expand Down
Loading

0 comments on commit 6582c3f

Please sign in to comment.