Skip to content

Commit

Permalink
Merge pull request #383 from TikhomirovSergey/kafka_refactor
Browse files Browse the repository at this point in the history
0.24.4-ALPHA: Fix of a kafka bug
  • Loading branch information
TikhomirovSergey authored Nov 24, 2022
2 parents 142884b + f157b92 commit 53140bb
Show file tree
Hide file tree
Showing 6 changed files with 32 additions and 27 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ plugins {
}

ext {
globalVersion = '0.24.3-ALPHA'
globalVersion = '0.24.4-ALPHA'
}

repositories {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import ru.tinkoff.qa.neptune.core.api.steps.annotations.StepParameter;
import ru.tinkoff.qa.neptune.core.api.steps.parameters.StepParameterPojo;
Expand All @@ -10,8 +11,9 @@
import java.util.*;
import java.util.function.Function;

import static java.time.Duration.ofNanos;
import static java.time.Duration.ofMillis;
import static java.util.Arrays.asList;
import static java.util.Optional.ofNullable;
import static java.util.stream.Collectors.toList;
import static java.util.stream.StreamSupport.stream;
import static ru.tinkoff.qa.neptune.kafka.properties.KafkaDefaultTopicsForPollProperty.DEFAULT_TOPICS_FOR_POLL;
Expand All @@ -27,30 +29,29 @@ public GetRecords(String[] topics) {
this.topics = topics.length == 0 ? DEFAULT_TOPICS_FOR_POLL.get() : topics;
}

private KafkaConsumer<String, String> kafkaConsumer;

@Override
public List<ConsumerRecord<String, String>> apply(KafkaStepContext context) {
var kafkaConsumer = context.createConsumer();

try (kafkaConsumer) {
kafkaConsumer.subscribe(asList(topics));
kafkaConsumer = ofNullable(kafkaConsumer).orElseGet(context::createConsumer);

ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(ofNanos(1));
Set<TopicPartition> partitions = consumerRecords.partitions();
kafkaConsumer.subscribe(asList(topics));
ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(ofMillis(100));
Set<TopicPartition> partitions = consumerRecords.partitions();

if (partitions.isEmpty()) {
return new ArrayList<>();
}
if (partitions.isEmpty()) {
return new ArrayList<>();
}

readRecords.addAll(stream(consumerRecords.spliterator(), false)
.map(KafkaRecordWrapper::new)
.collect(toList()));
readRecords.addAll(stream(consumerRecords.spliterator(), false)
.map(KafkaRecordWrapper::new)
.collect(toList()));

readRecords = readRecords.stream().distinct().collect(toList());
readRecords = readRecords.stream().distinct().collect(toList());

return readRecords.stream()
.map(KafkaRecordWrapper::getConsumerRecord)
.collect(toList());
}
return readRecords.stream()
.map(KafkaRecordWrapper::getConsumerRecord)
.collect(toList());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
import org.testng.annotations.Test;
import ru.tinkoff.qa.neptune.kafka.DraftDto;

import java.time.Duration;
import java.util.Map;

import static java.time.Duration.ofNanos;
import static java.time.Duration.ofSeconds;
import static java.util.List.of;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import static org.testng.Assert.fail;
import static ru.tinkoff.qa.neptune.core.api.hamcrest.iterables.MapEntryMatcher.mapEntry;
Expand All @@ -37,7 +38,7 @@ public void beforeClass() {
ConsumerRecord consumerRecord1 = new ConsumerRecord("testTopic", 1, 0, null, "{\"name\":\"testName1\"}");
ConsumerRecord consumerRecord2 = new ConsumerRecord("testTopic", 1, 0, null, "{\"name\":\"testName2\"}");

when(kafkaConsumer.poll(ofNanos(1)))
when(kafkaConsumer.poll(any(Duration.class)))
.thenReturn(new ConsumerRecords<>(Map.of(topicPartition, of(consumerRecord1, consumerRecord2))));

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,15 @@
import org.testng.annotations.Test;
import ru.tinkoff.qa.neptune.kafka.DraftDto;

import java.time.Duration;
import java.util.Map;

import static java.time.Duration.ofNanos;
import static java.time.Duration.ofSeconds;
import static java.util.List.of;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.startsWith;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import static org.testng.Assert.fail;
import static ru.tinkoff.qa.neptune.core.api.hamcrest.iterables.MapEntryMatcher.mapEntry;
Expand All @@ -37,7 +38,7 @@ public void beforeClass() {
ConsumerRecord consumerRecord1 = new ConsumerRecord("testTopic", 1, 0, null, "{\"name\":\"testName1\"}");
ConsumerRecord consumerRecord2 = new ConsumerRecord("testTopic", 1, 0, null, "{\"name\":\"testName2\"}");

when(kafkaConsumer.poll(ofNanos(1)))
when(kafkaConsumer.poll(any(Duration.class)))
.thenReturn(new ConsumerRecords<>(Map.of(topicPartition, of(consumerRecord1, consumerRecord2))));

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
import org.testng.annotations.Test;
import ru.tinkoff.qa.neptune.kafka.DraftDto;

import java.time.Duration;
import java.util.Map;

import static java.time.Duration.ofNanos;
import static java.time.Duration.ofSeconds;
import static java.util.List.of;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
import static org.testng.Assert.fail;
import static ru.tinkoff.qa.neptune.core.api.hamcrest.iterables.MapEntryMatcher.mapEntry;
Expand All @@ -35,7 +36,7 @@ public void beforeClass() {
ConsumerRecord consumerRecord1 = new ConsumerRecord("testTopic", 1, 0, null, "{\"name\":\"testName1\"}");
ConsumerRecord consumerRecord2 = new ConsumerRecord("testTopic", 1, 0, null, "{\"name\":\"testName2\"}");

when(kafkaConsumer.poll(ofNanos(1)))
when(kafkaConsumer.poll(any(Duration.class)))
.thenReturn(new ConsumerRecords<>(Map.of(topicPartition, of(consumerRecord1, consumerRecord2))));

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@
import ru.tinkoff.qa.neptune.kafka.DraftDto;
import ru.tinkoff.qa.neptune.kafka.KafkaBasePreparations;

import java.time.Duration;
import java.util.Map;

import static java.time.Duration.ofNanos;
import static java.time.Duration.ofSeconds;
import static java.util.List.of;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.*;
import static ru.tinkoff.qa.neptune.core.api.hamcrest.iterables.SetOfObjectsEachItemMatcher.eachOfArray;
import static ru.tinkoff.qa.neptune.core.api.hamcrest.iterables.SetOfObjectsEachItemMatcher.eachOfIterable;
Expand Down Expand Up @@ -47,7 +48,7 @@ public void beforeClass() {
consumerRecord2 = new ConsumerRecord("testTopic", 1, 0, null, "{\"1\":1}");
consumerRecord3 = new ConsumerRecord("testTopic", 1, 0, null, "{\"name\":\"Condition\"}");

when(kafkaConsumer.poll(ofNanos(1)))
when(kafkaConsumer.poll(any(Duration.class)))
.thenReturn(new ConsumerRecords<>(Map.of(topicPartition, of(consumerRecord1, consumerRecord2, consumerRecord3))));

}
Expand Down

0 comments on commit 53140bb

Please sign in to comment.