[KIP-848]: Add testing changes and describe consumer group changes for KIP-848 #329

Merged: 24 commits, Jul 1, 2025
21 changes: 15 additions & 6 deletions .semaphore/semaphore.yml
@@ -102,13 +102,21 @@ blocks:
commands:
- make test
- artifact push workflow coverage/mocha/coverage-final.json --destination "mocha-coverage.json"
- name: "Promisified Tests"
- name: "Promisified Tests (Classic Protocol)"
commands:
- '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
- docker compose up -d && sleep 30
- docker compose -f test/docker/docker-compose.yml up -d && sleep 30
- export NODE_OPTIONS='--max-old-space-size=1536'
- npx jest --no-colors --ci test/promisified/
- artifact push workflow coverage/jest/coverage-final.json --destination "jest-coverage.json"
- artifact push workflow coverage/jest/coverage-final.json --destination "jest-classic-coverage.json"
- name: "Promisified Tests (Consumer Protocol)"
commands:
- '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
- docker compose -f test/docker/docker-compose-kraft.yml up -d && sleep 30
- export TEST_CONSUMER_GROUP_PROTOCOL=consumer
- export NODE_OPTIONS='--max-old-space-size=1536'
- npx jest --no-colors --ci test/promisified/
- artifact push workflow coverage/jest/coverage-final.json --destination "jest-consumer-coverage.json"
- name: "Lint"
commands:
- make lint
@@ -163,10 +171,10 @@ blocks:
- export BUILD_LIBRDKAFKA=0
- npm run install-from-source
jobs:
- name: "Performance Test"
- name: "Performance Test (Classic Protocol)"
commands:
- '[[ -z $DOCKERHUB_APIKEY ]] || docker login --username $DOCKERHUB_USER --password $DOCKERHUB_APIKEY'
- docker compose up -d && sleep 30
- docker compose -f test/docker/docker-compose.yml up -d && sleep 30
- export NODE_OPTIONS='--max-old-space-size=1536'
- cd examples/performance
- npm install
@@ -479,7 +487,8 @@ after_pipeline:
- checkout
- sem-version java 11
- artifact pull workflow mocha-coverage.json
- artifact pull workflow jest-coverage.json
- artifact pull workflow jest-classic-coverage.json
- artifact pull workflow jest-consumer-coverage.json
- artifact pull workflow jest-sr-coverage.json
- npx --yes istanbul-merge --out merged-output/merged-coverage.json *-coverage.json
- npx nyc report -t merged-output --report-dir coverage --reporter=text --reporter=lcov
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,12 @@
# confluent-kafka-javascript v1.4.0

v1.4.0 is a feature release. It is supported for all usage.

## Enhancements

1. References librdkafka v2.11.0. Refer to the [librdkafka v2.11.0 release notes](https://github.com/confluentinc/librdkafka/releases/tag/v2.11.0) for more information.
2. [KIP-848] `describeGroups()` now supports the `consumer` groups introduced by KIP-848. Two new fields have been added: the consumer group type and the target assignment. The type indicates whether a group is a `classic` or a `consumer` group; the target assignment is only valid for the `consumer` protocol and defaults to undefined.

# confluent-kafka-javascript v1.3.2

v1.3.2 is a maintenance release. It is supported for all usage.
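For illustration, a minimal sketch of reading the new fields through the promisified API (the broker address and group id are placeholders, and the exact call site follows the examples elsewhere in this repo):

```javascript
const { Kafka, ConsumerGroupTypes } = require('@confluentinc/kafka-javascript').KafkaJS;

async function describeGroupTypes() {
  // 'localhost:9092' and 'my-group' are placeholder values.
  const kafka = new Kafka({ kafkaJS: { brokers: ['localhost:9092'] } });
  const admin = kafka.admin();
  await admin.connect();

  const { groups } = await admin.describeGroups(['my-group']);
  for (const group of groups) {
    // `type` is one of ConsumerGroupTypes (UNKNOWN, CONSUMER, CLASSIC).
    const isNewProtocol = group.type === ConsumerGroupTypes.CONSUMER;
    console.log(`${group.groupId}: ${isNewProtocol ? 'consumer' : 'classic'} group`);
    for (const member of group.members) {
      // `targetAssignment` stays undefined unless the group uses the `consumer` protocol.
      console.log(member.memberId, member.targetAssignment?.topicPartitions ?? 'no target assignment');
    }
  }
  await admin.disconnect();
}
```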
3 changes: 2 additions & 1 deletion examples/kafkajs/admin/describe-groups.js
@@ -1,5 +1,5 @@
// require('kafkajs') is replaced with require('@confluentinc/kafka-javascript').KafkaJS.
const { Kafka, ConsumerGroupStates } = require('@confluentinc/kafka-javascript').KafkaJS;
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;
const { parseArgs } = require('node:util');

function printNode(node, prefix = '') {
@@ -72,6 +72,7 @@ async function adminStart() {
console.log(`\tProtocol type: ${group.protocolType}`);
console.log(`\tPartition assignor: ${group.partitionAssignor}`);
console.log(`\tState: ${group.state}`);
console.log(`\tType: ${group.type}`);
console.log(`\tCoordinator: ${group.coordinator ? group.coordinator.id : group.coordinator}`);
printNode(group.coordinator, '\t');
console.log(`\tAuthorized operations: ${group.authorizedOperations}`);
7 changes: 7 additions & 0 deletions lib/admin.js
@@ -29,6 +29,12 @@ const ConsumerGroupStates = {
EMPTY: 5,
};

const ConsumerGroupTypes = {
UNKNOWN: 0,
CONSUMER: 1,
CLASSIC: 2,
};

/**
* A list of ACL operation types.
* @enum {number}
@@ -95,6 +101,7 @@ module.exports = {
create: createAdminClient,
createFrom: createAdminClientFrom,
ConsumerGroupStates: Object.freeze(ConsumerGroupStates),
ConsumerGroupTypes: Object.freeze(ConsumerGroupTypes),
AclOperationTypes: Object.freeze(AclOperationTypes),
IsolationLevel: Object.freeze(IsolationLevel),
OffsetSpec,
8 changes: 8 additions & 0 deletions lib/kafkajs/_admin.js
@@ -1009,6 +1009,14 @@ module.exports = {
* @see RdKafka.ConsumerGroupStates
*/
ConsumerGroupStates: RdKafka.AdminClient.ConsumerGroupStates,
/**
* A list of consumer group types.
* @enum {number}
* @readonly
* @memberof KafkaJS
* @see RdKafka.ConsumerGroupTypes
*/
ConsumerGroupTypes: RdKafka.AdminClient.ConsumerGroupTypes,
/**
* A list of ACL operation types.
* @enum {number}
39 changes: 29 additions & 10 deletions lib/kafkajs/_consumer.js
@@ -474,7 +474,7 @@
}
}

#kafkaJSToConsumerConfig(kjsConfig) {
#kafkaJSToConsumerConfig(kjsConfig, isClassicProtocol = true) {

[SonarQube check failure on lib/kafkajs/_consumer.js line 477: Refactor this function to reduce its Cognitive Complexity from 46 to the 15 allowed.]
if (!kjsConfig || Object.keys(kjsConfig).length === 0) {
return {};
}
@@ -498,37 +498,53 @@
}

if (Object.hasOwn(kjsConfig, 'partitionAssignors')) {
if (!isClassicProtocol) {
throw new error.KafkaJSError(
"partitionAssignors is not supported when group.protocol is not 'classic'.",
{ code: error.ErrorCodes.ERR__INVALID_ARG }
);
}
if (!Array.isArray(kjsConfig.partitionAssignors)) {
throw new error.KafkaJSError(CompatibilityErrorMessages.partitionAssignors(), { code: error.ErrorCodes.ERR__INVALID_ARG });
}

kjsConfig.partitionAssignors.forEach(assignor => {
if (typeof assignor !== 'string')
throw new error.KafkaJSError(CompatibilityErrorMessages.partitionAssignors(), { code: error.ErrorCodes.ERR__INVALID_ARG });
});

rdKafkaConfig['partition.assignment.strategy'] = kjsConfig.partitionAssignors.join(',');
} else {
} else if (isClassicProtocol) {
rdKafkaConfig['partition.assignment.strategy'] = PartitionAssigners.roundRobin;
}

if (Object.hasOwn(kjsConfig, 'sessionTimeout')) {
if (!isClassicProtocol) {
throw new error.KafkaJSError(
"sessionTimeout is not supported when group.protocol is not 'classic'.",
{ code: error.ErrorCodes.ERR__INVALID_ARG }
);
}
rdKafkaConfig['session.timeout.ms'] = kjsConfig.sessionTimeout;
} else {
} else if (isClassicProtocol) {
rdKafkaConfig['session.timeout.ms'] = 30000;
}

if (Object.hasOwn(kjsConfig, 'heartbeatInterval')) {
if (!isClassicProtocol) {
throw new error.KafkaJSError(
"heartbeatInterval is not supported when group.protocol is not 'classic'.",
{ code: error.ErrorCodes.ERR__INVALID_ARG }
);
}
rdKafkaConfig['heartbeat.interval.ms'] = kjsConfig.heartbeatInterval;
}

if (Object.hasOwn(kjsConfig, 'rebalanceTimeout')) {
/* In librdkafka, we use the max poll interval as the rebalance timeout as well. */
rdKafkaConfig['max.poll.interval.ms'] = +kjsConfig.rebalanceTimeout;
} else if (!rdKafkaConfig['max.poll.interval.ms']) {
rdKafkaConfig['max.poll.interval.ms'] = 300000; /* librdkafka default */
}

if (Object.hasOwn(kjsConfig, 'heartbeatInterval')) {
rdKafkaConfig['heartbeat.interval.ms'] = kjsConfig.heartbeatInterval;
}

if (Object.hasOwn(kjsConfig, 'metadataMaxAge')) {
rdKafkaConfig['topic.metadata.refresh.interval.ms'] = kjsConfig.metadataMaxAge;
}
@@ -605,8 +621,11 @@
}

#finalizedConfig() {
const protocol = this.#userConfig['group.protocol'];
const isClassicProtocol = protocol === undefined ||
(typeof protocol === 'string' && protocol.toLowerCase() === 'classic');
/* Creates an rdkafka config based off the kafkaJS block. Switches to compatibility mode if the block exists. */
let compatibleConfig = this.#kafkaJSToConsumerConfig(this.#userConfig.kafkaJS);
let compatibleConfig = this.#kafkaJSToConsumerConfig(this.#userConfig.kafkaJS, isClassicProtocol);

/* There can be multiple different and conflicting config directives for setting the log level:
* 1. If there's a kafkaJS block:
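To make the new validation concrete, here is a hedged sketch (placeholder broker and group ids) of how the rewritten `#kafkaJSToConsumerConfig` behaves: classic-only settings combined with the new `consumer` protocol are now rejected rather than silently passed through:

```javascript
const { Kafka } = require('@confluentinc/kafka-javascript').KafkaJS;

const kafka = new Kafka({ kafkaJS: { brokers: ['localhost:9092'] } }); // placeholder broker

// group.protocol defaults to 'classic'; any other value disables the
// classic-only keys (partitionAssignors, sessionTimeout, heartbeatInterval).
const rejected = kafka.consumer({
  kafkaJS: {
    groupId: 'my-group',   // placeholder
    sessionTimeout: 45000, // rejected with ERR__INVALID_ARG once the config is finalized
  },
  'group.protocol': 'consumer',
});

// A valid consumer-protocol config simply omits those keys and lets the
// broker drive session timeouts and assignment:
const accepted = kafka.consumer({
  kafkaJS: { groupId: 'my-other-group' }, // placeholder
  'group.protocol': 'consumer',
});
```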
3 changes: 2 additions & 1 deletion lib/kafkajs/_kafka.js
@@ -1,6 +1,6 @@
const { Producer, CompressionTypes } = require('./_producer');
const { Consumer, PartitionAssigners } = require('./_consumer');
const { Admin, ConsumerGroupStates, AclOperationTypes, IsolationLevel } = require('./_admin');
const { Admin, ConsumerGroupStates, ConsumerGroupTypes, AclOperationTypes, IsolationLevel } = require('./_admin');
const error = require('./_error');
const { logLevel, checkIfKafkaJsKeysPresent, CompatibilityErrorMessages } = require('./_common');

@@ -119,6 +119,7 @@ module.exports = {
PartitionAssigners,
PartitionAssignors: PartitionAssigners,
CompressionTypes,
ConsumerGroupTypes,
ConsumerGroupStates,
AclOperationTypes,
IsolationLevel};
1 change: 1 addition & 0 deletions lib/rdkafka.js
@@ -39,5 +39,6 @@ module.exports = {
IsolationLevel: Admin.IsolationLevel,
OffsetSpec: Admin.OffsetSpec,
ConsumerGroupStates: Admin.ConsumerGroupStates,
ConsumerGroupTypes: Admin.ConsumerGroupTypes,
AclOperationTypes: Admin.AclOperationTypes,
};
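As a quick sanity check (a sketch, assuming the package is installed), the new enum is reachable from both entry points after these export changes:

```javascript
const RdKafka = require('@confluentinc/kafka-javascript');

// The callback-style API and the promisified KafkaJS layer expose the same frozen enum.
console.log(RdKafka.ConsumerGroupTypes);         // { UNKNOWN: 0, CONSUMER: 1, CLASSIC: 2 }
console.log(RdKafka.KafkaJS.ConsumerGroupTypes); // same values
```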
24 changes: 24 additions & 0 deletions src/common.cc
@@ -980,6 +980,9 @@ v8::Local<v8::Object> FromMemberDescription(
assignment: {
topicPartitions: TopicPartition[]
},
targetAssignment?: {
topicPartitions: TopicPartition[]
}
}
*/
v8::Local<v8::Object> returnObject = Nan::New<v8::Object>();
@@ -1028,6 +1031,23 @@ v8::Local<v8::Object> FromMemberDescription(
Nan::Set(returnObject, Nan::New("assignment").ToLocalChecked(),
assignmentObject);

// targetAssignment
const rd_kafka_MemberAssignment_t* target_assignment =
rd_kafka_MemberDescription_target_assignment(member);
if (target_assignment) {
const rd_kafka_topic_partition_list_t* target_partitions =
rd_kafka_MemberAssignment_partitions(target_assignment);
v8::Local<v8::Array> targetTopicPartitions =
Conversion::TopicPartition::ToTopicPartitionV8Array(
target_partitions, false);
v8::Local<v8::Object> targetAssignmentObject = Nan::New<v8::Object>();
Nan::Set(targetAssignmentObject,
Nan::New("topicPartitions").ToLocalChecked(),
targetTopicPartitions);
Nan::Set(returnObject, Nan::New("targetAssignment").ToLocalChecked(),
targetAssignmentObject);
}

return returnObject;
}

@@ -1105,6 +1125,10 @@ v8::Local<v8::Object> FromConsumerGroupDescription(
Nan::Set(returnObject, Nan::New("state").ToLocalChecked(),
Nan::New<v8::Number>(rd_kafka_ConsumerGroupDescription_state(desc)));

// type
Nan::Set(returnObject, Nan::New("type").ToLocalChecked(),
Nan::New<v8::Number>(rd_kafka_ConsumerGroupDescription_type(desc)));

// coordinator
const rd_kafka_Node_t* coordinator =
rd_kafka_ConsumerGroupDescription_coordinator(desc);
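Going by the conversion above, a described member surfaced to JavaScript now looks roughly like this (illustrative values; `targetAssignment` appears only when librdkafka reports one, i.e. for `consumer`-protocol groups):

```javascript
// Approximate shape of one entry in group.members from describeGroups():
const member = {
  memberId: 'member-1',   // illustrative
  clientId: 'my-client',
  clientHost: '/127.0.0.1',
  assignment: {
    topicPartitions: [{ topic: 'my-topic', partition: 0 }],
  },
  // Present only for `consumer`-protocol groups:
  targetAssignment: {
    topicPartitions: [{ topic: 'my-topic', partition: 1 }],
  },
};
```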
14 changes: 14 additions & 0 deletions test/docker/docker-compose-kraft.yml
@@ -0,0 +1,14 @@
services:
kafka:
image: apache/kafka:4.0.0
restart: unless-stopped
container_name: kafka
ports:
- 9092:29092
- 9093:29093
volumes:
- ./kafka_jaas.conf:/etc/kafka/kafka_jaas.conf
- ./kraft/server.properties:/mnt/shared/config/server.properties
environment:
KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/kafka_jaas.conf"

4 changes: 2 additions & 2 deletions docker-compose.yml → test/docker/docker-compose.yml
@@ -1,11 +1,11 @@
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper
image: confluentinc/cp-zookeeper:7.9.2
environment:
ZOOKEEPER_CLIENT_PORT: 2181
kafka:
image: confluentinc/cp-kafka
image: confluentinc/cp-kafka:7.9.2
restart: always
depends_on:
- zookeeper
13 changes: 13 additions & 0 deletions test/docker/kafka_jaas.conf
@@ -0,0 +1,13 @@
KafkaServer {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin"
user_admin="admin"
user_testuser="testpass";
};

Client {
org.apache.kafka.common.security.plain.PlainLoginModule required
username="admin"
password="admin";
};
31 changes: 31 additions & 0 deletions test/docker/kraft/server.properties
@@ -0,0 +1,31 @@
broker.id=0
port=9092
reserved.broker.max.id=65536
listeners=PLAINTEXT://:9092,CONTROLLER://:38705,SASL_PLAINTEXT://:9093,DOCKER://:29092,DOCKER_SASL_PLAINTEXT://:29093
advertised.listeners=PLAINTEXT://kafka:9092,SASL_PLAINTEXT://kafka:9093,DOCKER://localhost:9092,DOCKER_SASL_PLAINTEXT://localhost:9093
num.partitions=4
auto.create.topics.enable=true
delete.topic.enable=true
default.replication.factor=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.controller.protocol=PLAIN
sasl.mechanism.inter.broker.protocol=PLAIN
super.users=User:admin
allow.everyone.if.no.acl.found=true

broker.rack=RACK1
replica.selector.class=org.apache.kafka.common.replica.RackAwareReplicaSelector
group.coordinator.rebalance.protocols=classic,consumer
connections.max.reauth.ms=10000
log.retention.bytes=1000000000
process.roles=broker,controller
controller.listener.names=CONTROLLER
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,CONTROLLER:SASL_PLAINTEXT,DOCKER:PLAINTEXT,DOCKER_SASL_PLAINTEXT:SASL_PLAINTEXT
authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer
sasl.enabled.mechanisms=PLAIN
controller.quorum.voters=0@kafka:38705
group.consumer.min.session.timeout.ms=6000
group.consumer.session.timeout.ms=10000
14 changes: 9 additions & 5 deletions test/promisified/admin/describe_groups.spec.js
@@ -1,6 +1,7 @@
jest.setTimeout(30000);

const {
testConsumerGroupProtocolClassic,
createConsumer,
createProducer,
secureRandom,
@@ -9,7 +10,7 @@ const {
createAdmin,
sleep,
} = require('../testhelpers');
const { ConsumerGroupStates, ErrorCodes, AclOperationTypes } = require('../../../lib').KafkaJS;
const { ConsumerGroupStates, ConsumerGroupTypes, ErrorCodes, AclOperationTypes } = require('../../../lib').KafkaJS;

describe('Admin > describeGroups', () => {
let topicName, groupId, consumer, admin, groupInstanceId, producer;
@@ -77,15 +78,17 @@ describe('Admin > describeGroups', () => {
await admin.connect();
let describeGroupsResult = await admin.describeGroups(
[groupId], { includeAuthorizedOperations: true });

expect(describeGroupsResult.groups.length).toEqual(1);
expect(describeGroupsResult.groups[0]).toEqual(
expect.objectContaining({
groupId,
protocol: 'roundrobin',
partitionAssignor: 'roundrobin',
protocol: testConsumerGroupProtocolClassic() ? 'roundrobin' : 'uniform',
partitionAssignor: testConsumerGroupProtocolClassic() ? 'roundrobin' : 'uniform',
isSimpleConsumerGroup: false,
protocolType: 'consumer',
state: ConsumerGroupStates.STABLE,
type: testConsumerGroupProtocolClassic() ? ConsumerGroupTypes.CLASSIC : ConsumerGroupTypes.CONSUMER,
coordinator: expect.objectContaining({
id: expect.any(Number),
host: expect.any(String),
@@ -134,9 +137,10 @@ describe('Admin > describeGroups', () => {
expect(describeGroupsResult.groups[0]).toEqual(
expect.objectContaining({
groupId,
protocol: '',
partitionAssignor: '',
protocol: testConsumerGroupProtocolClassic() ? '' : 'uniform',
partitionAssignor: testConsumerGroupProtocolClassic() ? '' : 'uniform',
state: ConsumerGroupStates.EMPTY,
type: testConsumerGroupProtocolClassic() ? ConsumerGroupTypes.CLASSIC : ConsumerGroupTypes.CONSUMER,
protocolType: 'consumer',
isSimpleConsumerGroup: false,
coordinator: expect.objectContaining({
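The `testConsumerGroupProtocolClassic` helper itself is not part of this diff; going by the `TEST_CONSUMER_GROUP_PROTOCOL=consumer` export in the CI config above, a plausible sketch (the real helper lives in test/promisified/testhelpers.js and may differ):

```javascript
function testConsumerGroupProtocolClassic() {
  // CI sets TEST_CONSUMER_GROUP_PROTOCOL=consumer for the consumer-protocol run;
  // when the variable is unset, tests run against the classic protocol.
  const protocol = process.env.TEST_CONSUMER_GROUP_PROTOCOL;
  return protocol === undefined || protocol === 'classic';
}
```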