Skip to content

Commit

Permalink
Fix 'physical explain' of merge limit (#2372)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Fix explain result isn't correct.

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)

---------

Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN authored Dec 15, 2024
1 parent 5ecf1f5 commit ca91e90
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 8 deletions.
6 changes: 6 additions & 0 deletions python/test_pysdk/test_py_import.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,12 @@ def test_import_with_different_size(self, check_data, data_size, suffix):
{"c1": {"type": "int"}, "c2": {"type": "varchar"}})

test_csv_dir = common_values.TEST_TMP_DIR + "pysdk_test_import_with_different_size.csv"

with open(test_csv_dir, 'r') as file:
row_count = sum(1 for line in file)

assert row_count == data_size

res = table_obj.import_data(test_csv_dir)
assert res.error_code == ErrorCode.OK

Expand Down
30 changes: 30 additions & 0 deletions src/executor/explain_physical_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -239,18 +239,22 @@ void ExplainPhysicalPlan::Explain(const PhysicalOperator *op, SharedPtr<Vector<S
break;
}
case PhysicalOperatorType::kSource: {
// Explain((PhysicalSource *)op, result, intent_size);
break;
}
case PhysicalOperatorType::kSink: {
// Explain((PhysicalSink *)op, result, intent_size);
break;
}
case PhysicalOperatorType::kInvalid: {
break;
}
case PhysicalOperatorType::kParallelAggregate: {
Explain((PhysicalParallelAggregate *)op, result, intent_size);
break;
}
case PhysicalOperatorType::kMergeParallelAggregate: {
Explain((PhysicalMergeParallelAggregate *)op, result, intent_size);
break;
}
case PhysicalOperatorType::kIntersect: {
Expand All @@ -266,13 +270,15 @@ void ExplainPhysicalPlan::Explain(const PhysicalOperator *op, SharedPtr<Vector<S
break;
}
case PhysicalOperatorType::kMergeLimit: {
Explain((PhysicalMergeLimit *)op, result, intent_size);
break;
}
case PhysicalOperatorType::kMergeTop: {
Explain((PhysicalMergeTop *)op, result, intent_size);
break;
}
case PhysicalOperatorType::kMergeSort: {
Explain((PhysicalMergeSort *)op, result, intent_size);
break;
}
case PhysicalOperatorType::kMergeKnn: {
Expand Down Expand Up @@ -2282,6 +2288,30 @@ void ExplainPhysicalPlan::Explain(const PhysicalMergeLimit *merge_limit_node,
}
explain_header_str += "(" + std::to_string(merge_limit_node->node_id()) + ")";
result->emplace_back(MakeShared<String>(explain_header_str));

{
String limit_value_str = String(intent_size, ' ') + " - limit: ";
ExplainLogicalPlan::Explain(merge_limit_node->limit_expr().get(), limit_value_str);
result->emplace_back(MakeShared<String>(limit_value_str));
}

if (merge_limit_node->offset_expr().get() != 0) {
String offset_value_str = String(intent_size, ' ') + " - offset: ";
ExplainLogicalPlan::Explain(merge_limit_node->offset_expr().get(), offset_value_str);
result->emplace_back(MakeShared<String>(offset_value_str));
}

// Output column
{
String output_columns_str = String(intent_size, ' ') + " - output columns: [";
SharedPtr<Vector<String>> output_columns = merge_limit_node->GetOutputNames();
SizeT column_count = output_columns->size();
for (SizeT idx = 0; idx < column_count - 1; ++idx) {
output_columns_str += output_columns->at(idx) + ", ";
}
output_columns_str += output_columns->back() + "]";
result->emplace_back(MakeShared<String>(output_columns_str));
}
}

void ExplainPhysicalPlan::Explain(const PhysicalMergeTop *merge_top_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size) {
Expand Down
4 changes: 4 additions & 0 deletions src/executor/operator/physical_merge_limit.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public:
return 0;
}

[[nodiscard]] inline const SharedPtr<BaseExpression> &limit_expr() const { return limit_expr_; }

[[nodiscard]] inline const SharedPtr<BaseExpression> &offset_expr() const { return offset_expr_; }

private:
SharedPtr<BaseExpression> limit_expr_{};
SharedPtr<BaseExpression> offset_expr_{};
Expand Down
17 changes: 9 additions & 8 deletions src/executor/physical_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -747,29 +747,30 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildLimit(const SharedPtr<LogicalN

SharedPtr<LogicalLimit> logical_limit = static_pointer_cast<LogicalLimit>(logical_operator);
UniquePtr<PhysicalOperator> input_physical_operator = BuildPhysicalOperator(input_logical_node);
if (input_physical_operator->TaskletCount() <= 1) {

i64 offset = 0;
if (logical_limit->offset_expression_.get() != nullptr) {
offset = (static_pointer_cast<ValueExpression>(logical_limit->offset_expression_))->GetValue().value_.big_int;
}

if (input_physical_operator->TaskletCount() <= 1 or offset != 0) {
return MakeUnique<PhysicalLimit>(logical_operator->node_id(),
std::move(input_physical_operator),
logical_limit->limit_expression_,
logical_limit->offset_expression_,
logical_operator->load_metas(),
logical_limit->total_hits_count_flag_);
} else {
i64 child_limit = (static_pointer_cast<ValueExpression>(logical_limit->limit_expression_))->GetValue().value_.big_int;

if (logical_limit->offset_expression_.get() != nullptr) {
child_limit += (static_pointer_cast<ValueExpression>(logical_limit->offset_expression_))->GetValue().value_.big_int;
}
auto child_limit_op = MakeUnique<PhysicalLimit>(logical_operator->node_id(),
std::move(input_physical_operator),
MakeShared<ValueExpression>(Value::MakeBigInt(child_limit)),
logical_limit->limit_expression_,
nullptr,
logical_operator->load_metas(),
logical_limit->total_hits_count_flag_);
return MakeUnique<PhysicalMergeLimit>(query_context_ptr_->GetNextNodeID(),
std::move(child_limit_op),
logical_limit->limit_expression_,
logical_limit->offset_expression_,
nullptr,
MakeShared<Vector<LoadMeta>>());
}
}
Expand Down

0 comments on commit ca91e90

Please sign in to comment.