Skip to content

Commit fe84e73

Browse files
committed
KAFKA-19779: Add per-partition epoch validation to streams groups
This commit enhances the offset commit validation logic in streams groups to validate against per-partition assignment epochs. When a member attempts to commit offsets with an older member epoch, the logic now validates that the epoch is not older than the assignment epoch for each individual partition being committed. The implementation adds a new `createAssignmentEpochValidator` method that creates partition-level validators, checking each partition against its assignment epoch from either assigned tasks or tasks pending revocation. We extend the SmokeTestDriverIntegrationTest to detect if we have processed more records than needed, which, in this restricted scenario, should only happen when offset commits are failing. We re-enable the previously flaky test in EosIntegrationTest, which had been failing because of rejected offset commits. Both tests have been run 100x in their streams protocol variation to validate that they are no longer flaky.
1 parent 858553c commit fe84e73

File tree

8 files changed

+318
-9
lines changed

8 files changed

+318
-9
lines changed

build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2900,7 +2900,6 @@ project(':streams:integration-tests') {
29002900
testImplementation libs.mockitoCore
29012901
testImplementation testLog4j2Libs
29022902
testImplementation project(':streams:test-utils')
2903-
testImplementation project(':test-common:test-common-util')
29042903

29052904
testRuntimeOnly runtimeTestLibs
29062905
}

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.apache.kafka.coordinator.group.OffsetExpirationCondition;
3333
import org.apache.kafka.coordinator.group.OffsetExpirationConditionImpl;
3434
import org.apache.kafka.coordinator.group.Utils;
35+
import org.apache.kafka.coordinator.group.generated.StreamsGroupTopologyValue;
3536
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
3637
import org.apache.kafka.timeline.SnapshotRegistry;
3738
import org.apache.kafka.timeline.TimelineHashMap;
@@ -726,8 +727,17 @@ public CommitPartitionValidator validateOffsetCommit(
726727
"by members using the streams group protocol");
727728
}
728729

729-
validateMemberEpoch(memberEpoch, member.memberEpoch());
730-
return CommitPartitionValidator.NO_OP;
730+
if (memberEpoch == member.memberEpoch()) {
731+
return CommitPartitionValidator.NO_OP;
732+
}
733+
734+
if (memberEpoch > member.memberEpoch()) {
735+
throw new StaleMemberEpochException(String.format("Received member epoch %d is newer than " +
736+
"current member epoch %d.", memberEpoch, member.memberEpoch()));
737+
}
738+
739+
// Member epoch is older; validate against per-partition assignment epochs.
740+
return createAssignmentEpochValidator(member, memberEpoch);
731741
}
732742

