Skip to content

Commit e29025e

Browse files
authored
[Chore](exchange) add some check about eos with exhcnage source (#57357)
This pull request introduces improvements to the handling of data limits and error detection in the data exchange and streaming components. The main changes ensure correct behavior when limits are unset and add better error reporting for unexpected sender states. **Improvements to limit handling:** * Updated the condition in `ExchangeSourceOperatorX::get_block` (in `exchange_source_operator.cpp`) to correctly handle cases when `_limit` is unset (`-1`), ensuring that the row limit logic only applies when a limit is set. **Robustness and error detection:** * Enhanced `VDataStreamRecvr::SenderQueue::get_batch` (in `vdata_stream_recvr.cpp`) to return an internal error status if the data queue is empty but there are still remaining senders, improving error detection and debugging.
1 parent cbd5602 commit e29025e

File tree

2 files changed

+8
-4
lines changed

2 files changed

+8
-4
lines changed

be/src/pipeline/exec/exchange_source_operator.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,7 +186,7 @@ Status ExchangeSourceOperatorX::get_block(RuntimeState* state, vectorized::Block
186186
}
187187
}
188188
// Merge actually also handles the limit, but handling the limit one more time will not cause correctness issues
189-
if (local_state.num_rows_returned() + block->rows() < _limit) {
189+
if (_limit == -1 || local_state.num_rows_returned() + block->rows() < _limit) {
190190
local_state.add_num_rows_returned(block->rows());
191191
} else {
192192
*eos = true;

be/src/vec/runtime/vdata_stream_recvr.cpp

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,12 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block* block, bool* eos) {
8888
}
8989

9090
if (_block_queue.empty()) {
91-
DCHECK_EQ(_num_remaining_senders, 0);
91+
if (_num_remaining_senders != 0) {
92+
return Status::InternalError(
93+
"Data queue is empty but there are still remaining senders. "
94+
"_num_remaining_senders: {}",
95+
_num_remaining_senders);
96+
}
9297
*eos = true;
9398
return Status::OK();
9499
}
@@ -171,10 +176,9 @@ Status VDataStreamRecvr::SenderQueue::add_block(std::unique_ptr<PBlock> pblock,
171176
auto iter = _packet_seq_map.find(be_number);
172177
if (iter != _packet_seq_map.end()) {
173178
if (iter->second >= packet_seq) {
174-
LOG(WARNING) << fmt::format(
179+
return Status::InternalError(
175180
"packet already exist [cur_packet_id= {} receive_packet_id={}]",
176181
iter->second, packet_seq);
177-
return Status::OK();
178182
}
179183
iter->second = packet_seq;
180184
} else {

0 commit comments

Comments
 (0)