From 3fbbcc87b8d96c7977b194e714272d884c86356e Mon Sep 17 00:00:00 2001 From: Chase Engelbrecht Date: Fri, 27 Oct 2023 15:47:46 -0500 Subject: [PATCH] Add unit tests for topic emptiness class Signed-off-by: Chase Engelbrecht --- .../kafka/consumer/KafkaCustomConsumer.java | 2 +- .../consumer/TopicEmptinessMetadata.java | 8 +- .../consumer/TopicEmptinessMetadataTest.java | 101 ++++++++++++++++++ 3 files changed, 108 insertions(+), 3 deletions(-) create mode 100644 data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicEmptinessMetadataTest.java diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java index 80e3f00169..27f84b5baa 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/KafkaCustomConsumer.java @@ -541,7 +541,7 @@ public synchronized boolean isTopicEmpty() { } if (currentThreadId != topicEmptinessMetadata.getTopicEmptyCheckingOwnerThreadId() || - topicEmptinessMetadata.isCheckDurationExceeded(System.currentTimeMillis())) { + topicEmptinessMetadata.isWithinCheckInterval(System.currentTimeMillis())) { return topicEmptinessMetadata.isTopicEmpty(); } diff --git a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicEmptinessMetadata.java b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicEmptinessMetadata.java index 99925862ee..bec9a17c91 100644 --- a/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicEmptinessMetadata.java +++ b/data-prepper-plugins/kafka-plugins/src/main/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicEmptinessMetadata.java @@ -9,7 +9,7 @@ import java.util.concurrent.ConcurrentHashMap; public class TopicEmptinessMetadata { - private static final long IS_EMPTY_CHECK_INTERVAL_MS = 60000L; + static final long IS_EMPTY_CHECK_INTERVAL_MS = 60000L; private long lastIsEmptyCheckTime; private Long topicEmptyCheckingOwnerThreadId; @@ -41,11 +41,15 @@ public Long getTopicEmptyCheckingOwnerThreadId() { return this.topicEmptyCheckingOwnerThreadId; } + public ConcurrentHashMap getTopicPartitionToIsEmpty() { + return this.topicPartitionToIsEmpty; + } + public boolean isTopicEmpty() { return topicPartitionToIsEmpty.values().stream().allMatch(isEmpty -> isEmpty); } - public boolean isCheckDurationExceeded(final long epochTimestamp) { + public boolean isWithinCheckInterval(final long epochTimestamp) { return epochTimestamp < lastIsEmptyCheckTime + IS_EMPTY_CHECK_INTERVAL_MS; } } diff --git a/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicEmptinessMetadataTest.java b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicEmptinessMetadataTest.java new file mode 100644 index 0000000000..25f1950936 --- /dev/null +++ b/data-prepper-plugins/kafka-plugins/src/test/java/org/opensearch/dataprepper/plugins/kafka/consumer/TopicEmptinessMetadataTest.java @@ -0,0 +1,101 @@ +package org.opensearch.dataprepper.plugins.kafka.consumer; + +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.opensearch.dataprepper.plugins.kafka.consumer.TopicEmptinessMetadata.IS_EMPTY_CHECK_INTERVAL_MS; + +public class TopicEmptinessMetadataTest { + @Mock + private TopicPartition topicPartition; + @Mock + private TopicPartition topicPartition2; + + private TopicEmptinessMetadata topicEmptinessMetadata; + + @BeforeEach + void setup() { + MockitoAnnotations.openMocks(this); + this.topicEmptinessMetadata = new TopicEmptinessMetadata(); + } + + @Test + void updateTopicEmptinessStatus_AddEntry() { + topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition, false); + assertThat(topicEmptinessMetadata.getTopicPartitionToIsEmpty().containsKey(topicPartition), equalTo(true)); + assertThat(topicEmptinessMetadata.getTopicPartitionToIsEmpty().get(topicPartition), equalTo(false)); + } + + @Test + void updateTopicEmptinessStatus_UpdateEntry() { + topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition, false); + assertThat(topicEmptinessMetadata.getTopicPartitionToIsEmpty().containsKey(topicPartition), equalTo(true)); + assertThat(topicEmptinessMetadata.getTopicPartitionToIsEmpty().get(topicPartition), equalTo(false)); + + topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition, true); + assertThat(topicEmptinessMetadata.getTopicPartitionToIsEmpty().containsKey(topicPartition), equalTo(true)); + assertThat(topicEmptinessMetadata.getTopicPartitionToIsEmpty().get(topicPartition), equalTo(true)); + } + + @Test + void isTopicEmpty_NoItems() { + assertThat(topicEmptinessMetadata.isTopicEmpty(), equalTo(true)); + } + + @Test + void isTopicEmpty_OnePartition_IsNotEmpty() { + topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition, false); + assertThat(topicEmptinessMetadata.isTopicEmpty(), equalTo(false)); + } + + @Test + void isTopicEmpty_OnePartition_IsEmpty() { + topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition, true); + assertThat(topicEmptinessMetadata.isTopicEmpty(), equalTo(true)); + } + + @Test + void isTopicEmpty_MultiplePartitions_OneNotEmpty() { + topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition, true); + topicEmptinessMetadata.updateTopicEmptinessStatus(topicPartition2, false); + assertThat(topicEmptinessMetadata.isTopicEmpty(), equalTo(false)); + } + + @Test + void isCheckDurationExceeded_NoPreviousChecks() { + assertThat(topicEmptinessMetadata.isWithinCheckInterval(System.currentTimeMillis()), equalTo(false)); + } + + @Test + void isCheckDurationExceeded_CurrentTimeBeforeLastCheck() { + final long time = System.currentTimeMillis(); + topicEmptinessMetadata.setLastIsEmptyCheckTime(time); + assertThat(topicEmptinessMetadata.isWithinCheckInterval(time - 1), equalTo(true)); + } + + @Test + void isCheckDurationExceeded_CurrentTimeAfterLastCheck_BeforeInterval() { + final long time = System.currentTimeMillis(); + topicEmptinessMetadata.setLastIsEmptyCheckTime(time); + assertThat(topicEmptinessMetadata.isWithinCheckInterval((time + IS_EMPTY_CHECK_INTERVAL_MS) - 1), equalTo(true)); + } + + @Test + void isCheckDurationExceeded_CurrentTimeAfterLastCheck_AtInterval() { + final long time = System.currentTimeMillis(); + topicEmptinessMetadata.setLastIsEmptyCheckTime(time); + assertThat(topicEmptinessMetadata.isWithinCheckInterval(time + IS_EMPTY_CHECK_INTERVAL_MS), equalTo(false)); + } + + @Test + void isCheckDurationExceeded_CurrentTimeAfterLastCheck_AfterInterval() { + final long time = System.currentTimeMillis(); + topicEmptinessMetadata.setLastIsEmptyCheckTime(time); + assertThat(topicEmptinessMetadata.isWithinCheckInterval(time + IS_EMPTY_CHECK_INTERVAL_MS + 1), equalTo(false)); + } +}