Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Incremental cooperative rebalancing #283

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions include/cppkafka/consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,20 @@ class CPPKAFKA_API Consumer : public KafkaHandleBase {
*/
void unassign();

/**
* \brief Sets the current topic/partition incremental assignment
*
* This translates into a call to rd_kafka_incremental_assign
*/
void incremental_assign(const TopicPartitionList& topic_partitions);

/**
* \brief Unassigns the current topic/partition incremental assignment
*
* This translates into a call to rd_kafka_incremental_unassign.
*/
void incremental_unassign(const TopicPartitionList& topic_partitions);

/**
* \brief Pauses all consumption
*/
Expand Down
4 changes: 4 additions & 0 deletions include/cppkafka/kafka_handle_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,9 @@
#include <mutex>
#include <tuple>
#include <chrono>
#include <librdkafka/rdtypes.h>
#include <librdkafka/rdkafka.h>
#include <librdkafka/rdkafka_error.h>
#include "group_information.h"
#include "topic_partition.h"
#include "topic_partition_list.h"
Expand Down Expand Up @@ -365,6 +367,8 @@ class CPPKAFKA_API KafkaHandleBase {
void check_error(rd_kafka_resp_err_t error) const;
void check_error(rd_kafka_resp_err_t error,
const rd_kafka_topic_partition_list_t* list_ptr) const;
void check_error(rd_kafka_error_t* error,
const rd_kafka_topic_partition_list_t* list_ptr) const;
rd_kafka_conf_t* get_configuration_handle();
private:
static const std::chrono::milliseconds DEFAULT_TIMEOUT;
Expand Down
28 changes: 26 additions & 2 deletions src/consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#include <sstream>
#include <algorithm>
#include <cctype>
#include <cstring>
#include "macros.h"
#include "consumer.h"
#include "exceptions.h"
Expand Down Expand Up @@ -134,6 +135,21 @@ void Consumer::unassign() {
check_error(error);
}

void Consumer::incremental_assign(const TopicPartitionList& topic_partitions) {
if (topic_partitions.empty()) {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this right? The documentation says:

The application must pass the partition list passed to the callback (or a copy of it), even if the list is empty.

return;
}
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
rd_kafka_error_t* error = rd_kafka_incremental_assign(get_handle(), topic_list_handle.get());
check_error(error, topic_list_handle.get());
}

void Consumer::incremental_unassign(const TopicPartitionList& topic_partitions) {
TopicPartitionsListPtr topic_list_handle = convert(topic_partitions);
rd_kafka_error_t* error = rd_kafka_incremental_unassign(get_handle(), topic_list_handle.get());
check_error(error, topic_list_handle.get());
}

void Consumer::pause() {
pause_partitions(get_assignment());
}
Expand Down Expand Up @@ -317,11 +333,19 @@ void Consumer::handle_rebalance(rd_kafka_resp_err_t error,
TopicPartitionList& topic_partitions) {
if (error == RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS) {
CallbackInvoker<AssignmentCallback>("assignment", assignment_callback_, this)(topic_partitions);
assign(topic_partitions);
if (!strcmp(rd_kafka_rebalance_protocol(get_handle()), "COOPERATIVE")) {
incremental_assign(topic_partitions);
} else {
assign(topic_partitions);
}
}
else if (error == RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS) {
CallbackInvoker<RevocationCallback>("revocation", revocation_callback_, this)(topic_partitions);
unassign();
if (!strcmp(rd_kafka_rebalance_protocol(get_handle()), "COOPERATIVE")) {
incremental_unassign(topic_partitions);
} else {
unassign();
}
}
else {
CallbackInvoker<RebalanceErrorCallback>("rebalance error", rebalance_error_callback_, this)(error);
Expand Down
10 changes: 10 additions & 0 deletions src/kafka_handle_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,16 @@ void KafkaHandleBase::check_error(rd_kafka_resp_err_t error,
}
}

void KafkaHandleBase::check_error(rd_kafka_error_t* error,
const rd_kafka_topic_partition_list_t* list_ptr) const {
if (!error) {
return;
}
rd_kafka_resp_err_t error_code = error->code;
rd_kafka_error_destroy(error);
check_error(error_code, list_ptr);
}

rd_kafka_conf_t* KafkaHandleBase::get_configuration_handle() {
return config_.get_handle();
}
Expand Down