diff --git a/src/workers.cc b/src/workers.cc index 55d3dd50..5611f243 100644 --- a/src/workers.cc +++ b/src/workers.cc @@ -7,6 +7,7 @@ * of the MIT license. See the LICENSE.txt file for details. */ +#include #include #include @@ -808,10 +809,26 @@ KafkaConsumerConsumeNum::~KafkaConsumerConsumeNum() {} void KafkaConsumerConsumeNum::Execute() { std::size_t max = static_cast(m_num_messages); bool looping = true; - int timeout_ms = m_timeout_ms; std::size_t eof_event_count = 0; + auto start_time = std::chrono::steady_clock::now(); + int timeout_ms = m_timeout_ms; + int early_exit_ms = 1; + while (m_messages.size() - eof_event_count < max && looping) { + // Allow timeout_ms = early_exit_ms to take precedence + // timeout_ms > 1 + if (timeout_ms > early_exit_ms) { + // Calc next single consume timeout remaining for batch + auto now = std::chrono::steady_clock::now(); + auto elapsed = + std::chrono::duration_cast(now - start_time) + .count(); + // `timeout_ms` of 0 triggers non-blocking behavior https://github.com/confluentinc/librdkafka/blob/3f52de491f8aae1d71a9a0b3f1c07bfd6df4aec3/src/rdkafka_int.h#L1189-L1190 + // This still returns ERR_TIMED_OUT if no message available + timeout_ms = std::max(0, m_timeout_ms - static_cast(elapsed)); + } + // Get a message Baton b = m_consumer->Consume(timeout_ms); if (b.err() == RdKafka::ERR_NO_ERROR) { @@ -822,7 +839,7 @@ void KafkaConsumerConsumeNum::Execute() { // If partition EOF and have consumed messages, retry with timeout 1 // This allows getting ready messages, while not waiting for new ones if (m_messages.size() > eof_event_count) { - timeout_ms = 1; + timeout_ms = early_exit_ms; } // We will only go into this code path when `enable.partition.eof` is set to true