733743
/**
@@ -1120,4 +1130,54 @@ public void setLastAssignmentConfigs(Map<String, String> lastAssignmentConfigs)
11201130
this.lastAssignmentConfigs.putAll(lastAssignmentConfigs);
11211131
}
11221132
}
1133+
1134+
/**
1135+
* Creates a validator that checks if the received member epoch is valid for each partition's assignment epoch.
1136+
*
1137+
* @param member The member whose assignments are being validated.
1138+
* @param receivedMemberEpoch The received member epoch.
1139+
* @return A validator for per-partition validation.
1140+
*/
1141+
private CommitPartitionValidator createAssignmentEpochValidator(
1142+
final StreamsGroupMember member,
1143+
int receivedMemberEpoch
1144+
) {
1145+
// Retrieve topology once for all partitions - not per partition!
1146+
final StreamsTopology streamsTopology = topology.get().orElseThrow(() ->
1147+
new StaleMemberEpochException("Topology is not available for offset commit validation."));
1148+
1149+
final TasksTupleWithEpochs assignedTasks = member.assignedTasks();
1150+
final TasksTupleWithEpochs tasksPendingRevocation = member.tasksPendingRevocation();
1151+
1152+
return (topicName, topicId, partitionId) -> {
1153+
final StreamsGroupTopologyValue.Subtopology subtopology = streamsTopology.sourceTopicMap().get(topicName);
1154+
if (subtopology == null) {
1155+
throw new StaleMemberEpochException("Topic " + topicName + " is not in the topology.");
1156+
}
1157+
1158+
final String subtopologyId = subtopology.subtopologyId();
1159+
1160+
// Search for the partition in assigned tasks, then in tasks pending revocation
1161+
Integer assignmentEpoch = assignedTasks.activeTasksWithEpochs()
1162+
.getOrDefault(subtopologyId, Collections.emptyMap())
1163+
.get(partitionId);
1164+
if (assignmentEpoch == null) {
1165+
assignmentEpoch = tasksPendingRevocation.activeTasksWithEpochs()
1166+
.getOrDefault(subtopologyId, Collections.emptyMap())
1167+
.get(partitionId);
1168+
}
1169+
1170+
if (assignmentEpoch == null) {
1171+
throw new StaleMemberEpochException(String.format(
1172+
"Task %s-%d is not assigned or pending revocation for member.",
1173+
subtopologyId, partitionId));
1174+
}
1175+
1176+
if (receivedMemberEpoch < assignmentEpoch) {
1177+
throw new StaleMemberEpochException(String.format(
1178+
"Received member epoch %d is older than assignment epoch %d for task %s-%d.",
1179+
receivedMemberEpoch, assignmentEpoch, subtopologyId, partitionId));
1180+
}
1181+
};
1182+
}
11231183
}

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@
6666
import org.apache.kafka.coordinator.group.streams.StreamsGroup;
6767
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
6868
import org.apache.kafka.coordinator.group.streams.StreamsTopology;
69+
import org.apache.kafka.coordinator.group.streams.TasksTupleWithEpochs;
6970
import org.apache.kafka.image.MetadataImage;
7071
import org.apache.kafka.server.common.ApiMessageAndVersion;
7172
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -3659,4 +3660,119 @@ private ClassicGroupMember mkGenericMember(
36593660
)
36603661
);
36613662
}
3663+
3664+
@Test
3665+
public void testStreamsGroupOffsetCommitWithAssignmentEpochValid() {
3666+
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
3667+
StreamsGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedStreamsGroup("foo", true);
3668+
3669+
// Setup: topology with topic "bar" in subtopology "0"
3670+
group.setTopology(new StreamsTopology(1, Map.of("0", new StreamsGroupTopologyValue.Subtopology()
3671+
.setSubtopologyId("0")
3672+
.setSourceTopics(List.of("bar")))));
3673+
3674+
// Member at epoch 10, with partitions assigned at epoch 5
3675+
group.updateMember(StreamsGroupMember.Builder.withDefaults("member")
3676+
.setMemberEpoch(10)
3677+
.setAssignedTasks(new TasksTupleWithEpochs(
3678+
Map.of("0", Map.of(0, 5, 1, 5)),
3679+
Map.of(), Map.of()))
3680+
.build());
3681+
3682+
// Commit with member epoch 5 should succeed (5 >= assignment epoch 5)
3683+
CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result = context.commitOffset(
3684+
new OffsetCommitRequestData()
3685+
.setGroupId("foo")
3686+
.setMemberId("member")
3687+
.setGenerationIdOrMemberEpoch(5)
3688+
.setTopics(List.of(new OffsetCommitRequestData.OffsetCommitRequestTopic()
3689+
.setName("bar")
3690+
.setPartitions(List.of(
3691+
new OffsetCommitRequestData.OffsetCommitRequestPartition()
3692+
.setPartitionIndex(0)
3693+
.setCommittedOffset(100L),
3694+
new OffsetCommitRequestData.OffsetCommitRequestPartition()
3695+
.setPartitionIndex(1)
3696+
.setCommittedOffset(200L))))));
3697+
3698+
assertEquals(Errors.NONE.code(), result.response().topics().get(0).partitions().get(0).errorCode());
3699+
assertEquals(Errors.NONE.code(), result.response().topics().get(0).partitions().get(1).errorCode());
3700+
assertEquals(2, result.records().size());
3701+
}
3702+
3703+
@Test
3704+
public void testStreamsGroupOffsetCommitWithAssignmentEpochStale() {
3705+
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
3706+
StreamsGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedStreamsGroup("foo", true);
3707+
3708+
group.setTopology(new StreamsTopology(1, Map.of("0", new StreamsGroupTopologyValue.Subtopology()
3709+
.setSubtopologyId("0")
3710+
.setSourceTopics(List.of("bar")))));
3711+
3712+
// Member at epoch 10, with partitions assigned at different epochs
3713+
group.updateMember(StreamsGroupMember.Builder.withDefaults("member")
3714+
.setMemberEpoch(10)
3715+
.setAssignedTasks(new TasksTupleWithEpochs(
3716+
Map.of("0", Map.of(0, 5, 1, 8)),
3717+
Map.of(), Map.of()))
3718+
.build());
3719+
3720+
// Commit with member epoch 3 should fail (3 < assignment epochs 5 and 8)
3721+
assertThrows(StaleMemberEpochException.class, () -> context.commitOffset(
3722+
new OffsetCommitRequestData()
3723+
.setGroupId("foo")
3724+
.setMemberId("member")
3725+
.setGenerationIdOrMemberEpoch(3)
3726+
.setTopics(List.of(new OffsetCommitRequestData.OffsetCommitRequestTopic()
3727+
.setName("bar")
3728+
.setPartitions(List.of(
3729+
new OffsetCommitRequestData.OffsetCommitRequestPartition()
3730+
.setPartitionIndex(0)
3731+
.setCommittedOffset(100L),
3732+
new OffsetCommitRequestData.OffsetCommitRequestPartition()
3733+
.setPartitionIndex(1)
3734+
.setCommittedOffset(200L)))))));
3735+
}
3736+
3737+
@Test
3738+
public void testStreamsGroupOffsetCommitWithOlderMemberEpochValidAssignments() {
3739+
OffsetMetadataManagerTestContext context = new OffsetMetadataManagerTestContext.Builder().build();
3740+
StreamsGroup group = context.groupMetadataManager.getOrMaybeCreatePersistedStreamsGroup("foo", true);
3741+
3742+
group.setTopology(new StreamsTopology(1, Map.of("0", new StreamsGroupTopologyValue.Subtopology()
3743+
.setSubtopologyId("0")
3744+
.setSourceTopics(List.of("bar")))));
3745+
3746+
// Member at epoch 10, with partitions assigned at different epochs (2, 3, 5)
3747+
group.updateMember(StreamsGroupMember.Builder.withDefaults("member")
3748+
.setMemberEpoch(10)
3749+
.setAssignedTasks(new TasksTupleWithEpochs(
3750+
Map.of("0", Map.of(0, 2, 1, 3, 2, 5)),
3751+
Map.of(), Map.of()))
3752+
.build());
3753+
3754+
// Commit with member epoch 5 should succeed (5 >= all assignment epochs)
3755+
CoordinatorResult<OffsetCommitResponseData, CoordinatorRecord> result = context.commitOffset(
3756+
new OffsetCommitRequestData()
3757+
.setGroupId("foo")
3758+
.setMemberId("member")
3759+
.setGenerationIdOrMemberEpoch(5)
3760+
.setTopics(List.of(new OffsetCommitRequestData.OffsetCommitRequestTopic()
3761+
.setName("bar")
3762+
.setPartitions(List.of(
3763+
new OffsetCommitRequestData.OffsetCommitRequestPartition()
3764+
.setPartitionIndex(0)
3765+
.setCommittedOffset(100L),
3766+
new OffsetCommitRequestData.OffsetCommitRequestPartition()
3767+
.setPartitionIndex(1)
3768+
.setCommittedOffset(200L),
3769+
new OffsetCommitRequestData.OffsetCommitRequestPartition()
3770+
.setPartitionIndex(2)
3771+
.setCommittedOffset(300L))))));
3772+
3773+
assertEquals(Errors.NONE.code(), result.response().topics().get(0).partitions().get(0).errorCode());
3774+
assertEquals(Errors.NONE.code(), result.response().topics().get(0).partitions().get(1).errorCode());
3775+
assertEquals(Errors.NONE.code(), result.response().topics().get(0).partitions().get(2).errorCode());
3776+
assertEquals(3, result.records().size());
3777+
}
36623778
}

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/StreamsGroupTest.java

