diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp index 72bf35f3cbac3b..4f7de1d379aea9 100644 --- a/be/src/vec/common/sort/sorter.cpp +++ b/be/src/vec/common/sort/sorter.cpp @@ -108,10 +108,6 @@ Status MergeSorterState::merge_sort_read(doris::vectorized::Block* block, int ba Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized::Block* block, bool* eos) { - if (priority_queue_.empty()) { - *eos = true; - return Status::OK(); - } size_t num_columns = priority_queue_.top().impl->block->columns(); MutableBlock m_block = VectorizedUtils::build_mutable_mem_reuse_block( @@ -120,13 +116,15 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized /// Take rows from queue in right order and push to 'merged'. size_t merged_rows = 0; - while (!priority_queue_.empty()) { + // process single element queue on merge_sort_read() + while (priority_queue_.size() > 1 && merged_rows < batch_size) { auto current = priority_queue_.top(); priority_queue_.pop(); if (offset_ == 0) { - for (size_t i = 0; i < num_columns; ++i) + for (size_t i = 0; i < num_columns; ++i) { merged_columns[i]->insert_from(*current->block->get_columns()[i], current->pos); + } ++merged_rows; } else { offset_--; @@ -136,18 +134,9 @@ Status MergeSorterState::_merge_sort_read_impl(int batch_size, doris::vectorized current->next(); priority_queue_.push(current); } - - if (merged_rows == batch_size) { - break; - } - } - block->set_columns(std::move(merged_columns)); - - if (merged_rows == 0) { - *eos = true; - return Status::OK(); } + block->set_columns(std::move(merged_columns)); return Status::OK(); } @@ -217,9 +206,15 @@ FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offs Status FullSorter::append_block(Block* block) { DCHECK(block->rows() > 0); + + if (_reach_limit() && block->bytes() > _state->unsorted_block_->allocated_bytes() - + _state->unsorted_block_->bytes()) { + RETURN_IF_ERROR(_do_sort()); + } + { SCOPED_TIMER(_merge_block_timer); - auto& data = _state->unsorted_block_->get_columns_with_type_and_name(); + const auto& data = _state->unsorted_block_->get_columns_with_type_and_name(); const auto& arrival_data = block->get_columns_with_type_and_name(); auto sz = block->rows(); for (int i = 0; i < data.size(); ++i) { @@ -232,9 +227,6 @@ Status FullSorter::append_block(Block* block) { } block->clear_column_data(); } - if (_reach_limit()) { - RETURN_IF_ERROR(_do_sort()); - } return Status::OK(); } diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h index aa7d88dfbc2a3a..36c535c9101db9 100644 --- a/be/src/vec/common/sort/sorter.h +++ b/be/src/vec/common/sort/sorter.h @@ -177,21 +177,15 @@ class FullSorter final : public Sorter { private: bool _reach_limit() { - return _state->unsorted_block_->rows() > buffered_block_size_ || - _state->unsorted_block_->bytes() > buffered_block_bytes_; + return _state->unsorted_block_->allocated_bytes() >= buffered_block_bytes_; } Status _do_sort(); std::unique_ptr _state; - static constexpr size_t INITIAL_BUFFERED_BLOCK_SIZE = 1024 * 1024; - static constexpr size_t INITIAL_BUFFERED_BLOCK_BYTES = 64 << 20; + static constexpr size_t INITIAL_BUFFERED_BLOCK_BYTES = 64 * 1024 * 1024; - static constexpr size_t SPILL_BUFFERED_BLOCK_SIZE = 4 * 1024 * 1024; - static constexpr size_t SPILL_BUFFERED_BLOCK_BYTES = 256 << 20; - - size_t buffered_block_size_ = INITIAL_BUFFERED_BLOCK_SIZE; size_t buffered_block_bytes_ = INITIAL_BUFFERED_BLOCK_BYTES; };