From 0b1ce6026f987a1e897dbc2c2560eb9f9ee3be5d Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Sun, 15 Dec 2024 11:31:34 +0800 Subject: [PATCH] Fix 'physical explain' of merge limit Signed-off-by: Jin Hai --- python/test_pysdk/test_py_import.py | 6 ++++ src/executor/explain_physical_plan.cpp | 30 +++++++++++++++++++ .../operator/physical_merge_limit.cppm | 4 +++ src/executor/physical_planner.cpp | 17 ++++++----- 4 files changed, 49 insertions(+), 8 deletions(-) diff --git a/python/test_pysdk/test_py_import.py b/python/test_pysdk/test_py_import.py index e40b5fc0b3..982635812f 100644 --- a/python/test_pysdk/test_py_import.py +++ b/python/test_pysdk/test_py_import.py @@ -420,6 +420,12 @@ def test_import_with_different_size(self, check_data, data_size, suffix): {"c1": {"type": "int"}, "c2": {"type": "varchar"}}) test_csv_dir = common_values.TEST_TMP_DIR + "pysdk_test_import_with_different_size.csv" + + with open(test_csv_dir, 'r') as file: + row_count = sum(1 for line in file) + + assert row_count == data_size + res = table_obj.import_data(test_csv_dir) assert res.error_code == ErrorCode.OK diff --git a/src/executor/explain_physical_plan.cpp b/src/executor/explain_physical_plan.cpp index 33918d195c..972de6699c 100644 --- a/src/executor/explain_physical_plan.cpp +++ b/src/executor/explain_physical_plan.cpp @@ -239,18 +239,22 @@ void ExplainPhysicalPlan::Explain(const PhysicalOperator *op, SharedPtrnode_id()) + ")"; result->emplace_back(MakeShared(explain_header_str)); + + { + String limit_value_str = String(intent_size, ' ') + " - limit: "; + ExplainLogicalPlan::Explain(merge_limit_node->limit_expr().get(), limit_value_str); + result->emplace_back(MakeShared(limit_value_str)); + } + + if (merge_limit_node->offset_expr().get() != 0) { + String offset_value_str = String(intent_size, ' ') + " - offset: "; + ExplainLogicalPlan::Explain(merge_limit_node->offset_expr().get(), offset_value_str); + result->emplace_back(MakeShared(offset_value_str)); + } + + // Output column + { + String output_columns_str = String(intent_size, ' ') + " - output columns: ["; + SharedPtr> output_columns = merge_limit_node->GetOutputNames(); + SizeT column_count = output_columns->size(); + for (SizeT idx = 0; idx < column_count - 1; ++idx) { + output_columns_str += output_columns->at(idx) + ", "; + } + output_columns_str += output_columns->back() + "]"; + result->emplace_back(MakeShared(output_columns_str)); + } } void ExplainPhysicalPlan::Explain(const PhysicalMergeTop *merge_top_node, SharedPtr>> &result, i64 intent_size) { diff --git a/src/executor/operator/physical_merge_limit.cppm b/src/executor/operator/physical_merge_limit.cppm index 122310fc1a..4149d26e21 100644 --- a/src/executor/operator/physical_merge_limit.cppm +++ b/src/executor/operator/physical_merge_limit.cppm @@ -56,6 +56,10 @@ public: return 0; } + [[nodiscard]] inline const SharedPtr &limit_expr() const { return limit_expr_; } + + [[nodiscard]] inline const SharedPtr &offset_expr() const { return offset_expr_; } + private: SharedPtr limit_expr_{}; SharedPtr offset_expr_{}; diff --git a/src/executor/physical_planner.cpp b/src/executor/physical_planner.cpp index 07abfa78ba..29665e41f6 100644 --- a/src/executor/physical_planner.cpp +++ b/src/executor/physical_planner.cpp @@ -747,7 +747,13 @@ UniquePtr PhysicalPlanner::BuildLimit(const SharedPtr logical_limit = static_pointer_cast(logical_operator); UniquePtr input_physical_operator = BuildPhysicalOperator(input_logical_node); - if (input_physical_operator->TaskletCount() <= 1) { + + i64 offset = 0; + if (logical_limit->offset_expression_.get() != nullptr) { + offset = (static_pointer_cast(logical_limit->offset_expression_))->GetValue().value_.big_int; + } + + if (input_physical_operator->TaskletCount() <= 1 or offset != 0) { return MakeUnique(logical_operator->node_id(), std::move(input_physical_operator), logical_limit->limit_expression_, @@ -755,21 +761,16 @@ UniquePtr PhysicalPlanner::BuildLimit(const SharedPtrload_metas(), logical_limit->total_hits_count_flag_); } else { - i64 child_limit = (static_pointer_cast(logical_limit->limit_expression_))->GetValue().value_.big_int; - - if (logical_limit->offset_expression_.get() != nullptr) { - child_limit += (static_pointer_cast(logical_limit->offset_expression_))->GetValue().value_.big_int; - } auto child_limit_op = MakeUnique(logical_operator->node_id(), std::move(input_physical_operator), - MakeShared(Value::MakeBigInt(child_limit)), + logical_limit->limit_expression_, nullptr, logical_operator->load_metas(), logical_limit->total_hits_count_flag_); return MakeUnique(query_context_ptr_->GetNextNodeID(), std::move(child_limit_op), logical_limit->limit_expression_, - logical_limit->offset_expression_, + nullptr, MakeShared>()); } }