add method to check rebalance protocol
olegrok committed Jun 13, 2024
1 parent 97bd101 commit 70e3157
Showing 9 changed files with 47 additions and 3 deletions.
14 changes: 14 additions & 0 deletions kafka/consumer.c
@@ -821,3 +821,17 @@ int
lua_consumer_resume(struct lua_State *L) {
    return lua_consumer_call_pause_resume(L, kafka_resume);
}

int
lua_consumer_rebalance_protocol(struct lua_State *L) {
    consumer_t **consumer_p = luaL_checkudata(L, 1, consumer_label);
    if (consumer_p == NULL || *consumer_p == NULL)
        return 0;

    if ((*consumer_p)->rd_consumer != NULL) {
        const char *proto = rd_kafka_rebalance_protocol((*consumer_p)->rd_consumer);
        lua_pushstring(L, proto);
        return 1;
    }
    return 0;
}
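For context, librdkafka documents rd_kafka_rebalance_protocol() as returning the string "NONE" when the consumer is not (or not yet) part of a consumer group, and otherwise "EAGER" or "COOPERATIVE" depending on the partition assignor in use; the binding above simply pushes that string to Lua, or returns nothing if the consumer handle is gone. A minimal usage sketch from the Lua side, assuming consumer is a tnt_kafka consumer created elsewhere:

-- Sketch: branch on the reported group protocol.
-- The variable consumer is assumed to be an existing tnt_kafka consumer.
local proto = consumer:rebalance_protocol()
if proto == 'COOPERATIVE' then
    -- incremental (cooperative-sticky) rebalancing is in effect
elseif proto == 'EAGER' then
    -- classic stop-the-world rebalancing
else
    -- 'NONE': the consumer has not joined a consumer group (yet)
end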
3 changes: 3 additions & 0 deletions kafka/consumer.h
@@ -59,4 +59,7 @@ lua_consumer_pause(struct lua_State *L);
int
lua_consumer_resume(struct lua_State *L);

int
lua_consumer_rebalance_protocol(struct lua_State *L);

#endif //TNT_KAFKA_CONSUMER_H
4 changes: 4 additions & 0 deletions kafka/init.lua
@@ -219,6 +219,10 @@ function Consumer:resume()
    return self._consumer:resume()
end

function Consumer:rebalance_protocol()
    return self._consumer:rebalance_protocol()
end

function Consumer:seek_partitions(topic_partitions_list, options)
    local timeout_ms = get_timeout_from_options(options)
    return self._consumer:seek_partitions(topic_partitions_list, timeout_ms)
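Which of the two group protocols ends up being reported is decided by the partition.assignment.strategy consumer option in librdkafka: the default range/roundrobin assignors use the eager protocol, while cooperative-sticky uses the cooperative one. Below is a hedged sketch of creating a consumer that should report 'COOPERATIVE' once it has joined a group; the create() config shape (brokers plus an options table) and the broker, group and topic names are assumptions drawn from this connector's tests, not something this diff establishes:

local tnt_kafka = require('kafka')

-- Illustrative broker address, group id and topic name.
local consumer, err = tnt_kafka.Consumer.create({
    brokers = 'localhost:9092',
    options = {
        ['group.id'] = 'example-group',
        -- Ask librdkafka for incremental (cooperative) rebalancing.
        ['partition.assignment.strategy'] = 'cooperative-sticky',
    },
})
assert(err == nil, err)

consumer:subscribe({ 'example-topic' })
-- Once the group join has completed, consumer:rebalance_protocol()
-- is expected to return 'COOPERATIVE'; before that it returns 'NONE'.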
2 changes: 1 addition & 1 deletion kafka/producer.c
@@ -631,7 +631,7 @@ lua_producer_metadata(struct lua_State *L) {

int
lua_producer_list_groups(struct lua_State *L) {
    producer_t **producer_p = (producer_t **)luaL_checkudata(L, 1, producer_label);
    producer_t **producer_p = luaL_checkudata(L, 1, producer_label);
    if (producer_p == NULL || *producer_p == NULL)
        return 0;

1 change: 1 addition & 0 deletions kafka/tnt_kafka.c
@@ -33,6 +33,7 @@ luaopen_kafka_tntkafka(lua_State *L) {
        {"resume", lua_consumer_resume},
        {"close", lua_consumer_close},
        {"destroy", lua_consumer_destroy},
        {"rebalance_protocol", lua_consumer_rebalance_protocol},
        {"__tostring", lua_consumer_tostring},
        {NULL, NULL}
    };
10 changes: 9 additions & 1 deletion tests/consumer.lua
@@ -172,7 +172,7 @@ local function list_groups(timeout_ms)
    log.info("Groups: %s", json.encode(res))
    -- Some fields can have binary data that won't
    -- be correctly processed by connector.
    for _, group in ipairs(res) do
    for _, group in ipairs(res) do
        group['members'] = nil
    end
    return res
@@ -204,6 +204,9 @@ local function test_seek_partitions()

    for _ = 1, 5 do
        local msg = out:get(3)
        if msg == nil then
            error('Message is not delivered')
        end
        log.info('Get message: %s', json.encode(msg_totable(msg)))
        append_message(messages, msg)
        consumer:seek_partitions({
@@ -214,6 +217,10 @@
    return messages
end

local function rebalance_protocol()
    return consumer:rebalance_protocol()
end

local function test_create_errors()
    log.info('Create without config')
    local _, err = tnt_kafka.Consumer.create()
@@ -263,6 +270,7 @@ return {
    list_groups = list_groups,
    pause = pause,
    resume = resume,
    rebalance_protocol = rebalance_protocol,

    test_seek_partitions = test_seek_partitions,
    test_create_errors = test_create_errors,
2 changes: 1 addition & 1 deletion tests/producer.lua
@@ -97,7 +97,7 @@ local function list_groups(timeout_ms)
    log.info("Groups: %s", json.encode(res))
    -- Some fields can have binary data that won't
    -- be correctly processed by connector.
    for _, group in ipairs(res) do
    for _, group in ipairs(res) do
        group['members'] = nil
    end
    return res
13 changes: 13 additions & 0 deletions tests/test_consumer.py
@@ -379,6 +379,19 @@ def test_consumer_should_log_rebalances():
    assert len(response.data[0]) > 0


def test_consumer_rebalance_protocol():
    server = get_server()

    with create_consumer(server, KAFKA_HOST, {"bootstrap.servers": KAFKA_HOST}):
        time.sleep(5)
        response = server.call("consumer.rebalance_protocol", [])
        assert response[0] == 'NONE'

        server.call("consumer.subscribe", [["test_unsub_partially_1"]])
        response = server.call("consumer.rebalance_protocol", [])
        assert response[0] == 'NONE'
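Both assertions above expect 'NONE', most likely because librdkafka only reports a concrete protocol once the consumer has actually joined the group and received an assignment, and neither the bare consumer nor the freshly issued subscribe() call has completed a rebalance at the moment of the check. A hedged Lua-side sketch of waiting for the protocol to become concrete; the polling interval and the assumption that the connector's background fiber drives the group join are mine, not the test's:

local fiber = require('fiber')

-- Hypothetical helper: wait until librdkafka reports a concrete group
-- protocol for the given consumer, or give up after deadline seconds.
local function wait_for_rebalance_protocol(consumer, deadline)
    local started = fiber.clock()
    while fiber.clock() - started < deadline do
        local proto = consumer:rebalance_protocol()
        if proto ~= nil and proto ~= 'NONE' then
            return proto  -- 'EAGER' or 'COOPERATIVE'
        end
        fiber.sleep(0.1)
    end
    return nil  -- the group join did not complete in time
end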


def test_consumer_should_continue_consuming_from_last_committed_offset():
    message1 = {
        "key": "test1",
1 change: 1 addition & 0 deletions tests/test_producer.py
@@ -177,6 +177,7 @@ def test_producer_should_log_debug():

    server.call("producer.close", [])


def test_producer_create_errors():
    server = get_server()
    server.call("producer.test_create_errors")
