Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.common.ConsumerGroupState;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
Expand Down Expand Up @@ -74,18 +73,19 @@ public int compare(TopicPartition o1, TopicPartition o2) {
}
}

private boolean hasLeaderPartition(Collection<TopicPartition> currentAssignedPartitions) {
@VisibleForTesting
boolean hasLeaderPartition(Collection<TopicPartition> currentAssignedPartitions) {
ConsumerGroupDescription groupDesc;
try (Admin admin = clientFactory.createAdmin()) {
groupDesc = KafkaUtils.consumerGroupDescription(config.connectGroupId(), admin);
}
if (groupDesc.state() == ConsumerGroupState.STABLE) {
Collection<MemberDescription> members = groupDesc.members();
if (containsFirstPartition(members, currentAssignedPartitions)) {
membersWhenWorkerIsCoordinator = members;
return true;
}

Collection<MemberDescription> members = groupDesc.members();
if (containsFirstPartition(members, currentAssignedPartitions)) {
membersWhenWorkerIsCoordinator = members;
return true;
}

return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,22 +19,55 @@
package org.apache.iceberg.connect.channel;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;

import java.lang.reflect.Field;
import java.util.List;
import java.util.Optional;
import org.apache.iceberg.connect.IcebergSinkConfig;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.MemberAssignment;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;

public class TestCommitterImpl {

@Test
public void testIsLeader() {
MemberAssignment assignment1 =
new MemberAssignment(
ImmutableSet.of(new TopicPartition("topic1", 0), new TopicPartition("topic2", 1)));
MemberDescription member1 =
new MemberDescription(null, Optional.empty(), null, null, assignment1);

MemberAssignment assignment2 =
new MemberAssignment(
ImmutableSet.of(new TopicPartition("topic2", 0), new TopicPartition("topic1", 1)));
MemberDescription member2 =
new MemberDescription(null, Optional.empty(), null, null, assignment2);

List<MemberDescription> members = ImmutableList.of(member1, member2);

List<TopicPartition> leaderAssignments =
ImmutableList.of(new TopicPartition("topic2", 1), new TopicPartition("topic1", 0));
List<TopicPartition> nonLeaderAssignments =
ImmutableList.of(new TopicPartition("topic2", 0), new TopicPartition("topic1", 1));

CommitterImpl committer = new CommitterImpl();
assertThat(committer.containsFirstPartition(members, leaderAssignments)).isTrue();
assertThat(committer.containsFirstPartition(members, nonLeaderAssignments)).isFalse();
}

@Test
public void testHasLeaderPartition() throws NoSuchFieldException, IllegalAccessException {
MemberAssignment assignment1 =
new MemberAssignment(
ImmutableSet.of(new TopicPartition("topic1", 0), new TopicPartition("topic2", 1)));
Expand All @@ -49,12 +82,36 @@ public void testIsLeader() {

List<MemberDescription> members = ImmutableList.of(member1, member2);

List<TopicPartition> assignments =
List<TopicPartition> leaderAssignments =
ImmutableList.of(new TopicPartition("topic2", 1), new TopicPartition("topic1", 0));
assertThat(committer.containsFirstPartition(members, assignments)).isTrue();

assignments =
List<TopicPartition> nonLeaderAssignments =
ImmutableList.of(new TopicPartition("topic2", 0), new TopicPartition("topic1", 1));
assertThat(committer.containsFirstPartition(members, assignments)).isFalse();

CommitterImpl committer = new CommitterImpl();
Field configField = CommitterImpl.class.getDeclaredField("config");
Field clientFactoryField = CommitterImpl.class.getDeclaredField("clientFactory");
configField.setAccessible(true);
clientFactoryField.setAccessible(true);

IcebergSinkConfig config = mock(IcebergSinkConfig.class);
when(config.connectGroupId()).thenReturn("test-group");
configField.set(committer, config);

KafkaClientFactory clientFactory = mock(KafkaClientFactory.class);
Admin admin = mock(Admin.class);
when(clientFactory.createAdmin()).thenReturn(admin);
clientFactoryField.set(committer, clientFactory);

try (MockedStatic<KafkaUtils> mockKafkaUtils = mockStatic(KafkaUtils.class)) {
ConsumerGroupDescription consumerGroupDescription = mock(ConsumerGroupDescription.class);
mockKafkaUtils
.when(() -> KafkaUtils.consumerGroupDescription(any(), any()))
.thenReturn(consumerGroupDescription);

when(consumerGroupDescription.members()).thenReturn(members);

assertThat(committer.hasLeaderPartition(leaderAssignments)).isTrue();
assertThat(committer.hasLeaderPartition(nonLeaderAssignments)).isFalse();
}
}
}