Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor](pipelineX) refine union dependency #27348

Merged
merged 1 commit into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 6 additions & 2 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,9 @@ Status AggSinkLocalState<DependencyType, Derived>::init(RuntimeState* state,
Base::_shared_state->aggregate_evaluators.back()->set_timer(_exec_timer, _merge_timer,
_expr_timer);
}
if (p._is_streaming) {
Base::_shared_state->data_queue->set_sink_dependency(Base::_dependency, 0);
}
Base::_shared_state->probe_expr_ctxs.resize(p._probe_expr_ctxs.size());
for (size_t i = 0; i < Base::_shared_state->probe_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(
Expand Down Expand Up @@ -717,7 +720,7 @@ Status AggSinkLocalState<DependencyType, Derived>::try_spill_disk(bool eos) {
template <typename LocalStateType>
AggSinkOperatorX<LocalStateType>::AggSinkOperatorX(ObjectPool* pool, int operator_id,
const TPlanNode& tnode,
const DescriptorTbl& descs)
const DescriptorTbl& descs, bool is_streaming)
: DataSinkOperatorX<LocalStateType>(operator_id, tnode.node_id),
_intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id),
_intermediate_tuple_desc(nullptr),
Expand All @@ -727,7 +730,8 @@ AggSinkOperatorX<LocalStateType>::AggSinkOperatorX(ObjectPool* pool, int operato
_is_merge(false),
_pool(pool),
_limit(tnode.limit),
_have_conjuncts(tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) {
_have_conjuncts(tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()),
_is_streaming(is_streaming) {
_is_first_phase = tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase;
}

Expand Down
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/aggregation_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,7 @@ template <typename LocalStateType = BlockingAggSinkLocalState>
class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
public:
AggSinkOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode,
const DescriptorTbl& descs);
const DescriptorTbl& descs, bool is_streaming = false);
~AggSinkOperatorX() override = default;
Status init(const TDataSink& tsink) override {
return Status::InternalError("{} should not init with TPlanNode",
Expand Down Expand Up @@ -404,6 +404,7 @@ class AggSinkOperatorX : public DataSinkOperatorX<LocalStateType> {
size_t _spill_partition_count_bits;
int64_t _limit; // -1: no limit
bool _have_conjuncts;
const bool _is_streaming;
};

} // namespace pipeline
Expand Down
3 changes: 1 addition & 2 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ Status AggLocalState::init(RuntimeState* state, LocalStateInfo& info) {
auto& p = _parent->template cast<AggSourceOperatorX>();
if (p._is_streaming) {
_shared_state->data_queue.reset(new DataQueue(1));
_shared_state->data_queue->set_dependency(_dependency,
info.upstream_dependencies.front().get());
_shared_state->data_queue->set_source_dependency(_dependency);
}
if (p._without_key) {
if (p._needs_finalize) {
Expand Down
31 changes: 17 additions & 14 deletions be/src/pipeline/exec/data_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ DataQueue::DataQueue(int child_count)
_is_canceled(child_count),
_cur_bytes_in_queue(child_count),
_cur_blocks_nums_in_queue(child_count),
_flag_queue_idx(0),
_source_dependency(nullptr),
_sink_dependency(nullptr) {
_flag_queue_idx(0) {
for (int i = 0; i < child_count; ++i) {
_queue_blocks_lock[i].reset(new std::mutex());
_free_blocks_lock[i].reset(new std::mutex());
Expand All @@ -51,6 +49,8 @@ DataQueue::DataQueue(int child_count)
_cur_bytes_in_queue[i] = 0;
_cur_blocks_nums_in_queue[i] = 0;
}
_un_finished_counter = child_count;
_sink_dependencies.resize(child_count, nullptr);
}

std::unique_ptr<vectorized::Block> DataQueue::get_free_block(int child_idx) {
Expand Down Expand Up @@ -118,11 +118,12 @@ Status DataQueue::get_block_from_queue(std::unique_ptr<vectorized::Block>* outpu
}
_cur_bytes_in_queue[_flag_queue_idx] -= (*output_block)->allocated_bytes();
_cur_blocks_nums_in_queue[_flag_queue_idx] -= 1;
if (_sink_dependency) {
if (!_is_finished[_flag_queue_idx]) {
auto old_value = _cur_blocks_total_nums.fetch_sub(1);
if (old_value == 1 && _source_dependency) {
if (!is_all_finish()) {
_source_dependency->block();
}
_sink_dependency->set_ready();
_sink_dependencies[_flag_queue_idx]->set_ready();
}
} else {
if (_is_finished[_flag_queue_idx]) {
Expand All @@ -142,9 +143,10 @@ void DataQueue::push_block(std::unique_ptr<vectorized::Block> block, int child_i
_cur_bytes_in_queue[child_idx] += block->allocated_bytes();
_queue_blocks[child_idx].emplace_back(std::move(block));
_cur_blocks_nums_in_queue[child_idx] += 1;
if (_sink_dependency) {
_cur_blocks_total_nums++;
if (_source_dependency) {
_source_dependency->set_ready();
_sink_dependency->block();
_sink_dependencies[child_idx]->block();
}
//this only use to record the queue[0] for profile
_max_bytes_in_queue = std::max(_max_bytes_in_queue, _cur_bytes_in_queue[0].load());
Expand All @@ -154,10 +156,16 @@ void DataQueue::push_block(std::unique_ptr<vectorized::Block> block, int child_i

void DataQueue::set_finish(int child_idx) {
std::lock_guard<std::mutex> l(*_queue_blocks_lock[child_idx]);
if (_is_finished[child_idx]) {
return;
}
_is_finished[child_idx] = true;
if (_source_dependency) {
_source_dependency->set_ready();
}
if (_un_finished_counter.fetch_sub(1) == 1) {
_is_all_finished = true;
}
}

void DataQueue::set_canceled(int child_idx) {
Expand All @@ -175,12 +183,7 @@ bool DataQueue::is_finish(int child_idx) {
}

bool DataQueue::is_all_finish() {
for (int i = 0; i < _child_count; ++i) {
if (_is_finished[i] == false) {
return false;
}
}
return true;
return _is_all_finished;
}

} // namespace pipeline
Expand Down
12 changes: 9 additions & 3 deletions be/src/pipeline/exec/data_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,11 @@ class DataQueue {
int64_t max_size_of_queue() const { return _max_size_of_queue; }

bool data_exhausted() const { return _data_exhausted; }
void set_dependency(Dependency* source_dependency, Dependency* sink_dependency) {
void set_source_dependency(Dependency* source_dependency) {
_source_dependency = source_dependency;
_sink_dependency = sink_dependency;
}
void set_sink_dependency(Dependency* sink_dependency, int child_idx) {
_sink_dependencies[child_idx] = sink_dependency;
}

private:
Expand All @@ -80,10 +82,13 @@ class DataQueue {
//how many deque will be init, always will be one
int _child_count = 0;
std::vector<std::atomic_bool> _is_finished;
std::atomic_uint32_t _un_finished_counter;
std::atomic_bool _is_all_finished = false;
std::vector<std::atomic_bool> _is_canceled;
// int64_t just for counter of profile
std::vector<std::atomic_int64_t> _cur_bytes_in_queue;
std::vector<std::atomic_uint32_t> _cur_blocks_nums_in_queue;
std::atomic_uint32_t _cur_blocks_total_nums = 0;

//this will be indicate which queue has data, it's useful when have many queues
std::atomic_int _flag_queue_idx = 0;
Expand All @@ -95,8 +100,9 @@ class DataQueue {
int64_t _max_size_of_queue = 0;
static constexpr int64_t MAX_BYTE_OF_QUEUE = 1024l * 1024 * 1024 / 10;

// data queue is multi sink one source
Dependency* _source_dependency = nullptr;
Dependency* _sink_dependency = nullptr;
std::vector<Dependency*> _sink_dependencies;
};

} // namespace pipeline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ DistinctStreamingAggSinkOperatorX::DistinctStreamingAggSinkOperatorX(ObjectPool*
int operator_id,
const TPlanNode& tnode,
const DescriptorTbl& descs)
: AggSinkOperatorX<DistinctStreamingAggSinkLocalState>(pool, operator_id, tnode, descs) {}
: AggSinkOperatorX<DistinctStreamingAggSinkLocalState>(pool, operator_id, tnode, descs,
true) {}

Status DistinctStreamingAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(AggSinkOperatorX<DistinctStreamingAggSinkLocalState>::init(tnode, state));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ Status StreamingAggSinkLocalState::_pre_agg_with_serialized_key(
StreamingAggSinkOperatorX::StreamingAggSinkOperatorX(ObjectPool* pool, int operator_id,
const TPlanNode& tnode,
const DescriptorTbl& descs)
: AggSinkOperatorX<StreamingAggSinkLocalState>(pool, operator_id, tnode, descs) {}
: AggSinkOperatorX<StreamingAggSinkLocalState>(pool, operator_id, tnode, descs, true) {}

Status StreamingAggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(AggSinkOperatorX<StreamingAggSinkLocalState>::init(tnode, state));
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/union_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ Status UnionSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info)
SCOPED_TIMER(_open_timer);
auto& p = _parent->cast<Parent>();
_child_expr.resize(p._child_expr.size());
_shared_state->data_queue.set_sink_dependency(_dependency, p._cur_child_id);
for (size_t i = 0; i < p._child_expr.size(); i++) {
RETURN_IF_ERROR(p._child_expr[i]->clone(state, _child_expr[i]));
}
Expand Down
5 changes: 4 additions & 1 deletion be/src/pipeline/exec/union_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
((UnionSourceDependency*)deps.front().get())->set_shared_state(ss);
}
RETURN_IF_ERROR(Base::init(state, info));
ss->data_queue.set_dependency(_dependency, info.upstream_dependencies.front().get());
ss->data_queue.set_source_dependency(_dependency);
SCOPED_TIMER(exec_time_counter());
SCOPED_TIMER(_open_timer);
// Const exprs materialized by this node. These exprs don't refer to any children.
Expand All @@ -141,6 +141,9 @@ Status UnionSourceLocalState::init(RuntimeState* state, LocalStateInfo& info) {
RETURN_IF_ERROR(clone_expr_list(_const_expr_list, other_expr_list));
}
}
if (child_count == 0) {
_dependency->set_ready();
}
return Status::OK();
}

Expand Down
13 changes: 0 additions & 13 deletions be/src/pipeline/exec/union_source_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,6 @@ class UnionSourceDependency final : public Dependency {
UnionSourceDependency(int id, int node_id, QueryContext* query_ctx)
: Dependency(id, node_id, "UnionSourceDependency", query_ctx) {}
~UnionSourceDependency() override = default;

[[nodiscard]] Dependency* is_blocked_by(PipelineXTask* task) override {
if (((UnionSharedState*)_shared_state.get())->child_count() == 0) {
return nullptr;
}
if (((UnionSharedState*)_shared_state.get())->data_queue.is_all_finish() ||
((UnionSharedState*)_shared_state.get())->data_queue.remaining_has_data()) {
return nullptr;
}
return this;
}
bool push_to_blocking_queue() const override { return true; }
void block() override {}
};

class UnionSourceOperatorX;
Expand Down
Loading