Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added timeouts on send and flush calls in KafkaProducerWrapper #696

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions config/server.properties
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ brooklin.server.transportProvider.kafkaTransportProvider.factoryClassName=com.li
brooklin.server.transportProvider.kafkaTransportProvider.bootstrap.servers=localhost:9092
brooklin.server.transportProvider.kafkaTransportProvider.zookeeper.connect=localhost:2181
brooklin.server.transportProvider.kafkaTransportProvider.client.id=datastream-producer
brooklin.server.transportProvider.kafkaTransportProvider.sendTimeout=50000
brooklin.server.transportProvider.kafkaTransportProvider.flushTimeout=3600000

########################### File connector Configs ######################

Expand All @@ -31,3 +33,4 @@ brooklin.server.connector.test.strategy.TasksPerDatastream = 4

brooklin.server.connector.kafkaMirroringConnector.factoryClassName=com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorFactory
brooklin.server.connector.kafkaMirroringConnector.assignmentStrategyFactory=com.linkedin.datastream.server.assignment.BroadcastStrategyFactory

Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/**
* Copyright 2020 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
* See the NOTICE file in the project root for additional information regarding copyright ownership.
*/
package com.linkedin.datastream.kafka;

import java.time.Duration;
import java.util.Properties;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.InterruptException;

import com.linkedin.datastream.common.CompletableFutureUtils;
import com.linkedin.datastream.common.VerifiableProperties;

