diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp b/be/src/pipeline/exec/aggregation_sink_operator.cpp index dfde3a02998923..c2bb041abb268d 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp @@ -82,6 +82,9 @@ Status AggSinkLocalState::init(RuntimeState* state, Base::_shared_state->aggregate_evaluators.back()->set_timer(_exec_timer, _merge_timer, _expr_timer); } + if (p._is_streaming) { + Base::_shared_state->data_queue->set_sink_dependency(Base::_dependency, 0); + } Base::_shared_state->probe_expr_ctxs.resize(p._probe_expr_ctxs.size()); for (size_t i = 0; i < Base::_shared_state->probe_expr_ctxs.size(); i++) { RETURN_IF_ERROR( @@ -717,7 +720,7 @@ Status AggSinkLocalState::try_spill_disk(bool eos) { template AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs) + const DescriptorTbl& descs, bool is_streaming) : DataSinkOperatorX(operator_id, tnode.node_id), _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id), _intermediate_tuple_desc(nullptr), @@ -727,7 +730,8 @@ AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operato _is_merge(false), _pool(pool), _limit(tnode.limit), - _have_conjuncts(tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) { + _have_conjuncts(tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()), + _is_streaming(is_streaming) { _is_first_phase = tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase; } diff --git a/be/src/pipeline/exec/aggregation_sink_operator.h b/be/src/pipeline/exec/aggregation_sink_operator.h index 2d16df2be258ea..ec4bbe3bc709ef 100644 --- a/be/src/pipeline/exec/aggregation_sink_operator.h +++ b/be/src/pipeline/exec/aggregation_sink_operator.h @@ -351,7 +351,7 @@ template class AggSinkOperatorX : public DataSinkOperatorX { public: AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, - const DescriptorTbl& descs); + const DescriptorTbl& descs, bool is_streaming = false); ~AggSinkOperatorX() override = default; Status init(const TDataSink& tsink) override { return Status::InternalError("{} should not init with TPlanNode", @@ -404,6 +404,7 @@ class AggSinkOperatorX : public DataSinkOperatorX { size_t _spill_partition_count_bits; int64_t _limit; // -1: no limit bool _have_conjuncts; + const bool _is_streaming; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/aggregation_source_operator.cpp b/be/src/pipeline/exec/aggregation_source_operator.cpp index dc09e8a268c2fc..6f0c071d3911e8 100644 --- a/be/src/pipeline/exec/aggregation_source_operator.cpp +++ b/be/src/pipeline/exec/aggregation_source_operator.cpp @@ -51,8 +51,7 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) { auto& p = _parent->template cast(); if (p._is_streaming) { _shared_state->data_queue.reset(new DataQueue(1)); - _shared_state->data_queue->set_dependency(_dependency, - info.upstream_dependencies.front().get()); + _shared_state->data_queue->set_source_dependency(_dependency); } if (p._without_key) { if (p._needs_finalize) { diff --git a/be/src/pipeline/exec/data_queue.cpp b/be/src/pipeline/exec/data_queue.cpp index 656db9cbe73061..680589ce2b1ad7 100644 --- a/be/src/pipeline/exec/data_queue.cpp +++ b/be/src/pipeline/exec/data_queue.cpp @@ -40,9 +40,7 @@ DataQueue::DataQueue(int child_count) _is_canceled(child_count), _cur_bytes_in_queue(child_count), _cur_blocks_nums_in_queue(child_count), - _flag_queue_idx(0), - _source_dependency(nullptr), - _sink_dependency(nullptr) { + _flag_queue_idx(0) { for (int i = 0; i < child_count; ++i) { _queue_blocks_lock[i].reset(new std::mutex()); _free_blocks_lock[i].reset(new std::mutex()); @@ -51,6 +49,8 @@ DataQueue::DataQueue(int child_count) _cur_bytes_in_queue[i] = 0; _cur_blocks_nums_in_queue[i] = 0; } + _un_finished_counter = child_count; + _sink_dependencies.resize(child_count, nullptr); } std::unique_ptr DataQueue::get_free_block(int child_idx) { @@ -118,11 +118,12 @@ Status DataQueue::get_block_from_queue(std::unique_ptr* outpu } _cur_bytes_in_queue[_flag_queue_idx] -= (*output_block)->allocated_bytes(); _cur_blocks_nums_in_queue[_flag_queue_idx] -= 1; - if (_sink_dependency) { - if (!_is_finished[_flag_queue_idx]) { + auto old_value = _cur_blocks_total_nums.fetch_sub(1); + if (old_value == 1 && _source_dependency) { + if (!is_all_finish()) { _source_dependency->block(); } - _sink_dependency->set_ready(); + _sink_dependencies[_flag_queue_idx]->set_ready(); } } else { if (_is_finished[_flag_queue_idx]) { @@ -142,9 +143,10 @@ void DataQueue::push_block(std::unique_ptr block, int child_i _cur_bytes_in_queue[child_idx] += block->allocated_bytes(); _queue_blocks[child_idx].emplace_back(std::move(block)); _cur_blocks_nums_in_queue[child_idx] += 1; - if (_sink_dependency) { + _cur_blocks_total_nums++; + if (_source_dependency) { _source_dependency->set_ready(); - _sink_dependency->block(); + _sink_dependencies[child_idx]->block(); } //this only use to record the queue[0] for profile _max_bytes_in_queue = std::max(_max_bytes_in_queue, _cur_bytes_in_queue[0].load()); @@ -154,10 +156,16 @@ void DataQueue::push_block(std::unique_ptr block, int child_i void DataQueue::set_finish(int child_idx) { std::lock_guard l(*_queue_blocks_lock[child_idx]); + if (_is_finished[child_idx]) { + return; + } _is_finished[child_idx] = true; if (_source_dependency) { _source_dependency->set_ready(); } + if (_un_finished_counter.fetch_sub(1) == 1) { + _is_all_finished = true; + } } void DataQueue::set_canceled(int child_idx) { @@ -175,12 +183,7 @@ bool DataQueue::is_finish(int child_idx) { } bool DataQueue::is_all_finish() { - for (int i = 0; i < _child_count; ++i) { - if (_is_finished[i] == false) { - return false; - } - } - return true; + return _is_all_finished; } } // namespace pipeline diff --git a/be/src/pipeline/exec/data_queue.h b/be/src/pipeline/exec/data_queue.h index f756ca7e621a13..d28fe5d8f012f8 100644 --- a/be/src/pipeline/exec/data_queue.h +++ b/be/src/pipeline/exec/data_queue.h @@ -60,9 +60,11 @@ class DataQueue { int64_t max_size_of_queue() const { return _max_size_of_queue; } bool data_exhausted() const { return _data_exhausted; } - void set_dependency(Dependency* source_dependency, Dependency* sink_dependency) { + void set_source_dependency(Dependency* source_dependency) { _source_dependency = source_dependency; - _sink_dependency = sink_dependency; + } + void set_sink_dependency(Dependency* sink_dependency, int child_idx) { + _sink_dependencies[child_idx] = sink_dependency; } private: @@ -80,10 +82,13 @@ class DataQueue { //how many deque will be init, always will be one int _child_count = 0; std::vector _is_finished; + std::atomic_uint32_t _un_finished_counter; + std::atomic_bool _is_all_finished = false; std::vector _is_canceled; // int64_t just for counter of profile std::vector _cur_bytes_in_queue; std::vector _cur_blocks_nums_in_queue; + std::atomic_uint32_t _cur_blocks_total_nums = 0; //this will be indicate which queue has data, it's useful when have many queues std::atomic_int _flag_queue_idx = 0; @@ -95,8 +100,9 @@ class DataQueue { int64_t _max_size_of_queue = 0; static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024l * 1024 * 1024 / 10; + // data queue is multi sink one source Dependency* _source_dependency = nullptr; - Dependency* _sink_dependency = nullptr; + std::vector _sink_dependencies; }; } // namespace pipeline diff --git a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp index 74cf655c3669c7..da4968025af81e 100644 --- a/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/distinct_streaming_aggregation_sink_operator.cpp @@ -183,7 +183,8 @@ DistinctStreamingAggSinkOperatorX::DistinctStreamingAggSinkOperatorX(ObjectPool* int operator_id, const TPlanNode& tnode, const DescriptorTbl& descs) - : AggSinkOperatorX(pool, operator_id, tnode, descs) {} + : AggSinkOperatorX(pool, operator_id, tnode, descs, + true) {} Status DistinctStreamingAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(AggSinkOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp index 17eb1b2db1ceec..2cc8b9efcaf22c 100644 --- a/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp +++ b/be/src/pipeline/exec/streaming_aggregation_sink_operator.cpp @@ -341,7 +341,7 @@ Status StreamingAggSinkLocalState::_pre_agg_with_serialized_key( StreamingAggSinkOperatorX::StreamingAggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, const DescriptorTbl& descs) - : AggSinkOperatorX(pool, operator_id, tnode, descs) {} + : AggSinkOperatorX(pool, operator_id, tnode, descs, true) {} Status StreamingAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { RETURN_IF_ERROR(AggSinkOperatorX::init(tnode, state)); diff --git a/be/src/pipeline/exec/union_sink_operator.cpp b/be/src/pipeline/exec/union_sink_operator.cpp index 2a235123bdaf15..1ce3ef5c217590 100644 --- a/be/src/pipeline/exec/union_sink_operator.cpp +++ b/be/src/pipeline/exec/union_sink_operator.cpp @@ -99,6 +99,7 @@ Status UnionSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) SCOPED_TIMER(_open_timer); auto& p = _parent->cast(); _child_expr.resize(p._child_expr.size()); + _shared_state->data_queue.set_sink_dependency(_dependency, p._cur_child_id); for (size_t i = 0; i < p._child_expr.size(); i++) { RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i])); } diff --git a/be/src/pipeline/exec/union_source_operator.cpp b/be/src/pipeline/exec/union_source_operator.cpp index d824f9db7a0352..619b40f777b5cd 100644 --- a/be/src/pipeline/exec/union_source_operator.cpp +++ b/be/src/pipeline/exec/union_source_operator.cpp @@ -120,7 +120,7 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { ((UnionSourceDependency*)deps.front().get())->set_shared_state(ss); } RETURN_IF_ERROR(Base::init(state, info)); - ss->data_queue.set_dependency(_dependency, info.upstream_dependencies.front().get()); + ss->data_queue.set_source_dependency(_dependency); SCOPED_TIMER(exec_time_counter()); SCOPED_TIMER(_open_timer); // Const exprs materialized by this node. These exprs don't refer to any children. @@ -141,6 +141,9 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) { RETURN_IF_ERROR(clone_expr_list(_const_expr_list, other_expr_list)); } } + if (child_count == 0) { + _dependency->set_ready(); + } return Status::OK(); } diff --git a/be/src/pipeline/exec/union_source_operator.h b/be/src/pipeline/exec/union_source_operator.h index 8b7060884e905c..c39ea3dbc43be2 100644 --- a/be/src/pipeline/exec/union_source_operator.h +++ b/be/src/pipeline/exec/union_source_operator.h @@ -75,19 +75,6 @@ class UnionSourceDependency final : public Dependency { UnionSourceDependency(int id, int node_id, QueryContext* query_ctx) : Dependency(id, node_id, "UnionSourceDependency", query_ctx) {} ~UnionSourceDependency() override = default; - - [[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override { - if (((UnionSharedState*)_shared_state.get())->child_count() == 0) { - return nullptr; - } - if (((UnionSharedState*)_shared_state.get())->data_queue.is_all_finish() || - ((UnionSharedState*)_shared_state.get())->data_queue.remaining_has_data()) { - return nullptr; - } - return this; - } - bool push_to_blocking_queue() const override { return true; } - void block() override {} }; class UnionSourceOperatorX;