Skip to content

Conversation

@lucliu1108
Copy link
Contributor

@lucliu1108 lucliu1108 commented Oct 23, 2025

What

Ticket: https://issues.apache.org/jira/browse/KAFKA-19807

Add integration test similar to ShareGroupHeartbeatRequestTest and
ConsumerGroupHeartbeatRequestTest for StreamsGroupHeartbeat

Reviewers: @lucasbru

@github-actions github-actions bot added triage PRs from the community core Kafka Broker tests Test fixes (including flaky tests) labels Oct 23, 2025
@lucasbru lucasbru requested review from Copilot and lucasbru October 23, 2025 07:47
@lucasbru lucasbru self-assigned this Oct 23, 2025
Copy link
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR adds RPC-level integration tests for the StreamsGroupHeartbeat API, following the same testing pattern as ShareGroupHeartbeatRequestTest and ConsumerGroupHeartbeatRequestTest.

Key changes:

  • Implements comprehensive integration tests for the StreamsGroupHeartbeat RPC API
  • Tests various scenarios including API version validation, feature flag checks, multi-member coordination, and error handling
  • Validates proper behavior when topics are missing and subsequently created

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

new ClusterFeature(feature = Feature.STREAMS_VERSION, version = 0)
)
)
def testStreamsGroupHeartbeatIsInaccessableWhenDisabledByFeatureConfig(): Unit = {
Copy link

Copilot AI Oct 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected spelling of 'Inaccessable' to 'Inaccessible' in test method name.

Suggested change
def testStreamsGroupHeartbeatIsInaccessableWhenDisabledByFeatureConfig(): Unit = {
def testStreamsGroupHeartbeatIsInaccessibleWhenDisabledByFeatureConfig(): Unit = {

Copilot uses AI. Check for mistakes.
new ClusterConfigProperty(key = GroupCoordinatorConfig.GROUP_COORDINATOR_REBALANCE_PROTOCOLS_CONFIG, value = "classic,consumer"),
)
)
def testStreamsGroupHeartbeatIsInaccessableWhenDisabledByStaticGroupCoordinatorProtocolConfig(): Unit = {
Copy link

Copilot AI Oct 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected spelling of 'Inaccessable' to 'Inaccessible' in test method name.

Suggested change
def testStreamsGroupHeartbeatIsInaccessableWhenDisabledByStaticGroupCoordinatorProtocolConfig(): Unit = {
def testStreamsGroupHeartbeatIsInaccessibleWhenDisabledByStaticGroupCoordinatorProtocolConfig(): Unit = {

Copilot uses AI. Check for mistakes.
}

@ClusterTest
def tesStreamsGroupHeartbeatIsAccessibleWhenNewGroupCoordinatorIsEnabledTopicNotExistFirst(): Unit = {
Copy link

Copilot AI Oct 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected spelling of 'tesStreamsGroupHeartbeat' to 'testStreamsGroupHeartbeat' in test method name (missing 't').

Suggested change
def tesStreamsGroupHeartbeatIsAccessibleWhenNewGroupCoordinatorIsEnabledTopicNotExistFirst(): Unit = {
def testStreamsGroupHeartbeatIsAccessibleWhenNewGroupCoordinatorIsEnabledTopicNotExistFirst(): Unit = {

Copilot uses AI. Check for mistakes.
}

@ClusterTest
def tesStreamsGroupHeartbeatForMultipleMembers(): Unit = {
Copy link

Copilot AI Oct 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Corrected spelling of 'tesStreamsGroupHeartbeat' to 'testStreamsGroupHeartbeat' in test method name (missing 't').

Suggested change
def tesStreamsGroupHeartbeatForMultipleMembers(): Unit = {
def testStreamsGroupHeartbeatForMultipleMembers(): Unit = {

Copilot uses AI. Check for mistakes.
Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! Looking very good already. Could we add a couple more tests:

  • As in ConsumerGroupHeartbeatRequestTest, we could set a dynamic group config. In particular, I think it would be good to test setting streams.num.standby.replicas. We can set the config from 0 to 1 and check if we get a standby task in the assignment after heartbeating again. Note that we need 2 members with two different process IDs for this to work. This would also test your earlier change that triggers a rebalance upon config change.

  • It would be good to have a basic test to check that internal topics are created. So essentially heartbeat with a topology that contains internal topics (changelog topic, repartition topic). Then wait that the internal topics eventually appear.

  • Having equivalents for ShareGroupHeartbeatRequestTest.testMemberJoiningAndExpiring and ShareGroupHeartbeatRequestTest.testGroupCoordinatorChange would be very nice to have.

}, "StreamsGroupHeartbeatRequest did not succeed within the timeout period.")

// Verify the response
assert(streamsGroupHeartbeatResponse != null, "StreamsGroupHeartbeatResponse should not be null")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a streams assert. Maybe we should use junit asserts only to be consistent. Here we'd use assertNotNull.

@lucliu1108 lucliu1108 changed the title KAFKA-19807: Add RPC-level integration tests for StreamsGroupHeartbeat KAFKA-19807: Add RPC-level integration tests for StreamsGroupHeartbeat [1/2] Oct 23, 2025
@lucliu1108 lucliu1108 requested a review from lucasbru October 23, 2025 15:27
@lucliu1108
Copy link
Contributor Author

Thanks @lucasbru for the review!

I will add the other tests in a follow-up PR.

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I have two more thing I noticed while reviewing the second PR

// Test with streams.version = 0, the API is disabled at server level
val topology = new StreamsGroupHeartbeatRequestData.Topology()
.setEpoch(1)
.setSubtopologies(java.util.Collections.emptyList())
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

java.util.Collections is already imported, so you do not need to add the FQN here.

Also in many other places.

You could also consider List.of() instead which is more readable.

.setStandbyTasks(java.util.Collections.emptyList())
.setWarmupTasks(java.util.Collections.emptyList())
.setTopology(topology)
).build(0)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably always build the latest version?

).build(ApiKeys.STREAMS_GROUP_HEARTBEAT.latestVersion(isUnstableApiEnabled))

@lucliu1108 lucliu1108 requested a review from lucasbru October 26, 2025 22:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

core Kafka Broker tests Test fixes (including flaky tests)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants