Skip to content

Commit 858553c

Browse files
committed
simplification
1 parent a376694 commit 858553c

File tree

3 files changed

+35
-166
lines changed

3 files changed

+35
-166
lines changed

group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java

Lines changed: 11 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -165,8 +165,7 @@ public StreamsGroupMember build() {
165165
if (member.memberEpoch() != targetAssignmentEpoch) {
166166
return computeNextAssignment(
167167
member.memberEpoch(),
168-
member.assignedTasks(),
169-
member.tasksPendingRevocation()
168+
member.assignedTasks()
170169
);
171170
} else {
172171
return member;
@@ -193,8 +192,7 @@ public StreamsGroupMember build() {
193192
// its state towards the latest target assignment.
194193
return computeNextAssignment(
195194
member.memberEpoch() + 1,
196-
member.assignedTasks(),
197-
member.tasksPendingRevocation()
195+
member.assignedTasks()
198196
);
199197

200198
case UNRELEASED_TASKS:
@@ -203,8 +201,7 @@ public StreamsGroupMember build() {
203201
// of the unreleased tasks when they become available.
204202
return computeNextAssignment(
205203
member.memberEpoch(),
206-
member.assignedTasks(),
207-
member.tasksPendingRevocation()
204+
member.assignedTasks()
208205
);
209206

210207
case UNKNOWN:
@@ -220,8 +217,7 @@ public StreamsGroupMember build() {
220217

221218
return computeNextAssignment(
222219
targetAssignmentEpoch,
223-
member.assignedTasks(),
224-
member.tasksPendingRevocation()
220+
member.assignedTasks()
225221
);
226222
}
227223

@@ -321,16 +317,12 @@ private static boolean computeAssignmentDifferenceForOneSubtopology(final String
321317
* another member, as defined by the `isUnreleasedTask` predicate).
322318
*
323319
* Epoch Handling:
324-
* - For tasks in resultAssignedTasks, the epoch from currentAssignment is preserved.
325-
* - For tasks in resultTasksPendingAssignment, the epoch is determined as follows:
326-
* - If the task is present in memberTasksPendingRevocation, its epoch from that map is used
327-
* (to preserve the revocation epoch for tasks being revoked and re-assigned).
328-
* - Otherwise, the targetAssignmentEpoch is used.
320+
* - For tasks in resultAssignedTasks and resultTasksPendingRevocation, the epoch from currentAssignment is preserved.
321+
* - For tasks in resultTasksPendingAssignment, the targetAssignmentEpoch is used.
329322
*/
330323
private boolean computeAssignmentDifferenceWithEpoch(Map<String, Map<Integer, Integer>> currentAssignment,
331324
Map<String, Set<Integer>> targetAssignment,
332325
int targetAssignmentEpoch,
333-
Map<String, Map<Integer, Integer>> memberTasksPendingRevocation,
334326
Map<String, Map<Integer, Integer>> resultAssignedTasks,
335327
Map<String, Map<Integer, Integer>> resultTasksPendingRevocation,
336328
Map<String, Map<Integer, Integer>> resultTasksPendingAssignment,
@@ -346,7 +338,6 @@ private boolean computeAssignmentDifferenceWithEpoch(Map<String, Map<Integer, In
346338
currentAssignment.getOrDefault(subtopologyId, Map.of()),
347339
targetAssignment.getOrDefault(subtopologyId, Set.of()),
348340
targetAssignmentEpoch,
349-
memberTasksPendingRevocation.getOrDefault(subtopologyId, Map.of()),
350341
resultAssignedTasks,
351342
resultTasksPendingRevocation,
352343
resultTasksPendingAssignment,
@@ -360,7 +351,6 @@ private static boolean computeAssignmentDifferenceForOneSubtopologyWithEpoch(fin
360351
final Map<Integer, Integer> currentTasksForThisSubtopology,
361352
final Set<Integer> targetTasksForThisSubtopology,
362353
final int targetAssignmentEpoch,
363-
final Map<Integer, Integer> currentTasksPendingRevocationForThisSubtopology,
364354
final Map<String, Map<Integer, Integer>> resultAssignedTasks,
365355
final Map<String, Map<Integer, Integer>> resultTasksPendingRevocation,
366356
final Map<String, Map<Integer, Integer>> resultTasksPendingAssignment,
@@ -387,9 +377,7 @@ private static boolean computeAssignmentDifferenceForOneSubtopologyWithEpoch(fin
387377
Map<Integer, Integer> resultTasksPendingAssignmentForThisSubtopology = new HashMap<>();
388378
for (Integer taskId : targetTasksForThisSubtopology) {
389379
if (!resultAssignedTasksForThisSubtopology.containsKey(taskId)) {
390-
// Use the epoch from pending revocation if the task is there, otherwise use targetAssignmentEpoch
391-
Integer epoch = currentTasksPendingRevocationForThisSubtopology.getOrDefault(taskId, targetAssignmentEpoch);
392-
resultTasksPendingAssignmentForThisSubtopology.put(taskId, epoch);
380+
resultTasksPendingAssignmentForThisSubtopology.put(taskId, targetAssignmentEpoch);
393381
}
394382
}
395383
boolean hasUnreleasedTasks = resultTasksPendingAssignmentForThisSubtopology.keySet().removeIf(taskId ->
@@ -414,15 +402,13 @@ private static boolean computeAssignmentDifferenceForOneSubtopologyWithEpoch(fin
414402
/**
415403
* Computes the next assignment.
416404
*
417-
* @param memberEpoch The epoch of the member to use. This may be different from
418-
* the epoch in {@link CurrentAssignmentBuilder#member}.
419-
* @param memberAssignedTasks The assigned tasks of the member to use.
420-
* @param memberTasksPendingRevocation The tasks pending revocation of the member.
405+
* @param memberEpoch The epoch of the member to use. This may be different from
406+
* the epoch in {@link CurrentAssignmentBuilder#member}.
407+
* @param memberAssignedTasks The assigned tasks of the member to use.
421408
* @return A new StreamsGroupMember.
422409
*/
423410
private StreamsGroupMember computeNextAssignment(int memberEpoch,
424-
TasksTupleWithEpochs memberAssignedTasks,
425-
TasksTupleWithEpochs memberTasksPendingRevocation) {
411+
TasksTupleWithEpochs memberAssignedTasks) {
426412
Map<String, Map<Integer, Integer>> newActiveAssignedTasks = new HashMap<>();
427413
Map<String, Map<Integer, Integer>> newActiveTasksPendingRevocation = new HashMap<>();
428414
Map<String, Map<Integer, Integer>> newActiveTasksPendingAssignment = new HashMap<>();
@@ -437,7 +423,6 @@ private StreamsGroupMember computeNextAssignment(int memberEpoch,
437423
memberAssignedTasks.activeTasksWithEpochs(),
438424
targetAssignment.activeTasks(),
439425
targetAssignmentEpoch,
440-
memberTasksPendingRevocation.activeTasksWithEpochs(),
441426
newActiveAssignedTasks,
442427
newActiveTasksPendingRevocation,
443428
newActiveTasksPendingAssignment,

group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java

Lines changed: 24 additions & 138 deletions
Original file line numberDiff line numberDiff line change
@@ -834,12 +834,8 @@ public void testUnknownState(TaskRole taskRole) {
834834
);
835835
}
836836

837-
/**
838-
* Tests epoch assignment from assigned tasks.
839-
* This tests the first lookup path in addEpochsToTasks where epochs are found in assigned tasks.
840-
*/
841837
@Test
842-
public void testEpochLookupFromAssignedTasks() {
838+
public void testAssignmentEpochsShouldBePreservedFromPreviousAssignment() {
843839
final int memberEpoch = 10;
844840

845841
// Create a member with tasks that have specific epochs in assigned tasks
@@ -880,73 +876,10 @@ public void testEpochLookupFromAssignedTasks() {
880876
);
881877
}
882878

883-
/**
884-
* Tests epoch assignment from tasks pending revocation.
885-
* This tests the second lookup path in addEpochsToTasks where epochs are found in pending revocation.
886-
* When tasks pending revocation are brought back into the target assignment after the member revokes them,
887-
* the epochs should be preserved from the pending revocation.
888-
*/
889-
@Test
890-
public void testEpochLookupFromPendingRevocation() {
891-
final int memberEpoch = 10;
892-
893-
// Create a member in UNREVOKED_TASKS state with:
894-
// - Some tasks assigned (5, 6) with epochs (9, 10)
895-
// - Some tasks pending revocation (1, 2, 3, 4) with specific epochs (5, 6, 7, 8)
896-
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
897-
.setState(MemberState.UNREVOKED_TASKS)
898-
.setProcessId(PROCESS_ID)
899-
.setMemberEpoch(memberEpoch)
900-
.setPreviousMemberEpoch(memberEpoch)
901-
.setAssignedTasks(mkTasksTupleWithEpochs(TaskRole.ACTIVE,
902-
mkTasksWithEpochs(SUBTOPOLOGY_ID1, Map.of(5, 9)),
903-
mkTasksWithEpochs(SUBTOPOLOGY_ID2, Map.of(6, 10))))
904-
.setTasksPendingRevocation(mkTasksTupleWithEpochs(TaskRole.ACTIVE,
905-
mkTasksWithEpochs(SUBTOPOLOGY_ID1, Map.of(1, 5, 2, 6)),
906-
mkTasksWithEpochs(SUBTOPOLOGY_ID2, Map.of(3, 7, 4, 8))))
907-
.build();
908-
909-
// Target assignment brings back the pending tasks (1, 2, 3, 4) along with current assigned (5, 6)
910-
// The member has revoked the pending tasks (they're not in owned assignment)
911-
// So it should transition to next epoch and assign the tasks
912-
StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
913-
.withTargetAssignment(memberEpoch + 1, mkTasksTuple(TaskRole.ACTIVE,
914-
mkTasks(SUBTOPOLOGY_ID1, 1, 2, 5),
915-
mkTasks(SUBTOPOLOGY_ID2, 3, 4, 6)))
916-
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> null)
917-
.withCurrentStandbyTaskProcessIds((subtopologyId, partitionId) -> Set.of())
918-
.withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Set.of())
919-
.withOwnedAssignment(mkTasksTuple(TaskRole.ACTIVE,
920-
mkTasks(SUBTOPOLOGY_ID1, 5), // Only owns the assigned tasks, not the pending revocation
921-
mkTasks(SUBTOPOLOGY_ID2, 6)))
922-
.build();
923-
924-
// Verify that:
925-
// - Tasks from pending revocation (1@5, 2@6, 3@7, 4@8) retain their epochs when re-assigned
926-
// - Tasks from assigned (5@9, 6@10) retain their epochs
927-
assertEquals(
928-
new StreamsGroupMember.Builder(MEMBER_NAME)
929-
.setState(MemberState.STABLE)
930-
.setProcessId(PROCESS_ID)
931-
.setMemberEpoch(memberEpoch + 1)
932-
.setPreviousMemberEpoch(memberEpoch)
933-
.setAssignedTasks(mkTasksTupleWithEpochs(TaskRole.ACTIVE,
934-
mkTasksWithEpochs(SUBTOPOLOGY_ID1, Map.of(1, 5, 2, 6, 5, 9)),
935-
mkTasksWithEpochs(SUBTOPOLOGY_ID2, Map.of(3, 7, 4, 8, 6, 10))))
936-
.setTasksPendingRevocation(TasksTupleWithEpochs.EMPTY)
937-
.build(),
938-
updatedMember
939-
);
940-
}
941-
942-
/**
943-
* Tests fallback to default epoch.
944-
* This tests the third path in addEpochsToTasks where epochs are not found in assigned or pending,
945-
* so the default epoch (target assignment epoch) is used.
946-
*/
947879
@Test
948-
public void testEpochFallbackToDefault() {
880+
public void testNewlyAssignedTasksGetTargetAssignmentEpoch() {
949881
final int memberEpoch = 10;
882+
final int targetAssignmentEpoch = 11;
950883

951884
// Create a member with empty assignments
952885
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
@@ -960,22 +893,22 @@ public void testEpochFallbackToDefault() {
960893

961894
// New tasks are assigned
962895
StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
963-
.withTargetAssignment(memberEpoch + 1, mkTasksTuple(TaskRole.ACTIVE,
896+
.withTargetAssignment(targetAssignmentEpoch, mkTasksTuple(TaskRole.ACTIVE,
964897
mkTasks(SUBTOPOLOGY_ID1, 1, 2),
965898
mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
966899
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> null)
967900
.withCurrentStandbyTaskProcessIds((subtopologyId, partitionId) -> Set.of())
968901
.withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Set.of())
969902
.build();
970903

971-
// Verify that all tasks use the default epoch (memberEpoch + 1)
904+
// Verify that all tasks use the target assignment epoch
972905
assertEquals(
973906
new StreamsGroupMember.Builder(MEMBER_NAME)
974907
.setState(MemberState.STABLE)
975908
.setProcessId(PROCESS_ID)
976-
.setMemberEpoch(memberEpoch + 1)
909+
.setMemberEpoch(targetAssignmentEpoch)
977910
.setPreviousMemberEpoch(memberEpoch)
978-
.setAssignedTasks(mkTasksTupleWithCommonEpoch(TaskRole.ACTIVE, memberEpoch + 1,
911+
.setAssignedTasks(mkTasksTupleWithCommonEpoch(TaskRole.ACTIVE, targetAssignmentEpoch,
979912
mkTasks(SUBTOPOLOGY_ID1, 1, 2),
980913
mkTasks(SUBTOPOLOGY_ID2, 3, 4)))
981914
.setTasksPendingRevocation(TasksTupleWithEpochs.EMPTY)
@@ -986,14 +919,16 @@ public void testEpochFallbackToDefault() {
986919

987920
/**
988921
* Tests mixed epoch assignment scenarios.
989-
* This tests all three paths in addEpochsToTasks:
990-
* - Some epochs from assigned tasks
991-
* - Some epochs from pending revocation
992-
* - Some epochs fallback to default
922+
* - Some epochs from previously assigned tasks (Tasks 1, 2).
923+
* This happens regardless of whether the assigned task is reconciled (owned) by the client (Task 1) or not (Task 2)
924+
* - Some newly assigned task (Task 5) which should get the target assignment epoch.
925+
* - Some tasks are revoked by the member (Task 3, 4). One is immediately reassigned, which also gets
926+
* the target assignment epoch (Task 3).
993927
*/
994928
@Test
995-
public void testMixedEpochLookup() {
929+
public void testMixedPreservedAndNewAssignmentEpochs() {
996930
final int memberEpoch = 10;
931+
final int targetAssignmentEpoch = 11;
997932

998933
// Create a member with:
999934
// - Tasks 1, 2 in assigned with epochs 5, 6
@@ -1009,80 +944,31 @@ public void testMixedEpochLookup() {
1009944
mkTasksWithEpochs(SUBTOPOLOGY_ID2, Map.of(3, 7, 4, 8))))
1010945
.build();
1011946

1012-
// Target assignment includes:
1013-
// - Task 1 from assigned (should use epoch 5)
1014-
// - Task 3 from pending revocation (should use epoch 7)
1015-
// - Task 5 (new task, should use default epoch memberEpoch + 1 = 11)
1016-
// The member revokes tasks 2, 3, 4 (not in owned), transitions to next epoch
947+
// The member revokes tasks 3, 4 (not in owned), transitions to next epoch
1017948
StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
1018-
.withTargetAssignment(memberEpoch + 1, mkTasksTuple(TaskRole.ACTIVE,
1019-
mkTasks(SUBTOPOLOGY_ID1, 1),
949+
.withTargetAssignment(targetAssignmentEpoch, mkTasksTuple(TaskRole.ACTIVE,
950+
mkTasks(SUBTOPOLOGY_ID1, 1, 2),
1020951
mkTasks(SUBTOPOLOGY_ID2, 3, 5)))
1021952
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> null)
1022953
.withCurrentStandbyTaskProcessIds((subtopologyId, partitionId) -> Set.of())
1023954
.withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Set.of())
1024955
.withOwnedAssignment(mkTasksTuple(TaskRole.ACTIVE,
1025-
mkTasks(SUBTOPOLOGY_ID1, 1))) // Only owns task 1 (task 2 is being revoked, tasks 3,4 already revoked)
956+
mkTasks(SUBTOPOLOGY_ID1, 1))) // Only owns task 1 (task 2 is not yet reconciled, tasks 3,4 already revoked)
1026957
.build();
1027958

1028959
// Verify mixed epoch assignment:
1029-
// - Task 1 from SUBTOPOLOGY_ID1 should have epoch 5 (from assigned)
1030-
// - Task 3 from SUBTOPOLOGY_ID2 should have epoch 7 (from pending revocation)
1031-
// - Task 5 from SUBTOPOLOGY_ID2 should have epoch 11 (default)
1032-
assertEquals(
1033-
new StreamsGroupMember.Builder(MEMBER_NAME)
1034-
.setState(MemberState.STABLE)
1035-
.setProcessId(PROCESS_ID)
1036-
.setMemberEpoch(memberEpoch + 1)
1037-
.setPreviousMemberEpoch(memberEpoch)
1038-
.setAssignedTasks(mkTasksTupleWithEpochs(TaskRole.ACTIVE,
1039-
mkTasksWithEpochs(SUBTOPOLOGY_ID1, Map.of(1, 5)),
1040-
mkTasksWithEpochs(SUBTOPOLOGY_ID2, Map.of(3, 7, 5, memberEpoch + 1))))
1041-
.setTasksPendingRevocation(TasksTupleWithEpochs.EMPTY)
1042-
.build(),
1043-
updatedMember
1044-
);
1045-
}
1046-
1047-
/**
1048-
* Tests that assigned tasks take precedence over pending revocation.
1049-
* If a task exists in both assigned and pending revocation, the epoch from assigned should be used.
1050-
*/
1051-
@Test
1052-
public void testAssignedTasksHavePrecedenceOverPendingRevocation() {
1053-
final int memberEpoch = 10;
1054-
1055-
// Create a member with task 1 in both assigned (epoch 5) and pending revocation (epoch 7)
1056-
// This shouldn't normally happen, but tests the precedence logic
1057-
StreamsGroupMember member = new StreamsGroupMember.Builder(MEMBER_NAME)
1058-
.setState(MemberState.STABLE)
1059-
.setProcessId(PROCESS_ID)
1060-
.setMemberEpoch(memberEpoch)
1061-
.setPreviousMemberEpoch(memberEpoch)
1062-
.setAssignedTasks(mkTasksTupleWithEpochs(TaskRole.ACTIVE,
1063-
mkTasksWithEpochs(SUBTOPOLOGY_ID1, Map.of(1, 5, 2, 6))))
1064-
.setTasksPendingRevocation(mkTasksTupleWithEpochs(TaskRole.ACTIVE,
1065-
mkTasksWithEpochs(SUBTOPOLOGY_ID1, Map.of(1, 7))))
1066-
.build();
1067-
1068-
// Target assignment includes task 1
1069-
StreamsGroupMember updatedMember = new CurrentAssignmentBuilder(member)
1070-
.withTargetAssignment(memberEpoch + 1, mkTasksTuple(TaskRole.ACTIVE,
1071-
mkTasks(SUBTOPOLOGY_ID1, 1, 2)))
1072-
.withCurrentActiveTaskProcessId((subtopologyId, partitionId) -> PROCESS_ID)
1073-
.withCurrentStandbyTaskProcessIds((subtopologyId, partitionId) -> Set.of())
1074-
.withCurrentWarmupTaskProcessIds((subtopologyId, partitionId) -> Set.of())
1075-
.build();
1076-
1077-
// Verify that task 1 uses epoch 5 from assigned, not epoch 7 from pending revocation
960+
// - Task 1 from SUBTOPOLOGY_ID1 should have epoch 5 (previous assignment epoch)
961+
// - Task 3 from SUBTOPOLOGY_ID2 should have epoch 11 (target assignment epoch)
962+
// - Task 5 from SUBTOPOLOGY_ID2 should have epoch 11 (target assignment epoch)
1078963
assertEquals(
1079964
new StreamsGroupMember.Builder(MEMBER_NAME)
1080965
.setState(MemberState.STABLE)
1081966
.setProcessId(PROCESS_ID)
1082-
.setMemberEpoch(memberEpoch + 1)
967+
.setMemberEpoch(targetAssignmentEpoch)
1083968
.setPreviousMemberEpoch(memberEpoch)
1084969
.setAssignedTasks(mkTasksTupleWithEpochs(TaskRole.ACTIVE,
1085-
mkTasksWithEpochs(SUBTOPOLOGY_ID1, Map.of(1, 5, 2, 6))))
970+
mkTasksWithEpochs(SUBTOPOLOGY_ID1, Map.of(1, 5, 2, 6)),
971+
mkTasksWithEpochs(SUBTOPOLOGY_ID2, Map.of(3, targetAssignmentEpoch, 5, targetAssignmentEpoch))))
1086972
.setTasksPendingRevocation(TasksTupleWithEpochs.EMPTY)
1087973
.build(),
1088974
updatedMember

0 commit comments

Comments
 (0)