diff --git a/include/cppkafka/consumer.h b/include/cppkafka/consumer.h index 2ac65694..d76256ec 100644 --- a/include/cppkafka/consumer.h +++ b/include/cppkafka/consumer.h @@ -196,6 +196,20 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase { */ void unassign(); + /** + * \brief Sets the current topic/partition incremental assignment + * + * This translates into a call to rd_kafka_incremental_assign + */ + void incremental_assign(const TopicPartitionList& topic_partitions); + + /** + * \brief Unassigns the current topic/partition incremental assignment + * + * This translates into a call to rd_kafka_incremental_unassign. + */ + void incremental_unassign(const TopicPartitionList& topic_partitions); + /** * \brief Pauses all consumption */ diff --git a/include/cppkafka/kafka_handle_base.h b/include/cppkafka/kafka_handle_base.h index e6095f98..5ba5f10b 100644 --- a/include/cppkafka/kafka_handle_base.h +++ b/include/cppkafka/kafka_handle_base.h @@ -38,7 +38,9 @@ #include #include #include +#include #include +#include #include "group_information.h" #include "topic_partition.h" #include "topic_partition_list.h" @@ -365,6 +367,8 @@ class CPPKAFKA_API KafkaHandleBase { void check_error(rd_kafka_resp_err_t error) const; void check_error(rd_kafka_resp_err_t error, const rd_kafka_topic_partition_list_t* list_ptr) const; + void check_error(rd_kafka_error_t* error, + const rd_kafka_topic_partition_list_t* list_ptr) const; rd_kafka_conf_t* get_configuration_handle(); private: static const std::chrono::milliseconds DEFAULT_TIMEOUT; diff --git a/src/consumer.cpp b/src/consumer.cpp index 38f156ad..3f6a27c3 100644 --- a/src/consumer.cpp +++ b/src/consumer.cpp @@ -29,6 +29,7 @@ #include #include #include +#include #include "macros.h" #include "consumer.h" #include "exceptions.h" @@ -134,6 +135,21 @@ void Consumer::unassign() { check_error(error); } +void Consumer::incremental_assign(const TopicPartitionList& topic_partitions) { + if (topic_partitions.empty()) { + return; + } + TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); + rd_kafka_error_t* error = rd_kafka_incremental_assign(get_handle(), topic_list_handle.get()); + check_error(error, topic_list_handle.get()); +} + +void Consumer::incremental_unassign(const TopicPartitionList& topic_partitions) { + TopicPartitionsListPtr topic_list_handle = convert(topic_partitions); + rd_kafka_error_t* error = rd_kafka_incremental_unassign(get_handle(), topic_list_handle.get()); + check_error(error, topic_list_handle.get()); +} + void Consumer::pause() { pause_partitions(get_assignment()); } @@ -317,11 +333,19 @@ void Consumer::handle_rebalance(rd_kafka_resp_err_t error, TopicPartitionList& topic_partitions) { if (error == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) { CallbackInvoker("assignment", assignment_callback_, this)(topic_partitions); - assign(topic_partitions); + if (!strcmp(rd_kafka_rebalance_protocol(get_handle()), "COOPERATIVE")) { + incremental_assign(topic_partitions); + } else { + assign(topic_partitions); + } } else if (error == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) { CallbackInvoker("revocation", revocation_callback_, this)(topic_partitions); - unassign(); + if (!strcmp(rd_kafka_rebalance_protocol(get_handle()), "COOPERATIVE")) { + incremental_unassign(topic_partitions); + } else { + unassign(); + } } else { CallbackInvoker("rebalance error", rebalance_error_callback_, this)(error); diff --git a/src/kafka_handle_base.cpp b/src/kafka_handle_base.cpp index 3d1dc7fa..ac0ab2c6 100644 --- a/src/kafka_handle_base.cpp +++ b/src/kafka_handle_base.cpp @@ -281,6 +281,16 @@ void KafkaHandleBase::check_error(rd_kafka_resp_err_t error, } } +void KafkaHandleBase::check_error(rd_kafka_error_t* error, + const rd_kafka_topic_partition_list_t* list_ptr) const { + if (!error) { + return; + } + rd_kafka_resp_err_t error_code = error->code; + rd_kafka_error_destroy(error); + check_error(error_code, list_ptr); +} + rd_kafka_conf_t* KafkaHandleBase::get_configuration_handle() { return config_.get_handle(); }