diff --git a/conf/pytest_parallel_infinity_conf.toml b/conf/pytest_parallel_infinity_conf.toml index f5efd734fe..50e5cc0038 100644 --- a/conf/pytest_parallel_infinity_conf.toml +++ b/conf/pytest_parallel_infinity_conf.toml @@ -17,6 +17,7 @@ persistence_dir = "/var/infinity/persistence" [buffer] buffer_manager_size = "8GB" temp_dir = "/var/infinity/tmp" +result_cache_mode = "on" [wal] wal_dir = "/var/infinity/wal" diff --git a/conf/pytest_parallel_infinity_vfs_off.toml b/conf/pytest_parallel_infinity_vfs_off.toml index 08af3aa21b..2c1dc889af 100644 --- a/conf/pytest_parallel_infinity_vfs_off.toml +++ b/conf/pytest_parallel_infinity_vfs_off.toml @@ -17,6 +17,7 @@ data_dir = "/var/infinity/data" [buffer] buffer_manager_size = "8GB" temp_dir = "/var/infinity/tmp" +result_cache_mode = "on" [wal] wal_dir = "/var/infinity/wal" diff --git a/src/common/default_values.cppm b/src/common/default_values.cppm index 4816dd9e43..73f8ba172f 100644 --- a/src/common/default_values.cppm +++ b/src/common/default_values.cppm @@ -198,6 +198,9 @@ export { constexpr SizeT INSERT_BATCH_ROW_LIMIT = 8192; + constexpr std::string_view DEFAULT_RESULT_CACHE_MODE = "off"; + constexpr SizeT DEFAULT_CACHE_RESULT_NUM = 10000; + // default persistence parameter constexpr std::string_view DEFAULT_PERSISTENCE_DIR = "/var/infinity/persistence"; // Empty means disabled constexpr std::string_view DEFAULT_PERSISTENCE_OBJECT_SIZE_LIMIT_STR = "128MB"; // 128MB @@ -257,6 +260,8 @@ export { constexpr std::string_view LRU_NUM_OPTION_NAME = "lru_num"; constexpr std::string_view TEMP_DIR_OPTION_NAME = "temp_dir"; constexpr std::string_view MEMINDEX_MEMORY_QUOTA_OPTION_NAME = "memindex_memory_quota"; + constexpr std::string_view RESULT_CACHE_MODE_OPTION_NAME = "result_cache_mode"; + constexpr std::string_view CACHE_RESULT_NUM_OPTION_NAME = "cache_result_num"; constexpr std::string_view WAL_DIR_OPTION_NAME = "wal_dir"; constexpr std::string_view WAL_COMPACT_THRESHOLD_OPTION_NAME = "wal_compact_threshold"; diff --git a/src/executor/explain_physical_plan.cpp b/src/executor/explain_physical_plan.cpp index 1ab3bd24dc..8296e18d1e 100644 --- a/src/executor/explain_physical_plan.cpp +++ b/src/executor/explain_physical_plan.cpp @@ -820,7 +820,7 @@ void ExplainPhysicalPlan::Explain(const PhysicalKnnScan *knn_scan_node, SharedPt result->emplace_back(MakeShared(table_name)); // Table index - String table_index = String(intent_size, ' ') + " - table index: #" + std::to_string(knn_scan_node->knn_table_index_); + String table_index = String(intent_size, ' ') + " - table index: #" + std::to_string(knn_scan_node->table_index()); result->emplace_back(MakeShared(table_index)); KnnExpression *knn_expr_raw = knn_scan_node->knn_expression_.get(); @@ -2368,7 +2368,7 @@ void ExplainPhysicalPlan::Explain(const PhysicalMergeKnn *merge_knn_node, result->emplace_back(MakeShared(explain_header_str)); // Table index - String table_index = String(intent_size, ' ') + " - table index: #" + std::to_string(merge_knn_node->knn_table_index()); + String table_index = String(intent_size, ' ') + " - table index: #" + std::to_string(merge_knn_node->table_index()); result->emplace_back(MakeShared(table_index)); // Output columns @@ -2600,6 +2600,56 @@ void ExplainPhysicalPlan::Explain(const PhysicalMergeMatchTensor *merge_match_te } } +void ExplainPhysicalPlan::Explain(const PhysicalMergeMatchSparse *merge_match_sparse_node, + SharedPtr>> &result, + i64 intent_size) { + String explain_header_str; + if (intent_size != 0) { + explain_header_str = String(intent_size - 2, ' ') + "-> MERGE MatchSparse "; + } else { + explain_header_str = "MERGE MatchSparse "; + } + explain_header_str += "(" + std::to_string(merge_match_sparse_node->node_id()) + ")"; + result->emplace_back(MakeShared(explain_header_str)); + + // Table alias and name + String table_name = String(intent_size, ' ') + " - table name: " + merge_match_sparse_node->TableAlias() + "("; + + table_name += *merge_match_sparse_node->table_collection_ptr()->GetDBName() + "."; + table_name += *merge_match_sparse_node->table_collection_ptr()->GetTableName() + ")"; + result->emplace_back(MakeShared(table_name)); + + // Table index + String table_index = String(intent_size, ' ') + " - table index: #" + std::to_string(merge_match_sparse_node->table_index()); + result->emplace_back(MakeShared(table_index)); + + String match_sparse_expression = + String(intent_size, ' ') + " - MatchSparse expression: " + merge_match_sparse_node->match_sparse_expr()->ToString(); + result->emplace_back(MakeShared(std::move(match_sparse_expression))); + + String top_n_expression = String(intent_size, ' ') + " - Top N: " + std::to_string(merge_match_sparse_node->GetTopN()); + result->emplace_back(MakeShared(std::move(top_n_expression))); + + // Output columns + String output_columns = String(intent_size, ' ') + " - output columns: ["; + SizeT column_count = merge_match_sparse_node->GetOutputNames()->size(); + if (column_count == 0) { + String error_message = "No column in PhysicalMergeMatchSparse node."; + UnrecoverableError(error_message); + } + for (SizeT idx = 0; idx < column_count - 1; ++idx) { + output_columns += merge_match_sparse_node->GetOutputNames()->at(idx) + ", "; + } + output_columns += merge_match_sparse_node->GetOutputNames()->back(); + output_columns += "]"; + result->emplace_back(MakeShared(output_columns)); + + if (merge_match_sparse_node->left() == nullptr) { + String error_message = "PhysicalMergeMatchSparse should have child node!"; + UnrecoverableError(error_message); + } +} + void ExplainPhysicalPlan::Explain(const PhysicalFusion *fusion_node, SharedPtr>> &result, i64 intent_size) { String explain_header_str; if (intent_size != 0) { diff --git a/src/executor/explain_physical_plan.cppm b/src/executor/explain_physical_plan.cppm index 9da3b82aaf..41f1483f7f 100644 --- a/src/executor/explain_physical_plan.cppm +++ b/src/executor/explain_physical_plan.cppm @@ -62,6 +62,7 @@ import physical_merge_top; import physical_merge_sort; import physical_merge_knn; import physical_merge_match_tensor; +import physical_merge_match_sparse; import physical_match; import physical_match_tensor_scan; import physical_fusion; @@ -176,6 +177,8 @@ public: static void Explain(const PhysicalMergeMatchTensor *merge_match_tensor_node, SharedPtr>> &result, i64 intent_size = 0); + static void Explain(const PhysicalMergeMatchSparse *merge_match_sparse_node, SharedPtr>> &result, i64 intent_size = 0); + static void Explain(const PhysicalFusion *fusion_node, SharedPtr>> &result, i64 intent_size = 0); static void Explain(const PhysicalMergeAggregate *fusion_node, SharedPtr>> &result, i64 intent_size = 0); diff --git a/src/executor/operator/physical_command.cpp b/src/executor/operator/physical_command.cpp index 7df5da871d..776139ad14 100644 --- a/src/executor/operator/physical_command.cpp +++ b/src/executor/operator/physical_command.cpp @@ -143,6 +143,23 @@ bool PhysicalCommand::Execute(QueryContext *query_context, OperatorState *operat Config* config = query_context->global_config(); GlobalOptionIndex config_index = config->GetOptionIndex(set_command->var_name()); switch(config_index) { + case GlobalOptionIndex::kResultCacheMode: { + if (set_command->value_type() != SetVarType::kString) { + Status status = Status::DataTypeMismatch("String", set_command->value_type_str()); + RecoverableError(status); + } + if (set_command->value_str() == "on") { + config->SetCacheResult("on"); + return true; + } + if (set_command->value_str() == "off") { + config->SetCacheResult("off"); + return true; + } + Status status = Status::SetInvalidVarValue("cache result", "on, off"); + RecoverableError(status); + break; + } case GlobalOptionIndex::kLogLevel: { if (set_command->value_type() != SetVarType::kString) { Status status = Status::DataTypeMismatch("String", set_command->value_type_str()); diff --git a/src/executor/operator/physical_fusion.cpp b/src/executor/operator/physical_fusion.cpp index 1260ff4194..4a4ceed336 100644 --- a/src/executor/operator/physical_fusion.cpp +++ b/src/executor/operator/physical_fusion.cpp @@ -55,6 +55,7 @@ import mlas_matrix_multiply; import physical_match_tensor_scan; import physical_knn_scan; import physical_merge_knn; +import physical_read_cache; namespace infinity { @@ -230,7 +231,8 @@ void PhysicalFusion::ExecuteRRFWeighted(const Mapoperator_type()) { + auto child_type = child_op->operator_type(); + switch (child_type) { case PhysicalOperatorType::kKnnScan: { PhysicalKnnScan *phy_knn_scan = static_cast(child_op); min_heaps[i] = phy_knn_scan->IsKnnMinHeap(); @@ -249,6 +251,11 @@ void PhysicalFusion::ExecuteRRFWeighted(const Map(child_op); + min_heaps[i] = phy_read_cache->is_min_heap(); + break; + } default: { String error_message = fmt::format("Cannot determine heap type of operator {}", int(child_op->operator_type())); UnrecoverableError(error_message); diff --git a/src/executor/operator/physical_read_cache.cpp b/src/executor/operator/physical_read_cache.cpp index 8c99fedc63..f6c1251f3c 100644 --- a/src/executor/operator/physical_read_cache.cpp +++ b/src/executor/operator/physical_read_cache.cpp @@ -22,6 +22,42 @@ import column_vector; namespace infinity { +PhysicalReadCache::PhysicalReadCache(u64 id, + LogicalNodeType origin_type, + SharedPtr base_table_ref, + SharedPtr cache_content, + Vector column_map, + SharedPtr> load_metas, + bool is_min_heap) + : PhysicalOperator(PhysicalOperatorType::kReadCache, nullptr, nullptr, id, load_metas), base_table_ref_(base_table_ref), + cache_content_(cache_content), column_map_(column_map), is_min_heap_(is_min_heap) { + switch (origin_type) { + case LogicalNodeType::kMatch: { + origin_type_ = PhysicalOperatorType::kMatch; + break; + } + case LogicalNodeType::kKnnScan: { + origin_type_ = PhysicalOperatorType::kKnnScan; + break; + } + case LogicalNodeType::kMatchSparseScan: { + origin_type_ = PhysicalOperatorType::kMatchSparseScan; + break; + } + case LogicalNodeType::kMatchTensorScan: { + origin_type_ = PhysicalOperatorType::kMatchTensorScan; + break; + } + case LogicalNodeType::kIndexScan: { + origin_type_ = PhysicalOperatorType::kIndexScan; + break; + } + default: { + UnrecoverableError("Not implemented"); + } + } +} + bool PhysicalReadCache::Execute(QueryContext *query_context, OperatorState *operator_state) { auto *read_cache_state = static_cast(operator_state); Vector> &data_block_array = read_cache_state->data_block_array_; diff --git a/src/executor/operator/physical_read_cache.cppm b/src/executor/operator/physical_read_cache.cppm index fbd7a79952..5af44326f2 100644 --- a/src/executor/operator/physical_read_cache.cppm +++ b/src/executor/operator/physical_read_cache.cppm @@ -22,18 +22,19 @@ import data_type; import result_cache_manager; import physical_operator_type; import load_meta; +import logical_node_type; namespace infinity { export class PhysicalReadCache : public PhysicalOperator { public: PhysicalReadCache(u64 id, + LogicalNodeType origin_type, SharedPtr base_table_ref, SharedPtr cache_content, Vector column_map, - SharedPtr> load_metas) - : PhysicalOperator(PhysicalOperatorType::kReadCache, nullptr, nullptr, id, load_metas), base_table_ref_(base_table_ref), - cache_content_(cache_content), column_map_(column_map) {} + SharedPtr> load_metas, + bool is_min_heap); void Init() override {}; @@ -45,14 +46,25 @@ public: SharedPtr>> GetOutputTypes() const override; + void FillingTableRefs(HashMap> &table_refs) override { + table_refs.insert({base_table_ref_->table_index_, base_table_ref_}); + } + + const BaseTableRef *base_table_ref() const { return base_table_ref_.get(); } + PhysicalOperatorType origin_type() const { return origin_type_; } + + bool is_min_heap() const { return is_min_heap_; } + private: SharedPtr base_table_ref_; SharedPtr cache_content_; + PhysicalOperatorType origin_type_; Vector column_map_; // result column id -> cache column id + bool is_min_heap_; }; } // namespace infinity diff --git a/src/executor/operator/physical_scan/physical_filter_scan_base.cppm b/src/executor/operator/physical_scan/physical_filter_scan_base.cppm index 8fd1616b64..18c4793188 100644 --- a/src/executor/operator/physical_scan/physical_filter_scan_base.cppm +++ b/src/executor/operator/physical_scan/physical_filter_scan_base.cppm @@ -32,10 +32,12 @@ public: PhysicalOperatorType type, UniquePtr left, UniquePtr right, + u64 table_index, SharedPtr base_table_ref, SharedPtr common_query_filter, SharedPtr> load_metas) - : PhysicalScanBase(id, type, std::move(left), std::move(right), base_table_ref, load_metas), common_query_filter_(common_query_filter) {} + : PhysicalScanBase(id, type, std::move(left), std::move(right), table_index, base_table_ref, load_metas), + common_query_filter_(common_query_filter) {} bool CalculateFilterBitmask(SegmentID segment_id, BlockID block_id, BlockOffset row_count, Bitmask &bitmask) const; diff --git a/src/executor/operator/physical_scan/physical_index_scan.cpp b/src/executor/operator/physical_scan/physical_index_scan.cpp index 4a4db6fbc0..facb0d00a0 100644 --- a/src/executor/operator/physical_scan/physical_index_scan.cpp +++ b/src/executor/operator/physical_scan/physical_index_scan.cpp @@ -38,6 +38,7 @@ import fast_rough_filter; import roaring_bitmap; import filter_value_type_classification; import physical_scan_base; +import result_cache_manager; namespace infinity { @@ -49,8 +50,9 @@ PhysicalIndexScan::PhysicalIndexScan(const u64 id, SharedPtr> load_metas, SharedPtr> output_names, SharedPtr>> output_types, - const bool add_row_id) - : PhysicalScanBase(id, PhysicalOperatorType::kIndexScan, nullptr, nullptr, std::move(base_table_ref), std::move(load_metas)), + const bool add_row_id, + bool cache_result) + : PhysicalScanBase(id, PhysicalOperatorType::kIndexScan, nullptr, nullptr, 0, std::move(base_table_ref), std::move(load_metas), cache_result), index_filter_(std::move(index_filter)), index_filter_evaluator_(std::move(index_filter_evaluator)), fast_rough_filter_evaluator_(std::move(fast_rough_filter_evaluator)), output_names_(std::move(output_names)), output_types_(std::move(output_types)), add_row_id_(add_row_id) { @@ -182,6 +184,11 @@ void PhysicalIndexScan::ExecuteInternal(QueryContext *query_context, IndexScanOp // Finished index_scan_operator_state->SetComplete(); } + + ResultCacheManager *cache_mgr = query_context->storage()->result_cache_manager(); + if (cache_result_ && cache_mgr != nullptr) { + AddCache(query_context, cache_mgr, output_data_blocks); + } }; // check FastRoughFilter const auto &fast_rough_filter = *segment_entry->GetFastRoughFilter(); diff --git a/src/executor/operator/physical_scan/physical_index_scan.cppm b/src/executor/operator/physical_scan/physical_index_scan.cppm index 03dfe8e0c2..5deab52ee2 100644 --- a/src/executor/operator/physical_scan/physical_index_scan.cppm +++ b/src/executor/operator/physical_scan/physical_index_scan.cppm @@ -59,7 +59,8 @@ public: SharedPtr> load_metas, SharedPtr> output_names, SharedPtr>> output_types, - bool add_row_id = true); + bool add_row_id = true, + bool cache_result = false); ~PhysicalIndexScan() final = default; @@ -78,11 +79,6 @@ public: Vector>> PlanBlockEntries(i64) const override; - // for InputLoad - void FillingTableRefs(HashMap> &table_refs) override { - table_refs.insert({base_table_ref_->table_index_, base_table_ref_}); - } - Vector>> PlanSegments(u32 parallel_count) const; inline String table_alias() const { return base_table_ref_->alias_; } diff --git a/src/executor/operator/physical_scan/physical_knn_scan.cppm b/src/executor/operator/physical_scan/physical_knn_scan.cppm index 2fee15848b..d2a424d70a 100644 --- a/src/executor/operator/physical_scan/physical_knn_scan.cppm +++ b/src/executor/operator/physical_scan/physical_knn_scan.cppm @@ -44,9 +44,15 @@ public: SharedPtr>> output_types, u64 knn_table_index, SharedPtr> load_metas) - : PhysicalFilterScanBase(id, PhysicalOperatorType::kKnnScan, nullptr, nullptr, base_table_ref, common_query_filter, load_metas), - knn_expression_(std::move(knn_expression)), output_names_(std::move(output_names)), output_types_(std::move(output_types)), - knn_table_index_(knn_table_index) {} + : PhysicalFilterScanBase(id, + PhysicalOperatorType::kKnnScan, + nullptr, + nullptr, + knn_table_index, + base_table_ref, + common_query_filter, + load_metas), + knn_expression_(std::move(knn_expression)), output_names_(std::move(output_names)), output_types_(std::move(output_types)) {} ~PhysicalKnnScan() override = default; @@ -72,10 +78,6 @@ public: SizeT TaskletCount() override; - void FillingTableRefs(HashMap> &table_refs) override { - table_refs.insert({base_table_ref_->table_index_, base_table_ref_}); - } - inline bool IsKnnMinHeap() const { return knn_expression_->IsKnnMinHeap(); } private: @@ -91,7 +93,6 @@ public: SharedPtr> output_names_{}; SharedPtr>> output_types_{}; - u64 knn_table_index_{}; Vector> block_parallel_options_; u32 block_column_entries_size_ = 0; // need this value because block_column_entries_ will be moved into KnnScanSharedData diff --git a/src/executor/operator/physical_scan/physical_match_sparse_scan.cpp b/src/executor/operator/physical_scan/physical_match_sparse_scan.cpp index 5cb68c12ea..b9d42c1a4e 100644 --- a/src/executor/operator/physical_scan/physical_match_sparse_scan.cpp +++ b/src/executor/operator/physical_scan/physical_match_sparse_scan.cpp @@ -70,8 +70,15 @@ PhysicalMatchSparseScan::PhysicalMatchSparseScan(u64 id, SharedPtr match_sparse_expression, const SharedPtr &common_query_filter, SharedPtr> load_metas) - : PhysicalFilterScanBase(id, PhysicalOperatorType::kMatchSparseScan, nullptr, nullptr, base_table_ref, common_query_filter, load_metas), - table_index_(table_index), match_sparse_expr_(std::move(match_sparse_expression)) {} + : PhysicalFilterScanBase(id, + PhysicalOperatorType::kMatchSparseScan, + nullptr, + nullptr, + table_index, + base_table_ref, + common_query_filter, + load_metas), + match_sparse_expr_(std::move(match_sparse_expression)) {} void PhysicalMatchSparseScan::Init() { search_column_id_ = match_sparse_expr_->column_expr_->binding().column_idx; } diff --git a/src/executor/operator/physical_scan/physical_match_sparse_scan.cppm b/src/executor/operator/physical_scan/physical_match_sparse_scan.cppm index 2883c0ee7c..3743b752df 100644 --- a/src/executor/operator/physical_scan/physical_match_sparse_scan.cppm +++ b/src/executor/operator/physical_scan/physical_match_sparse_scan.cppm @@ -59,8 +59,8 @@ public: Vector>> PlanWithIndex(Vector>> &block_groups, i64 parallel_count); - u64 table_index() const { - return table_index_; + SharedPtr match_sparse_expr() const { + return match_sparse_expr_; } private: @@ -84,7 +84,6 @@ private: ExecuteInnerT(DistFunc *dist_func, MergeHeap *merge_heap, QueryContext *query_context, MatchSparseScanOperatorState *match_sparse_scan_state); private: - u64 table_index_ = 0; SharedPtr match_sparse_expr_; // column to search diff --git a/src/executor/operator/physical_scan/physical_match_tensor_scan.cpp b/src/executor/operator/physical_scan/physical_match_tensor_scan.cpp index b21473fed6..f4e3a82eb1 100644 --- a/src/executor/operator/physical_scan/physical_match_tensor_scan.cpp +++ b/src/executor/operator/physical_scan/physical_match_tensor_scan.cpp @@ -75,6 +75,7 @@ import logical_match_tensor_scan; import simd_functions; import knn_expression; import search_options; +import result_cache_manager; namespace infinity { @@ -89,10 +90,17 @@ PhysicalMatchTensorScan::PhysicalMatchTensorScan(u64 id, SharedPtr match_tensor_expression, const SharedPtr &common_query_filter, u32 topn, - const MatchTensorScanIndexOptions &index_options, + const SharedPtr &index_options, SharedPtr> load_metas) - : PhysicalFilterScanBase(id, PhysicalOperatorType::kMatchTensorScan, nullptr, nullptr, base_table_ref, common_query_filter, load_metas), - table_index_(table_index), src_match_tensor_expr_(std::move(match_tensor_expression)), topn_(topn), index_options_(index_options) { + : PhysicalFilterScanBase(id, + PhysicalOperatorType::kMatchTensorScan, + nullptr, + nullptr, + table_index, + base_table_ref, + common_query_filter, + load_metas), + src_match_tensor_expr_(std::move(match_tensor_expression)), topn_(topn), index_options_(index_options) { search_column_id_ = std::numeric_limits::max(); } @@ -297,11 +305,11 @@ void PhysicalMatchTensorScan::ExecuteInner(QueryContext *query_context, MatchTen segment_entry, block_index, begin_ts, - index_options_.emvb_centroid_nprobe_, - index_options_.emvb_threshold_first_, - index_options_.emvb_n_doc_to_score_, - index_options_.emvb_n_doc_out_second_stage_, - index_options_.emvb_threshold_final_); + index_options_->emvb_centroid_nprobe_, + index_options_->emvb_threshold_first_, + index_options_->emvb_n_doc_to_score_, + index_options_->emvb_n_doc_out_second_stage_, + index_options_->emvb_threshold_final_); std::visit(Overload{[segment_id, &function_data](const Tuple, UniquePtr> &index_result) { const auto &[result_num, score_ptr, row_id_ptr] = index_result; for (u32 i = 0; i < result_num; ++i) { @@ -354,11 +362,11 @@ void PhysicalMatchTensorScan::ExecuteInner(QueryContext *query_context, MatchTen segment_entry, block_index, begin_ts, - index_options_.emvb_centroid_nprobe_, - index_options_.emvb_threshold_first_, - index_options_.emvb_n_doc_to_score_, - index_options_.emvb_n_doc_out_second_stage_, - index_options_.emvb_threshold_final_); + index_options_->emvb_centroid_nprobe_, + index_options_->emvb_threshold_first_, + index_options_->emvb_n_doc_to_score_, + index_options_->emvb_n_doc_out_second_stage_, + index_options_->emvb_threshold_final_); for (u32 i = 0; i < result_num; ++i) { function_data.result_handler_->AddResult(0, score_ptr[i], RowID(segment_id, row_id_ptr[i])); } @@ -427,6 +435,11 @@ void PhysicalMatchTensorScan::ExecuteInner(QueryContext *query_context, MatchTen ++output_block_row_id; } output_block_ptr->Finalize(); + + ResultCacheManager *cache_mgr = query_context->storage()->result_cache_manager(); + if (cache_result_ && cache_mgr != nullptr) { + AddCache(query_context, cache_mgr, operator_state->data_block_array_); + } operator_state->SetComplete(); } } diff --git a/src/executor/operator/physical_scan/physical_match_tensor_scan.cppm b/src/executor/operator/physical_scan/physical_match_tensor_scan.cppm index fac99d8c9e..a03eed63db 100644 --- a/src/executor/operator/physical_scan/physical_match_tensor_scan.cppm +++ b/src/executor/operator/physical_scan/physical_match_tensor_scan.cppm @@ -44,7 +44,7 @@ public: SharedPtr match_tensor_expression, const SharedPtr &common_query_filter, u32 topn, - const MatchTensorScanIndexOptions &index_options, + const SharedPtr &index_options, SharedPtr> load_metas); void Init() override; @@ -65,24 +65,19 @@ public: SizeT TaskletCount() override; - void FillingTableRefs(HashMap> &table_refs) override { - table_refs.insert({base_table_ref_->table_index_, base_table_ref_}); - } - [[nodiscard]] inline String TableAlias() const { return base_table_ref_->alias_; } [[nodiscard]] inline TableEntry *table_collection_ptr() const { return base_table_ref_->table_entry_ptr_; } - [[nodiscard]] inline u64 table_index() const { return table_index_; } - - [[nodiscard]] inline MatchTensorExpression *match_tensor_expr() const { return src_match_tensor_expr_.get(); } + [[nodiscard]] inline const SharedPtr &match_tensor_expr() const { return src_match_tensor_expr_; } [[nodiscard]] inline const CommonQueryFilter *common_query_filter() const { return common_query_filter_.get(); } [[nodiscard]] inline u32 GetTopN() const { return topn_; } + const SharedPtr &index_options() const { return index_options_; } + private: - u64 table_index_ = 0; SharedPtr src_match_tensor_expr_; // real MatchTensorExpression used in calculation @@ -92,7 +87,7 @@ private: // extra options from match_tensor_expr_ u32 topn_ = 0; - MatchTensorScanIndexOptions index_options_; + SharedPtr index_options_; // column to search ColumnID search_column_id_ = 0; diff --git a/src/executor/operator/physical_scan/physical_merge_knn.cppm b/src/executor/operator/physical_scan/physical_merge_knn.cppm index ad3ae437cd..de95b0600c 100644 --- a/src/executor/operator/physical_scan/physical_merge_knn.cppm +++ b/src/executor/operator/physical_scan/physical_merge_knn.cppm @@ -43,11 +43,13 @@ public: SharedPtr> output_names, SharedPtr>> output_types, SharedPtr knn_expr, + SharedPtr filter_expression, u64 knn_table_index, - SharedPtr> load_metas) - : PhysicalScanBase(id, PhysicalOperatorType::kMergeKnn, std::move(left), nullptr, table_ref, load_metas), - output_names_(std::move(output_names)), output_types_(std::move(output_types)), knn_table_index_(knn_table_index), - knn_expression_(std::move(knn_expr)) {} + SharedPtr> load_metas, + bool cache_result) + : PhysicalScanBase(id, PhysicalOperatorType::kMergeKnn, std::move(left), nullptr, knn_table_index, table_ref, load_metas, cache_result), + output_names_(std::move(output_names)), output_types_(std::move(output_types)), knn_expression_(std::move(knn_expr)), + filter_expression_(std::move(filter_expression)) {} ~PhysicalMergeKnn() override = default; @@ -65,8 +67,6 @@ public: return 0; } - inline u64 knn_table_index() const { return knn_table_index_; } - inline bool IsKnnMinHeap() const { return knn_expression_->IsKnnMinHeap(); } private: @@ -76,10 +76,10 @@ private: private: SharedPtr> output_names_{}; SharedPtr>> output_types_{}; - u64 knn_table_index_{}; public: SharedPtr knn_expression_{}; + SharedPtr filter_expression_{}; }; } // namespace infinity diff --git a/src/executor/operator/physical_scan/physical_merge_match_sparse.cpp b/src/executor/operator/physical_scan/physical_merge_match_sparse.cpp index 2f9829b283..ed1a14b852 100644 --- a/src/executor/operator/physical_scan/physical_merge_match_sparse.cpp +++ b/src/executor/operator/physical_scan/physical_merge_match_sparse.cpp @@ -38,6 +38,7 @@ import logical_type; import knn_result_handler; import merge_knn; import match_sparse_scan_function_data; +import result_cache_manager; namespace infinity { @@ -46,9 +47,11 @@ PhysicalMergeMatchSparse::PhysicalMergeMatchSparse(u64 id, u64 table_index, SharedPtr base_table_ref, SharedPtr match_sparse_expr, - SharedPtr> load_metas) - : PhysicalScanBase(id, PhysicalOperatorType::kMergeMatchSparse, std::move(left), nullptr, base_table_ref, load_metas), table_index_(table_index), - match_sparse_expr_(std::move(match_sparse_expr)) {} + SharedPtr filter_expression, + SharedPtr> load_metas, + bool cache_result) + : PhysicalScanBase(id, PhysicalOperatorType::kMergeMatchSparse, std::move(left), nullptr, table_index, base_table_ref, load_metas, cache_result), + match_sparse_expr_(std::move(match_sparse_expr)), filter_expression_(std::move(filter_expression)) {} void PhysicalMergeMatchSparse::Init() { left_->Init(); } diff --git a/src/executor/operator/physical_scan/physical_merge_match_sparse.cppm b/src/executor/operator/physical_scan/physical_merge_match_sparse.cppm index d40463d0e3..e42abd6079 100644 --- a/src/executor/operator/physical_scan/physical_merge_match_sparse.cppm +++ b/src/executor/operator/physical_scan/physical_merge_match_sparse.cppm @@ -26,6 +26,7 @@ import base_table_ref; import data_type; import physical_scan_base; import match_sparse_expr; +import base_expression; namespace infinity { struct LoadMeta; @@ -37,7 +38,9 @@ public: u64 table_index, SharedPtr base_table_ref, SharedPtr match_sparse_expr, - SharedPtr> load_metas); + SharedPtr filter_expression, + SharedPtr> load_metas, + bool cache_result); void Init() override; @@ -49,6 +52,12 @@ public: SizeT TaskletCount() override; + const SharedPtr &match_sparse_expr() const { return match_sparse_expr_; } + + const SharedPtr &filter_expression() const { return filter_expression_; } + + i64 GetTopN() const { return match_sparse_expr_->topn_; } + private: template