Lines changed: 109 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.apache.kafka.coordinator.group.streams.topics.ConfiguredTopology;
4848
import org.apache.kafka.image.MetadataImage;
4949
import org.apache.kafka.timeline.SnapshotRegistry;
50+
import org.apache.kafka.coordinator.group.CommitPartitionValidator;
5051

5152
import org.junit.jupiter.api.Test;
5253
import org.junit.jupiter.params.ParameterizedTest;
@@ -660,7 +661,7 @@ public void testValidateOffsetCommit(short version) {
660661
assertThrows(UnknownMemberIdException.class, () ->
661662
group.validateOffsetCommit("", null, -1, isTransactional, version));
662663

663-
// The member epoch is stale.
664+
// The member epoch is stale (newer than current).
664665
if (version >= 9) {
665666
assertThrows(StaleMemberEpochException.class, () ->
666667
group.validateOffsetCommit("new-protocol-member-id", "", 10, isTransactional, version));
@@ -669,7 +670,7 @@ public void testValidateOffsetCommit(short version) {
669670
group.validateOffsetCommit("new-protocol-member-id", "", 10, isTransactional, version));
670671
}
671672

672-
// This should succeed.
673+
// This should succeed (matching member epoch).
673674
if (version >= 9) {
674675
group.validateOffsetCommit("new-protocol-member-id", "", 0, isTransactional, version);
675676
} else {
@@ -678,6 +679,112 @@ public void testValidateOffsetCommit(short version) {
678679
}
679680
}
680681

682+
@Test
683+
public void testValidateOffsetCommitWithOlderEpoch() {
684+
StreamsGroup group = createStreamsGroup("group-foo");
685+
686+
group.setTopology(new StreamsTopology(1, Map.of("0", new StreamsGroupTopologyValue.Subtopology()
687+
.setSubtopologyId("0")
688+
.setSourceTopics(List.of("input-topic")))));
689+
690+
group.updateMember(new StreamsGroupMember.Builder("member-1")
691+
.setMemberEpoch(2)
692+
.setAssignedTasks(new TasksTupleWithEpochs(
693+
Map.of("0", Map.of(0, 2, 1, 2)),
694+
Map.of(), Map.of()))
695+
.build());
696+
697+
CommitPartitionValidator validator = group.validateOffsetCommit(
698+
"member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion());
699+
700+
// Received epoch (1) < assignment epoch (2) should throw
701+
assertThrows(StaleMemberEpochException.class, () ->
702+
validator.validate("input-topic", Uuid.ZERO_UUID, 0));
703+
}
704+
705+
@Test
706+
public void testValidateOffsetCommitWithOlderEpochMissingTopology() {
707+
StreamsGroup group = createStreamsGroup("group-foo");
708+
709+
group.updateMember(new StreamsGroupMember.Builder("member-1")
710+
.setMemberEpoch(2)
711+
.build());
712+
713+
// Topology is retrieved when creating validator, so exception is thrown here
714+
assertThrows(StaleMemberEpochException.class, () ->
715+
group.validateOffsetCommit("member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion()));
716+
}
717+
718+
@Test
719+
public void testValidateOffsetCommitWithOlderEpochMissingSubtopology() {
720+
StreamsGroup group = createStreamsGroup("group-foo");
721+
722+
group.setTopology(new StreamsTopology(1, Map.of("0", new StreamsGroupTopologyValue.Subtopology()
723+
.setSubtopologyId("0")
724+
.setSourceTopics(List.of("input-topic")))));
725+
726+
group.updateMember(new StreamsGroupMember.Builder("member-1")
727+
.setMemberEpoch(2)
728+
.build());
729+
730+
CommitPartitionValidator validator = group.validateOffsetCommit(
731+
"member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion());
732+
733+
assertThrows(StaleMemberEpochException.class, () ->
734+
validator.validate("unknown-topic", Uuid.ZERO_UUID, 0));
735+
}
736+
737+
@Test
738+
public void testValidateOffsetCommitWithOlderEpochUnassignedPartition() {
739+
StreamsGroup group = createStreamsGroup("group-foo");
740+
741+
group.setTopology(new StreamsTopology(1, Map.of("0", new StreamsGroupTopologyValue.Subtopology()
742+
.setSubtopologyId("0")
743+
.setSourceTopics(List.of("input-topic")))));
744+
745+
group.updateMember(new StreamsGroupMember.Builder("member-1")
746+
.setMemberEpoch(2)
747+
.setAssignedTasks(new TasksTupleWithEpochs(
748+
Map.of("0", Map.of(0, 2)),
749+
Map.of(), Map.of()))
750+
.setTasksPendingRevocation(TasksTupleWithEpochs.EMPTY)
751+
.build());
752+
753+
CommitPartitionValidator validator = group.validateOffsetCommit(
754+
"member-1", "", 1, false, ApiKeys.OFFSET_COMMIT.latestVersion());
755+
756+
// Partition 0 assigned with epoch 2, received epoch 1 should throw
757+
assertThrows(StaleMemberEpochException.class, () ->
758+
validator.validate("input-topic", Uuid.ZERO_UUID, 0));
759+
760+
// Partition 1 not assigned should throw
761+
assertThrows(StaleMemberEpochException.class, () ->
762+
validator.validate("input-topic", Uuid.ZERO_UUID, 1));
763+
}
764+
765+
@Test
766+
public void testValidateOffsetCommitWithOlderEpochValidAssignment() {
767+
StreamsGroup group = createStreamsGroup("group-foo");
768+
769+
group.setTopology(new StreamsTopology(1, Map.of("0", new StreamsGroupTopologyValue.Subtopology()
770+
.setSubtopologyId("0")
771+
.setSourceTopics(List.of("input-topic")))));
772+
773+
group.updateMember(new StreamsGroupMember.Builder("member-1")
774+
.setMemberEpoch(5)
775+
.setAssignedTasks(new TasksTupleWithEpochs(
776+
Map.of("0", Map.of(0, 2, 1, 2)),
777+
Map.of(), Map.of()))
778+
.build());
779+
780+
CommitPartitionValidator validator = group.validateOffsetCommit(
781+
"member-1", "", 2, false, ApiKeys.OFFSET_COMMIT.latestVersion());
782+
783+
// Received epoch 2 == assignment epoch 2 should succeed
784+
validator.validate("input-topic", Uuid.ZERO_UUID, 0);
785+
validator.validate("input-topic", Uuid.ZERO_UUID, 1);
786+
}
787+
681788
@Test
682789
public void testAsListedGroup() {
683790
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(LOG_CONTEXT);

streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import org.apache.kafka.common.serialization.LongDeserializer;
3636
import org.apache.kafka.common.serialization.LongSerializer;
3737
import org.apache.kafka.common.serialization.Serdes;
38-
import org.apache.kafka.common.test.api.Flaky;
3938
import org.apache.kafka.common.utils.Utils;
4039
import org.apache.kafka.streams.KafkaStreams;
4140
import org.apache.kafka.streams.KeyValue;
@@ -396,7 +395,6 @@ public void shouldBeAbleToPerformMultipleTransactions(final String groupProtocol
396395
}
397396
}
398397

399-
@Flaky("KAFKA-19816")
400398
@ParameterizedTest
401399
@MethodSource("groupProtocolAndProcessingThreadsParameters")
402400
public void shouldNotViolateEosIfOneTaskFails(final String groupProtocol, final boolean processingThreadsEnabled) throws Exception {

0 commit comments

Comments
 (0)