Consuming from multiple topic+partitions from a single thread
NOTE: This information relates to the legacy simple consumer. The new high-level KafkaConsumer automatically provides a single queue polling point for all assigned partitions.
The standard consumer interface serves messages from a single topic+partition (toppar) with each call
to rd_kafka_consume*().
When an application needs to consume from multiple toppars, one method is to create one application
thread per toppar. This is sometimes the proper approach, such as when messages from different topics
are handled by different parts of the application code. But when consuming from multiple partitions of the same
topic, multiple application threads complicate matters and require the application to implement the
usual lock-and-dispatch queue between threads.
To alleviate this, librdkafka provides autonomous queues which can be used to re-route messages from multiple toppars to a single queue, so that a single application thread can serve all of them with a single call.
First create a queue to which messages from the toppars (topic+partitions) will be re-routed:
rd_kafka_queue_t *rkqu = rd_kafka_queue_new(rk);
Create the consumer in the usual way, but start consuming by using the queue interface version of .._consume_start():
rd_kafka_consume_start_queue(rkt, partition, RD_KAFKA_OFFSET_STORED, rkqu);
Repeat this step for each topic+partition you need to consume from, reusing the queue rkqu in each step.
NOTE: Queues from one rd_kafka_t may not be used with another rd_kafka_t handle.
Then, from your consumer loop, simply call one of the ..consume.._queue() functions to consume messages
from all topic+partitions re-routed to this queue.
while (run) {
        rd_kafka_message_t *rkmessage;

        /* Consume message from queue, waiting at most 1000 ms */
        rkmessage = rd_kafka_consume_queue(rkqu, 1000);
        if (!rkmessage)
                continue; /* No message within the timeout */

        /* Handle message */
        ...

        rd_kafka_message_destroy(rkmessage);
}
To stop consuming a topic+partition, use the standard rd_kafka_consume_stop() interface:
rd_kafka_consume_stop(rkt, partition);
When all consumers using a queue have been stopped, the queue itself may be destroyed:
rd_kafka_queue_destroy(rkqu);
Message ordering is guaranteed within each topic+partition that is re-routed to the queue, but no ordering is defined between different topic+partitions.
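An application that wants to check the per-partition ordering can track the last offset seen for each partition. The sketch below is plain C and makes no librdkafka calls: the type order_tracker_t, the MAX_PARTITIONS limit and the function names are all hypothetical, and it assumes a single topic. In a consumer loop the inputs would come from the partition and offset fields of each rd_kafka_message_t.

```c
#include <stdint.h>

#define MAX_PARTITIONS 64 /* Hypothetical upper bound for this sketch */

/* Last offset seen per partition; -1 means nothing has been seen yet. */
typedef struct {
        int64_t last_offset[MAX_PARTITIONS];
} order_tracker_t;

static void order_tracker_init(order_tracker_t *t) {
        for (int i = 0; i < MAX_PARTITIONS; i++)
                t->last_offset[i] = -1;
}

/* Record one message. Returns 1 if `offset` is higher than the last offset
 * seen for `partition`, or 0 if it is not (or the partition is out of
 * range). Offsets of different partitions are never compared with each
 * other, since no ordering is defined between them. */
static int order_tracker_observe(order_tracker_t *t, int32_t partition,
                                 int64_t offset) {
        if (partition < 0 || partition >= MAX_PARTITIONS)
                return 0;
        if (offset <= t->last_offset[partition])
                return 0;
        t->last_offset[partition] = offset;
        return 1;
}
```

With this tracker, interleaved messages such as partition 0 offset 10, partition 1 offset 3, partition 0 offset 11 are all accepted, while a repeated or lower offset within the same partition is reported.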
See https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka.h for interface documentation.