Skip to content

Commit c235305

Browse files
authored
Kafka Connect: Don't check that consumer group is stable for coordinator leader election (#14395)
1 parent e268df6 commit c235305

File tree

2 files changed

+70
-13
lines changed

2 files changed

+70
-13
lines changed

kafka-connect/kafka-connect/src/main/java/org/apache/iceberg/connect/channel/CommitterImpl.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import org.apache.kafka.clients.admin.Admin;
3030
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
3131
import org.apache.kafka.clients.admin.MemberDescription;
32-
import org.apache.kafka.common.ConsumerGroupState;
3332
import org.apache.kafka.common.TopicPartition;
3433
import org.apache.kafka.connect.errors.ConnectException;
3534
import org.apache.kafka.connect.sink.SinkRecord;
@@ -74,18 +73,19 @@ public int compare(TopicPartition o1, TopicPartition o2) {
7473
}
7574
}
7675

77-
private boolean hasLeaderPartition(Collection<TopicPartition> currentAssignedPartitions) {
76+
@VisibleForTesting
77+
boolean hasLeaderPartition(Collection<TopicPartition> currentAssignedPartitions) {
7878
ConsumerGroupDescription groupDesc;
7979
try (Admin admin = clientFactory.createAdmin()) {
8080
groupDesc = KafkaUtils.consumerGroupDescription(config.connectGroupId(), admin);
8181
}
82-
if (groupDesc.state() == ConsumerGroupState.STABLE) {
83-
Collection<MemberDescription> members = groupDesc.members();
84-
if (containsFirstPartition(members, currentAssignedPartitions)) {
85-
membersWhenWorkerIsCoordinator = members;
86-
return true;
87-
}
82+
83+
Collection<MemberDescription> members = groupDesc.members();
84+
if (containsFirstPartition(members, currentAssignedPartitions)) {
85+
membersWhenWorkerIsCoordinator = members;
86+
return true;
8887
}
88+
8989
return false;
9090
}
9191

kafka-connect/kafka-connect/src/test/java/org/apache/iceberg/connect/channel/TestCommitterImpl.java

Lines changed: 62 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,22 +19,55 @@
1919
package org.apache.iceberg.connect.channel;
2020

2121
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.Mockito.mock;
24+
import static org.mockito.Mockito.mockStatic;
25+
import static org.mockito.Mockito.when;
2226

27+
import java.lang.reflect.Field;
2328
import java.util.List;
2429
import java.util.Optional;
30+
import org.apache.iceberg.connect.IcebergSinkConfig;
2531
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
2632
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
33+
import org.apache.kafka.clients.admin.Admin;
34+
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
2735
import org.apache.kafka.clients.admin.MemberAssignment;
2836
import org.apache.kafka.clients.admin.MemberDescription;
2937
import org.apache.kafka.common.TopicPartition;
3038
import org.junit.jupiter.api.Test;
39+
import org.mockito.MockedStatic;
3140

3241
public class TestCommitterImpl {
3342

3443
@Test
3544
public void testIsLeader() {
45+
MemberAssignment assignment1 =
46+
new MemberAssignment(
47+
ImmutableSet.of(new TopicPartition("topic1", 0), new TopicPartition("topic2", 1)));
48+
MemberDescription member1 =
49+
new MemberDescription(null, Optional.empty(), null, null, assignment1);
50+
51+
MemberAssignment assignment2 =
52+
new MemberAssignment(
53+
ImmutableSet.of(new TopicPartition("topic2", 0), new TopicPartition("topic1", 1)));
54+
MemberDescription member2 =
55+
new MemberDescription(null, Optional.empty(), null, null, assignment2);
56+
57+
List<MemberDescription> members = ImmutableList.of(member1, member2);
58+
59+
List<TopicPartition> leaderAssignments =
60+
ImmutableList.of(new TopicPartition("topic2", 1), new TopicPartition("topic1", 0));
61+
List<TopicPartition> nonLeaderAssignments =
62+
ImmutableList.of(new TopicPartition("topic2", 0), new TopicPartition("topic1", 1));
63+
3664
CommitterImpl committer = new CommitterImpl();
65+
assertThat(committer.containsFirstPartition(members, leaderAssignments)).isTrue();
66+
assertThat(committer.containsFirstPartition(members, nonLeaderAssignments)).isFalse();
67+
}
3768

69+
@Test
70+
public void testHasLeaderPartition() throws NoSuchFieldException, IllegalAccessException {
3871
MemberAssignment assignment1 =
3972
new MemberAssignment(
4073
ImmutableSet.of(new TopicPartition("topic1", 0), new TopicPartition("topic2", 1)));
@@ -49,12 +82,36 @@ public void testIsLeader() {
4982

5083
List<MemberDescription> members = ImmutableList.of(member1, member2);
5184

52-
List<TopicPartition> assignments =
85+
List<TopicPartition> leaderAssignments =
5386
ImmutableList.of(new TopicPartition("topic2", 1), new TopicPartition("topic1", 0));
54-
assertThat(committer.containsFirstPartition(members, assignments)).isTrue();
55-
56-
assignments =
87+
List<TopicPartition> nonLeaderAssignments =
5788
ImmutableList.of(new TopicPartition("topic2", 0), new TopicPartition("topic1", 1));
58-
assertThat(committer.containsFirstPartition(members, assignments)).isFalse();
89+
90+
CommitterImpl committer = new CommitterImpl();
91+
Field configField = CommitterImpl.class.getDeclaredField("config");
92+
Field clientFactoryField = CommitterImpl.class.getDeclaredField("clientFactory");
93+
configField.setAccessible(true);
94+
clientFactoryField.setAccessible(true);
95+
96+
IcebergSinkConfig config = mock(IcebergSinkConfig.class);
97+
when(config.connectGroupId()).thenReturn("test-group");
98+
configField.set(committer, config);
99+
100+
KafkaClientFactory clientFactory = mock(KafkaClientFactory.class);
101+
Admin admin = mock(Admin.class);
102+
when(clientFactory.createAdmin()).thenReturn(admin);
103+
clientFactoryField.set(committer, clientFactory);
104+
105+
try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
106+
ConsumerGroupDescription consumerGroupDescription = mock(ConsumerGroupDescription.class);
107+
mockKafkaUtils
108+
.when(() -> KafkaUtils.consumerGroupDescription(any(), any()))
109+
.thenReturn(consumerGroupDescription);
110+
111+
when(consumerGroupDescription.members()).thenReturn(members);
112+
113+
assertThat(committer.hasLeaderPartition(leaderAssignments)).isTrue();
114+
assertThat(committer.hasLeaderPartition(nonLeaderAssignments)).isFalse();
115+
}
59116
}
60117
}

0 commit comments

Comments
 (0)