From 6bed85db99b8084244fb9ecff3eaea148f9bbc9b Mon Sep 17 00:00:00 2001 From: jacktengg Date: Thu, 5 Dec 2024 17:37:01 +0800 Subject: [PATCH] removed deprecated code --- .../exec/partitioned_aggregation_sink_operator.cpp | 10 +--------- .../exec/partitioned_aggregation_sink_operator.h | 3 --- be/src/pipeline/exec/spill_sort_sink_operator.cpp | 9 +-------- be/src/pipeline/exec/spill_sort_sink_operator.h | 2 -- 4 files changed, 2 insertions(+), 22 deletions(-) diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp index bc3c1fccba552d..88d39eb93dfdb5 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.cpp @@ -34,11 +34,7 @@ namespace doris::pipeline { PartitionedAggSinkLocalState::PartitionedAggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) - : Base(parent, state) { - _finish_dependency = - std::make_shared(parent->operator_id(), parent->node_id(), - parent->get_name() + "_FINISH_DEPENDENCY", true); -} + : Base(parent, state) {} Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state, doris::pipeline::LocalSinkStateInfo& info) { @@ -69,7 +65,6 @@ Status PartitionedAggSinkLocalState::init(doris::RuntimeState* state, "AggSinkSpillDependency", true); state->get_task()->add_spill_dependency(_spill_dependency.get()); - _finish_dependency->block(); return Status::OK(); } @@ -196,11 +191,9 @@ Status PartitionedAggSinkOperatorX::sink(doris::RuntimeState* state, vectorized: RETURN_IF_ERROR(partition->finish_current_spilling(eos)); } local_state._dependency->set_ready_to_read(); - local_state._finish_dependency->set_ready(); } } else { local_state._dependency->set_ready_to_read(); - local_state._finish_dependency->set_ready(); } } else if (local_state._shared_state->is_spilled) { if (revocable_mem_size(state) >= vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM) { @@ -331,7 +324,6 @@ Status PartitionedAggSinkLocalState::revoke_memory( if (_eos) { Base::_dependency->set_ready_to_read(); - _finish_dependency->set_ready(); } state->get_query_ctx()->decrease_revoking_tasks_count(); }}; diff --git a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h index 2c77ed15436826..4b022dd1c5ab69 100644 --- a/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_aggregation_sink_operator.h @@ -44,7 +44,6 @@ class PartitionedAggSinkLocalState Status init(RuntimeState* state, LocalSinkStateInfo& info) override; Status open(RuntimeState* state) override; Status close(RuntimeState* state, Status exec_status) override; - Dependency* finishdependency() override { return _finish_dependency.get(); } Status revoke_memory(RuntimeState* state, const std::shared_ptr& spill_context); @@ -271,8 +270,6 @@ class PartitionedAggSinkLocalState std::unique_ptr _runtime_state; - std::shared_ptr _finish_dependency; - // temp structures during spilling vectorized::MutableColumns key_columns_; vectorized::MutableColumns value_columns_; diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.cpp b/be/src/pipeline/exec/spill_sort_sink_operator.cpp index 00d05802e01f6e..0dfc157ad33f72 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.cpp +++ b/be/src/pipeline/exec/spill_sort_sink_operator.cpp @@ -26,10 +26,7 @@ namespace doris::pipeline { SpillSortSinkLocalState::SpillSortSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) - : Base(parent, state) { - _finish_dependency = std::make_shared(parent->operator_id(), parent->node_id(), - parent->get_name() + "_SPILL_DEPENDENCY"); -} + : Base(parent, state) {} Status SpillSortSinkLocalState::init(doris::RuntimeState* state, doris::pipeline::LocalSinkStateInfo& info) { @@ -46,7 +43,6 @@ Status SpillSortSinkLocalState::init(doris::RuntimeState* state, RETURN_IF_ERROR(setup_in_memory_sort_op(state)); Base::_shared_state->in_mem_shared_state->sorter->set_enable_spill(); - _finish_dependency->block(); return Status::OK(); } @@ -181,13 +177,11 @@ Status SpillSortSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Bloc RETURN_IF_ERROR(revoke_memory(state, nullptr)); } else { local_state._dependency->set_ready_to_read(); - local_state._finish_dependency->set_ready(); } } else { RETURN_IF_ERROR( local_state._shared_state->in_mem_shared_state->sorter->prepare_for_read()); local_state._dependency->set_ready_to_read(); - local_state._finish_dependency->set_ready(); } } return Status::OK(); @@ -251,7 +245,6 @@ Status SpillSortSinkLocalState::revoke_memory(RuntimeState* state, state->get_query_ctx()->decrease_revoking_tasks_count(); if (_eos) { _dependency->set_ready_to_read(); - _finish_dependency->set_ready(); } }}; diff --git a/be/src/pipeline/exec/spill_sort_sink_operator.h b/be/src/pipeline/exec/spill_sort_sink_operator.h index 8984b1e43de891..3d6ccdcc4ce359 100644 --- a/be/src/pipeline/exec/spill_sort_sink_operator.h +++ b/be/src/pipeline/exec/spill_sort_sink_operator.h @@ -38,7 +38,6 @@ class SpillSortSinkLocalState : public PipelineXSpillSinkLocalState _finish_dependency; }; class SpillSortSinkOperatorX final : public DataSinkOperatorX {