/**
* An extension of {@link KafkaProducerWrapper} with configurable timeouts for flush and send calls
*/
class BoundedKafkaProducerWrapper<K, V> extends KafkaProducerWrapper<K, V> {
private static final long DEFAULT_SEND_TIME_OUT_MS = Duration.ofSeconds(5).toMillis();
private static final long DEFAULT_FLUSH_TIME_OUT_MS = Duration.ofMinutes(10).toMillis();

private static final String SEND_TIMEOUT_CONFIG_KEY = "sendTimeout";
private static final String FLUSH_TIMEOUT_CONFIG_KEY = "flushTimeout";

private final long _sendTimeoutMs;
private final long _flushTimeoutMs;

BoundedKafkaProducerWrapper(String logSuffix, Properties props, String metricsNamesPrefix) {
super(logSuffix, props, metricsNamesPrefix);

VerifiableProperties properties = new VerifiableProperties(props);
_sendTimeoutMs = properties.getLong(SEND_TIMEOUT_CONFIG_KEY, DEFAULT_SEND_TIME_OUT_MS);
_flushTimeoutMs = properties.getLong(FLUSH_TIMEOUT_CONFIG_KEY, DEFAULT_FLUSH_TIME_OUT_MS);
}

@Override
void doSend(Producer<K, V> producer, ProducerRecord<K, V> record, Callback callback) {
CompletableFutureUtils.within(produceMessage(producer, record), Duration.ofMillis(_sendTimeoutMs))
.thenAccept(m -> callback.onCompletion(m, null))
.exceptionally(completionEx -> {
Exception cause = (Exception) completionEx.getCause();
if (cause instanceof java.util.concurrent.TimeoutException) {
_log.warn("KafkaProducerWrapper send timed out. The destination topic may be unavailable.");
}

callback.onCompletion(null, cause);
return null;
});
}

private CompletableFuture<RecordMetadata> produceMessage(Producer<K, V> producer, ProducerRecord<K, V> record) {
CompletableFuture<RecordMetadata> future = new CompletableFuture<>();

producer.send(record, (metadata, exception) -> {
if (exception == null) {
future.complete(metadata);
} else {
future.completeExceptionally(exception);
}
});

return future;
}

@Override
synchronized void flush() {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(super::flush);

try {
future.get(_flushTimeoutMs, TimeUnit.MILLISECONDS);
} catch (TimeoutException ex) {
_log.warn("Flush call timed out after {}ms. Cancelling flush", _flushTimeoutMs);
future.cancel(true);
} catch (ExecutionException ex) {
Throwable cause = ex.getCause();

if (cause instanceof InterruptException) {
throw (InterruptException) cause;
} else {
// This shouldn't happen
_log.warn("Flush failed.", cause);
}
} catch (InterruptedException ex) {
// This also shouldn't happen because kafka flush use their own InterruptException
Thread.currentThread().interrupt();
throw new RuntimeException(ex);
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -60,20 +60,14 @@ class KafkaProducerWrapper<K, V> {

private static final int TIME_OUT = 2000;
private static final int MAX_SEND_ATTEMPTS = 10;
private final Logger _log;

private final long _sendFailureRetryWaitTimeMs;

private final String _clientId;
private final Properties _props;

// Set of datastream tasks assigned to the producer
private final Set<DatastreamTask> _tasks = ConcurrentHashMap.newKeySet();

// Producer is lazily initialized during the first send call.
// Also, can be nullified in case of exceptions, and recreated by subsequent send calls.
// Mark as volatile as it is mutable and used by different threads
private volatile Producer<K, V> _kafkaProducer;

private final KafkaProducerFactory<K, V> _producerFactory;

// Limiter to control how fast producers are re-created after failures.
Expand All @@ -98,6 +92,12 @@ class KafkaProducerWrapper<K, V> {
private final DynamicMetricsManager _dynamicMetricsManager;
private final String _metricsNamesPrefix;

// Producer is lazily initialized during the first send call.
// Also, can be nullified in case of exceptions, and recreated by subsequent send calls.
// Mark as volatile as it is mutable and used by different threads
volatile Producer<K, V> _kafkaProducer;
final Logger _log;

KafkaProducerWrapper(String logSuffix, Properties props) {
this(logSuffix, props, null);
}
Expand Down Expand Up @@ -188,22 +188,21 @@ Producer<K, V> createKafkaProducer() {

void send(DatastreamTask task, ProducerRecord<K, V> producerRecord, Callback onComplete)
throws InterruptedException {
// There are two known cases that lead to IllegalStateException and we should retry:
// 1) number of brokers is less than minISR
// 2) producer is closed in generateSendFailure by another thread
// For either condition, we should retry as broker comes back healthy or producer is recreated
boolean retry = true;
int numberOfAttempt = 0;
int numberOfAttempts = 0;

/**
* There are two known cases that lead to IllegalStateException and we should retry:
* (1) number of brokers is less than minISR
* (2) producer is closed in generateSendFailure by another thread
* For either condition, we should retry as broker comes back healthy or producer is recreated
*/

while (retry) {
try {
++numberOfAttempt;
maybeGetKafkaProducer(task).ifPresent(p -> p.send(producerRecord, (metadata, exception) -> {
if (exception == null) {
onComplete.onCompletion(metadata, null);
} else {
onComplete.onCompletion(metadata, generateSendFailure(exception));
}
}));
++numberOfAttempts;

maybeGetKafkaProducer(task).ifPresent(p -> doSend(p, producerRecord, onComplete));

retry = false;
} catch (IllegalStateException e) {
Expand All @@ -220,12 +219,12 @@ void send(DatastreamTask task, ProducerRecord<K, V> producerRecord, Callback onC
cause = cause.getCause();
}
// Set a max_send_attempts for KafkaException as it may be non-recoverable
if (numberOfAttempt > MAX_SEND_ATTEMPTS || ((cause instanceof Error || cause instanceof RuntimeException))) {
if (numberOfAttempts > MAX_SEND_ATTEMPTS || ((cause instanceof Error || cause instanceof RuntimeException))) {
_log.error("Send failed for partition {} with a non retriable exception", producerRecord.partition(), e);
throw generateSendFailure(e);
} else {
_log.warn("Send failed for partition {} with retriable exception, retry {} out of {} in {} ms.",
producerRecord.partition(), numberOfAttempt, MAX_SEND_ATTEMPTS, _sendFailureRetryWaitTimeMs, e);
producerRecord.partition(), numberOfAttempts, MAX_SEND_ATTEMPTS, _sendFailureRetryWaitTimeMs, e);
Thread.sleep(_sendFailureRetryWaitTimeMs);
}
} catch (Exception e) {
Expand All @@ -235,7 +234,17 @@ void send(DatastreamTask task, ProducerRecord<K, V> producerRecord, Callback onC
}
}

private synchronized void shutdownProducer() {
void doSend(Producer<K, V> producer, ProducerRecord<K, V> record, Callback callback) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this intended to be package private? If so, add a comment, otherwise add the appropriate public/protected/private

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@somandal
I think that's okay; this method's intended to be overriden by another class in the same package. If it's marked protected, it'll be more visible than it needs to be, private wouldn't allow it to be overriden, and public is too permissive.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, makes sense. Just keep things consistent, i.e. if you need a more visible scope for multiple methods, declare them all as package private, and the rest as private. Don't mix protected and package private without having a very good reason to.

producer.send(record, (metadata, exception) -> {
if (exception == null) {
callback.onCompletion(metadata, null);
} else {
callback.onCompletion(metadata, generateSendFailure(exception));
}
});
}

synchronized void shutdownProducer() {
Producer<K, V> producer = _kafkaProducer;
// Nullify first to prevent subsequent send() to use
// the current producer which is being shutdown.
Expand All @@ -246,7 +255,7 @@ private synchronized void shutdownProducer() {
}
}

private DatastreamRuntimeException generateSendFailure(Exception exception) {
DatastreamRuntimeException generateSendFailure(Exception exception) {
_dynamicMetricsManager.createOrUpdateMeter(_metricsNamesPrefix, AGGREGATE, PRODUCER_ERROR, 1);
if (exception instanceof IllegalStateException) {
_log.warn("sent failure transiently, exception: ", exception);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Copyright 2020 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
* See the NOTICE file in the project root for additional information regarding copyright ownership.
*/
package com.linkedin.datastream.common;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

import org.apache.commons.lang.NullArgumentException;

import com.google.common.util.concurrent.ThreadFactoryBuilder;

/**
* Utilities for working with CompletableFutures
*/
public class CompletableFutureUtils {
private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
new ThreadFactoryBuilder().setDaemon(true).setNameFormat("failAfter-%d").build());

/**
* Returns a CompletableFuture which fails with a TimeoutException after the given interval
* @param duration Duration after which to fail
*/
public static <T> CompletableFuture<T> failAfter(Duration duration) {
final CompletableFuture<T> promise = new CompletableFuture<>();
SCHEDULER.schedule(() -> {
TimeoutException ex = new TimeoutException(String.format("Timeout after {}ms", duration));
return promise.completeExceptionally(ex);
}, duration.toMillis(), TimeUnit.MILLISECONDS);
return promise;
}

/**
* Returns a {@link CompletableFuture} which either successfully executes the given future, or fails with timeout
* after the given duration
* @param future Future to execute
* @param duration Timeout duration
* @throws NullArgumentException
*/
public static <T> CompletableFuture<T> within(CompletableFuture<T> future, Duration duration) throws
NullArgumentException {
if (future == null) {
throw new NullArgumentException("future");
}

CompletableFuture<T> timeout = failAfter(duration);
return future.applyToEither(timeout, Function.identity());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/**
* Copyright 2020 LinkedIn Corporation. All rights reserved.
* Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
* See the NOTICE file in the project root for additional information regarding copyright ownership.
*/
package com.linkedin.datastream.common;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

import org.apache.commons.lang.NullArgumentException;
import org.testng.Assert;
import org.testng.annotations.Test;


/**
* Tests for {@link CompletableFutureUtils}
*/
public class TestCompletableFutureUtils {

@Test
public void withinTimesOutAfterDuration() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(10000);
} catch (Exception ex) {
throw new RuntimeException();
}
return null;
});

boolean exception = false;
try {
CompletableFutureUtils.within(future, Duration.ofMillis(10)).join();
} catch (Exception ex) {
Throwable cause = ex.getCause();
Assert.assertTrue(cause instanceof TimeoutException);
exception = true;
}

Assert.assertTrue(exception);
}

@Test
public void failAfterActuallyFailsAfterDuration() {
CompletableFuture<String> future = CompletableFutureUtils.failAfter(Duration.ofMillis(10));

boolean exception = false;
try {
future.join();
} catch (Exception ex) {
Throwable cause = ex.getCause();
Assert.assertTrue(cause instanceof TimeoutException);
exception = true;
}
Assert.assertTrue(exception);
}

@Test
public void withinThrowsNullArgumentExceptionIfNoFutureProvided() {
boolean exception = false;

try {
CompletableFutureUtils.within(null, Duration.ofMillis(100));
} catch (NullArgumentException ex) {
exception = true;
}

Assert.assertTrue(exception);
}
}