diff --git a/kafka/consumer.c b/kafka/consumer.c index b23de6b..9dc0487 100644 --- a/kafka/consumer.c +++ b/kafka/consumer.c @@ -821,3 +821,17 @@ int lua_consumer_resume(struct lua_State *L) { return lua_consumer_call_pause_resume(L, kafka_resume); } + +int +lua_consumer_rebalance_protocol(struct lua_State *L) { + consumer_t **consumer_p = luaL_checkudata(L, 1, consumer_label); + if (consumer_p == NULL || *consumer_p == NULL) + return 0; + + if ((*consumer_p)->rd_consumer != NULL) { + const char *proto = rd_kafka_rebalance_protocol((*consumer_p)->rd_consumer); + lua_pushstring(L, proto); + return 1; + } + return 0; +} diff --git a/kafka/consumer.h b/kafka/consumer.h index 1715bd4..34b66d0 100644 --- a/kafka/consumer.h +++ b/kafka/consumer.h @@ -59,4 +59,7 @@ lua_consumer_pause(struct lua_State *L); int lua_consumer_resume(struct lua_State *L); +int +lua_consumer_rebalance_protocol(struct lua_State *L); + #endif //TNT_KAFKA_CONSUMER_H diff --git a/kafka/init.lua b/kafka/init.lua index 0e3b32e..97dab59 100644 --- a/kafka/init.lua +++ b/kafka/init.lua @@ -219,6 +219,10 @@ function Consumer:resume() return self._consumer:resume() end +function Consumer:rebalance_protocol() + return self._consumer:rebalance_protocol() +end + function Consumer:seek_partitions(topic_partitions_list, options) local timeout_ms = get_timeout_from_options(options) return self._consumer:seek_partitions(topic_partitions_list, timeout_ms) diff --git a/kafka/producer.c b/kafka/producer.c index 030e3ae..626f6a4 100644 --- a/kafka/producer.c +++ b/kafka/producer.c @@ -631,7 +631,7 @@ lua_producer_metadata(struct lua_State *L) { int lua_producer_list_groups(struct lua_State *L) { - producer_t **producer_p = (producer_t **)luaL_checkudata(L, 1, producer_label); + producer_t **producer_p = luaL_checkudata(L, 1, producer_label); if (producer_p == NULL || *producer_p == NULL) return 0; diff --git a/kafka/tnt_kafka.c b/kafka/tnt_kafka.c index 06266fe..9055dea 100644 --- a/kafka/tnt_kafka.c +++ b/kafka/tnt_kafka.c @@ -33,6 +33,7 @@ luaopen_kafka_tntkafka(lua_State *L) { {"resume", lua_consumer_resume}, {"close", lua_consumer_close}, {"destroy", lua_consumer_destroy}, + {"rebalance_protocol", lua_consumer_rebalance_protocol}, {"__tostring", lua_consumer_tostring}, {NULL, NULL} }; diff --git a/tests/consumer.lua b/tests/consumer.lua index 1e91a0e..89f77b0 100644 --- a/tests/consumer.lua +++ b/tests/consumer.lua @@ -172,7 +172,7 @@ local function list_groups(timeout_ms) log.info("Groups: %s", json.encode(res)) -- Some fields can have binary data that won't -- be correctly processed by connector. - for _, group in ipairs(res) do + for _, group in ipairs(res) do group['members'] = nil end return res @@ -204,6 +204,9 @@ local function test_seek_partitions() for _ = 1, 5 do local msg = out:get(3) + if msg == nil then + error('Message is not delivered') + end log.info('Get message: %s', json.encode(msg_totable(msg))) append_message(messages, msg) consumer:seek_partitions({ @@ -214,6 +217,10 @@ local function test_seek_partitions() return messages end +local function rebalance_protocol() + return consumer:rebalance_protocol() +end + local function test_create_errors() log.info('Create without config') local _, err = tnt_kafka.Consumer.create() @@ -263,6 +270,7 @@ return { list_groups = list_groups, pause = pause, resume = resume, + rebalance_protocol = rebalance_protocol, test_seek_partitions = test_seek_partitions, test_create_errors = test_create_errors, diff --git a/tests/producer.lua b/tests/producer.lua index 7777dba..2959d67 100644 --- a/tests/producer.lua +++ b/tests/producer.lua @@ -97,7 +97,7 @@ local function list_groups(timeout_ms) log.info("Groups: %s", json.encode(res)) -- Some fields can have binary data that won't -- be correctly processed by connector. - for _, group in ipairs(res) do + for _, group in ipairs(res) do group['members'] = nil end return res diff --git a/tests/test_consumer.py b/tests/test_consumer.py index 977db49..fac8943 100644 --- a/tests/test_consumer.py +++ b/tests/test_consumer.py @@ -379,6 +379,19 @@ def test_consumer_should_log_rebalances(): assert len(response.data[0]) > 0 +def test_consumer_rebalance_protocol(): + server = get_server() + + with create_consumer(server, KAFKA_HOST, {"bootstrap.servers": KAFKA_HOST}): + time.sleep(5) + response = server.call("consumer.rebalance_protocol", []) + assert response[0] == 'NONE' + + server.call("consumer.subscribe", [["test_unsub_partially_1"]]) + response = server.call("consumer.rebalance_protocol", []) + assert response[0] == 'NONE' + + def test_consumer_should_continue_consuming_from_last_committed_offset(): message1 = { "key": "test1", diff --git a/tests/test_producer.py b/tests/test_producer.py index a70b121..765abe1 100644 --- a/tests/test_producer.py +++ b/tests/test_producer.py @@ -177,6 +177,7 @@ def test_producer_should_log_debug(): server.call("producer.close", []) + def test_producer_create_errors(): server = get_server() server.call("producer.test_create_errors")