Skip to content

Commit

Permalink
Kafka drain timeout (opensearch-project#3454)
Browse files Browse the repository at this point in the history
* Add getDrainTimeout method to buffer interface. Add as configurable value for kafka buffer

Signed-off-by: Chase Engelbrecht <[email protected]>

* Add unit tests

Signed-off-by: Chase Engelbrecht <[email protected]>

* Move getDrainTimeout to default method in the interface, add test for it, disable SNS sink

Signed-off-by: Chase Engelbrecht <[email protected]>

* Remove verification from non-mock

Signed-off-by: Chase Engelbrecht <[email protected]>

---------

Signed-off-by: Chase Engelbrecht <[email protected]>
  • Loading branch information
engechas authored Oct 9, 2023
1 parent b35ffc2 commit 37eea76
Show file tree
Hide file tree
Showing 12 changed files with 141 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.opensearch.dataprepper.model.CheckpointState;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -53,4 +54,8 @@ public interface Buffer<T extends Record<?>> {
void checkpoint(CheckpointState checkpointState);

boolean isEmpty();

default Duration getDrainTimeout() {
return Duration.ZERO;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.model.buffer;

import org.junit.Assert;
import org.junit.jupiter.api.Test;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;

import static org.mockito.Mockito.spy;

public class BufferTest {

@Test
public void testGetDrainTimeout() {
final Buffer<Record<Event>> buffer = spy(Buffer.class);

Assert.assertEquals(Duration.ZERO, buffer.getDrainTimeout());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.TimeoutException;
Expand Down Expand Up @@ -71,4 +72,9 @@ public void checkpoint(final CheckpointState checkpointState) {
public boolean isEmpty() {
return buffer.isEmpty();
}

@Override
public Duration getDrainTimeout() {
return buffer.getDrainTimeout();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,9 @@ public void execute() {
* 6. Stopping the sink ExecutorService
*/
public synchronized void shutdown() {
LOG.info("Pipeline [{}] - Received shutdown signal with processor shutdown timeout {} and sink shutdown timeout {}." +
" Initiating the shutdown process",
name, processorShutdownTimeout, sinkShutdownTimeout);
LOG.info("Pipeline [{}] - Received shutdown signal with buffer drain timeout {}, processor shutdown timeout {}, " +
"and sink shutdown timeout {}. Initiating the shutdown process",
name, buffer.getDrainTimeout(), processorShutdownTimeout, sinkShutdownTimeout);
try {
source.stop();
stopRequested.set(true);
Expand All @@ -277,7 +277,7 @@ public synchronized void shutdown() {
"proceeding with termination of process workers", name, ex);
}

shutdownExecutorService(processorExecutorService, processorShutdownTimeout.toMillis(), "processor");
shutdownExecutorService(processorExecutorService, buffer.getDrainTimeout().toMillis() + processorShutdownTimeout.toMillis(), "processor");

processorSets.forEach(processorSet -> processorSet.forEach(Processor::shutdown));
sinks.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;

/**
* Buffer decorator created for pipelines that make use of multiple buffers, such as PeerForwarder-enabled pipelines. The decorator
Expand Down Expand Up @@ -55,4 +57,11 @@ public boolean isEmpty() {
.map(Buffer::isEmpty)
.allMatch(result -> result == true);
}

@Override
public Duration getDrainTimeout() {
return Stream.concat(Stream.of(primaryBuffer), secondaryBuffers.stream())
.map(Buffer::getDrainTimeout)
.reduce(Duration.ZERO, Duration::plus);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,12 @@
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.MatcherAssert.assertThat;
Expand Down Expand Up @@ -64,6 +66,15 @@ void constructor_should_throw_with_null_circuitBreaker() {
assertThrows(NullPointerException.class, this::createObjectUnderTest);
}

@Test
void getDrainTimeout_returns_buffer_drain_timeout() {
final Duration duration = Duration.ofMillis(new Random().nextLong());
when(buffer.getDrainTimeout()).thenReturn(duration);

final Duration result = createObjectUnderTest().getDrainTimeout();
assertThat(result, equalTo(duration));
}

@Nested
class NoCircuitBreakerChecks {
@AfterEach
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.dataprepper.model.buffer.Buffer;
import org.opensearch.dataprepper.model.record.Record;

import java.time.Duration;
import java.util.AbstractMap;
import java.util.Collection;
import java.util.List;
Expand All @@ -34,7 +35,8 @@

@ExtendWith(MockitoExtension.class)
public class MultiBufferDecoratorTest {
private static final int TIMEOUT_MILLIS = new Random().nextInt(1000000) + 1;
private static final Random RANDOM = new Random();
private static final int TIMEOUT_MILLIS = RANDOM.nextInt(1000000) + 1;

@Mock
private Buffer primaryBuffer;
Expand Down Expand Up @@ -154,6 +156,51 @@ void isEmpty_MultipleSecondaryBuffers_OneNotEmpty() {
verify(secondaryBuffer, times(2)).isEmpty();
}

@Test
void getDrainTimeout_NoSecondaryBuffers_ReturnsPrimaryBufferValue() {
final Duration primaryBufferDrainTimeout = Duration.ofMillis(RANDOM.nextLong());
when(primaryBuffer.getDrainTimeout()).thenReturn(primaryBufferDrainTimeout);
final MultiBufferDecorator multiBufferDecorator = createObjectUnderTest(0);

final Duration result = multiBufferDecorator.getDrainTimeout();
assertThat(result, equalTo(primaryBufferDrainTimeout));

verify(primaryBuffer).getDrainTimeout();
}

@Test
void getDrainTimeout_OneSecondaryBuffer_ReturnsSumOfDurations() {
final Duration primaryBufferDrainTimeout = Duration.ofMillis(RANDOM.nextLong());
when(primaryBuffer.getDrainTimeout()).thenReturn(primaryBufferDrainTimeout);
final Duration secondaryBufferDrainTimeout = Duration.ofMillis(RANDOM.nextLong());
when(secondaryBuffer.getDrainTimeout()).thenReturn(secondaryBufferDrainTimeout);
final MultiBufferDecorator multiBufferDecorator = createObjectUnderTest(1);

final Duration result = multiBufferDecorator.getDrainTimeout();
assertThat(result, equalTo(primaryBufferDrainTimeout.plus(secondaryBufferDrainTimeout)));

verify(primaryBuffer).getDrainTimeout();
verify(secondaryBuffer).getDrainTimeout();
}

@Test
void getDrainTimeout_MultipleSecondaryBuffers_ReturnsSumOfDurations() {
final Duration primaryBufferDrainTimeout = Duration.ofMillis(RANDOM.nextLong());
when(primaryBuffer.getDrainTimeout()).thenReturn(primaryBufferDrainTimeout);
final Duration secondaryBufferDrainTimeout1 = Duration.ofMillis(RANDOM.nextLong());
final Duration secondaryBufferDrainTimeout2 = Duration.ofMillis(RANDOM.nextLong());
when(secondaryBuffer.getDrainTimeout())
.thenReturn(secondaryBufferDrainTimeout1)
.thenReturn(secondaryBufferDrainTimeout2);
final MultiBufferDecorator multiBufferDecorator = createObjectUnderTest(2);

final Duration result = multiBufferDecorator.getDrainTimeout();
assertThat(result, equalTo(primaryBufferDrainTimeout.plus(secondaryBufferDrainTimeout1).plus(secondaryBufferDrainTimeout2)));

verify(primaryBuffer).getDrainTimeout();
verify(secondaryBuffer, times(2)).getDrainTimeout();
}

private MultiBufferDecorator createObjectUnderTest(final int secondaryBufferCount) {
final List<Buffer> secondaryBuffers = IntStream.range(0, secondaryBufferCount)
.mapToObj(i -> secondaryBuffer)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,4 +91,5 @@ public boolean isEmpty() {
public int size() {
return buffer.size();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
Expand All @@ -36,6 +37,7 @@ public class KafkaBuffer<T extends Record<?>> extends AbstractBuffer<T> {
private final KafkaCustomProducer producer;
private final AbstractBuffer innerBuffer;
private final ExecutorService executorService;
private final Duration drainTimeout;

@DataPrepperPluginConstructor
public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig kafkaBufferConfig, final PluginFactory pluginFactory,
Expand All @@ -49,6 +51,8 @@ public KafkaBuffer(final PluginSetting pluginSetting, final KafkaBufferConfig ka
innerBuffer, pluginMetrics, acknowledgementSetManager, new AtomicBoolean(false));
this.executorService = Executors.newFixedThreadPool(consumers.size());
consumers.forEach(this.executorService::submit);

this.drainTimeout = kafkaBufferConfig.getDrainTimeout();
}

@Override
Expand Down Expand Up @@ -88,4 +92,9 @@ public boolean isEmpty() {
// TODO: check Kafka topic is empty as well.
return innerBuffer.isEmpty();
}

@Override
public Duration getDrainTimeout() {
return drainTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@
import org.opensearch.dataprepper.model.configuration.PluginModel;
import org.opensearch.dataprepper.model.configuration.PluginSetting;

import java.time.Duration;
import java.util.List;
import java.util.Objects;
import java.util.Optional;

public class KafkaBufferConfig implements KafkaProducerConfig, KafkaConsumerConfig {
private static final Duration DEFAULT_DRAIN_TIMEOUT = Duration.ofSeconds(30);

@JsonProperty("bootstrap_servers")
private List<String> bootStrapServers;
Expand Down Expand Up @@ -39,6 +41,9 @@ public class KafkaBufferConfig implements KafkaProducerConfig, KafkaConsumerConf
@Valid
private AwsConfig awsConfig;

@JsonProperty("drain_timeout")
private Duration drainTimeout = DEFAULT_DRAIN_TIMEOUT;


public List<String> getBootstrapServers() {
if (Objects.nonNull(bootStrapServers)) {
Expand Down Expand Up @@ -113,4 +118,8 @@ public String getClientDnsLookup() {
public boolean getAcknowledgementsEnabled() {
return false;
}

public Duration getDrainTimeout() {
return drainTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,15 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.Objects;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeoutException;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -218,5 +221,15 @@ void test_kafkaBuffer_postProcess() {
}


@Test
void test_kafkaBuffer_getDrainTimeout() {
final Duration duration = Duration.ofMillis(new Random().nextLong());
when(bufferConfig.getDrainTimeout()).thenReturn(duration);
kafkaBuffer = createObjectUnderTest();

final Duration result = kafkaBuffer.getDrainTimeout();
assertThat(result, equalTo(duration));

verify(bufferConfig).getDrainTimeout();
}
}
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ include 'data-prepper-plugins:buffer-common'
include 'data-prepper-plugins:sqs-source'
include 'data-prepper-plugins:cloudwatch-logs'
include 'data-prepper-plugins:http-sink'
include 'data-prepper-plugins:sns-sink'
//include 'data-prepper-plugins:sns-sink'
include 'data-prepper-plugins:prometheus-sink'
include 'data-prepper-plugins:dissect-processor'
include 'data-prepper-plugins:dynamodb-source'

0 comments on commit 37eea76

Please sign in to comment.