diff --git a/config/server.properties b/config/server.properties index d99979995..6249c1d92 100644 --- a/config/server.properties +++ b/config/server.properties @@ -14,6 +14,8 @@ brooklin.server.transportProvider.kafkaTransportProvider.factoryClassName=com.li brooklin.server.transportProvider.kafkaTransportProvider.bootstrap.servers=localhost:9092 brooklin.server.transportProvider.kafkaTransportProvider.zookeeper.connect=localhost:2181 brooklin.server.transportProvider.kafkaTransportProvider.client.id=datastream-producer +brooklin.server.transportProvider.kafkaTransportProvider.sendTimeout=50000 +brooklin.server.transportProvider.kafkaTransportProvider.flushTimeout=3600000 ########################### File connector Configs ###################### @@ -31,3 +33,4 @@ brooklin.server.connector.test.strategy.TasksPerDatastream = 4 brooklin.server.connector.kafkaMirroringConnector.factoryClassName=com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorFactory brooklin.server.connector.kafkaMirroringConnector.assignmentStrategyFactory=com.linkedin.datastream.server.assignment.BroadcastStrategyFactory +
diff --git a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/BoundedKafkaProducerWrapper.java b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/BoundedKafkaProducerWrapper.java
new file mode 100644
index 000000000..6aea54e76
--- /dev/null
+++ b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/BoundedKafkaProducerWrapper.java
@@ -0,0 +1,128 @@
+/**
+ * Copyright 2020 LinkedIn Corporation. All rights reserved.
+ * Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
+ * See the NOTICE file in the project root for additional information regarding copyright ownership.
+ */
+package com.linkedin.datastream.kafka;
+
+import java.time.Duration;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.InterruptException;
+
+import com.linkedin.datastream.common.CompletableFutureUtils;
+import com.linkedin.datastream.common.VerifiableProperties;
+
+/**
+ * An extension of {@link KafkaProducerWrapper} with configurable timeouts for flush and send calls
+ */
+class BoundedKafkaProducerWrapper extends KafkaProducerWrapper {
+  private static final long DEFAULT_SEND_TIME_OUT_MS = Duration.ofSeconds(5).toMillis();
+  private static final long DEFAULT_FLUSH_TIME_OUT_MS = Duration.ofMinutes(10).toMillis();
+
+  private static final String SEND_TIMEOUT_CONFIG_KEY = "sendTimeout";
+  private static final String FLUSH_TIMEOUT_CONFIG_KEY = "flushTimeout";
+
+  private final long _sendTimeoutMs;
+  private final long _flushTimeoutMs;
+
+  /**
+   * Reads the "sendTimeout" and "flushTimeout" settings (milliseconds) from {@code props}, passing
+   * 5 seconds and 10 minutes respectively as the defaults.
+   */
+  BoundedKafkaProducerWrapper(String logSuffix, Properties props, String metricsNamesPrefix) {
+    super(logSuffix, props, metricsNamesPrefix);
+
+    VerifiableProperties properties = new VerifiableProperties(props);
+    _sendTimeoutMs = properties.getLong(SEND_TIMEOUT_CONFIG_KEY, DEFAULT_SEND_TIME_OUT_MS);
+    _flushTimeoutMs = properties.getLong(FLUSH_TIMEOUT_CONFIG_KEY, DEFAULT_FLUSH_TIME_OUT_MS);
+  }
+
+  /**
+   * Sends the record and reports the outcome to {@code callback}. If the send has not completed within the
+   * configured send timeout, the callback receives a {@link TimeoutException} instead.
+   */
+  @Override
+  void doSend(Producer producer, ProducerRecord record, Callback callback) {
+    CompletableFutureUtils.within(produceMessage(producer, record), Duration.ofMillis(_sendTimeoutMs))
+        .thenAccept(m -> callback.onCompletion(m, null))
+        .exceptionally(completionEx -> {
+          // Use the wrapped cause when there is one; otherwise report the throwable itself so that a
+          // failure is never signalled to the callback with a null exception.
+          Throwable cause = completionEx.getCause() != null ? completionEx.getCause() : completionEx;
+          if (cause instanceof TimeoutException) {
+            _log.warn("KafkaProducerWrapper send timed out. The destination topic may be unavailable.");
+          }
+
+          // Callback only accepts Exception, so wrap anything else (e.g. an Error) rather than fail the cast
+          Exception failure = cause instanceof Exception ? (Exception) cause : new RuntimeException(cause);
+          callback.onCompletion(null, failure);
+          return null;
+        });
+  }
+
+  /**
+   * Adapts the callback-based {@link Producer#send} into a {@link CompletableFuture} that completes with the
+   * record metadata, or exceptionally with the exception handed to the producer callback.
+   */
+  private CompletableFuture<RecordMetadata> produceMessage(Producer producer, ProducerRecord record) {
+    CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
+
+    producer.send(record, (metadata, exception) -> {
+      if (exception == null) {
+        future.complete(metadata);
+      } else {
+        future.completeExceptionally(exception);
+      }
+    });
+
+    return future;
+  }
+
+  /**
+   * Runs the parent flush on a helper thread and waits at most the configured flush timeout for it.
+   * On timeout the flush task is cancelled and a warning is logged; no exception is thrown.
+   * NOTE(review): this method holds the instance monitor while waiting; if the parent flush() also synchronizes
+   * on this instance the helper thread cannot make progress until the timeout expires - confirm.
+   */
+  @Override
+  synchronized void flush() {
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future<?> future = executor.submit(super::flush);
+
+    try {
+      future.get(_flushTimeoutMs, TimeUnit.MILLISECONDS);
+    } catch (TimeoutException ex) {
+      _log.warn("Flush call timed out after {}ms. Cancelling flush", _flushTimeoutMs);
+      future.cancel(true);
+    } catch (ExecutionException ex) {
+      Throwable cause = ex.getCause();
+
+      if (cause instanceof InterruptException) {
+        throw (InterruptException) cause;
+      } else {
+        // This shouldn't happen
+        _log.warn("Flush failed.", cause);
+      }
+    } catch (InterruptedException ex) {
+      // This also shouldn't happen because Kafka's flush throws its own InterruptException
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(ex);
+    } finally {
+      // A new executor is created on every call; shut it down so its worker thread is not leaked
+      executor.shutdownNow();
+    }
+  }
+}
+
diff --git a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaProducerWrapper.java b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaProducerWrapper.java index 9abbc086e..1da38d0f0 100644 --- a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaProducerWrapper.java +++ b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaProducerWrapper.java @@ -60,7 +60,7 @@ class KafkaProducerWrapper { private static final int TIME_OUT = 2000; private static final int MAX_SEND_ATTEMPTS = 10; - private final Logger _log; + private final long _sendFailureRetryWaitTimeMs; private final String _clientId; @@ -68,12 +68,6 @@ class KafkaProducerWrapper { // Set of datastream tasks assigned to the producer private final Set _tasks = ConcurrentHashMap.newKeySet(); - - // Producer is lazily initialized during the first send call. - // Also, can be nullified in case of exceptions, and recreated by subsequent send calls. - // Mark as volatile as it is mutable and used by different threads - private volatile Producer _kafkaProducer; - private final KafkaProducerFactory _producerFactory; // Limiter to control how fast producers are re-created after failures. @@ -98,6 +92,12 @@ class KafkaProducerWrapper { private final DynamicMetricsManager _dynamicMetricsManager; private final String _metricsNamesPrefix; + // Producer is lazily initialized during the first send call.
+ // Also, can be nullified in case of exceptions, and recreated by subsequent send calls. + // Mark as volatile as it is mutable and used by different threads + volatile Producer _kafkaProducer; + final Logger _log; + KafkaProducerWrapper(String logSuffix, Properties props) { this(logSuffix, props, null); } @@ -188,22 +188,21 @@ Producer createKafkaProducer() { void send(DatastreamTask task, ProducerRecord producerRecord, Callback onComplete) throws InterruptedException { - // There are two known cases that lead to IllegalStateException and we should retry: - // 1) number of brokers is less than minISR - // 2) producer is closed in generateSendFailure by another thread - // For either condition, we should retry as broker comes back healthy or producer is recreated boolean retry = true; - int numberOfAttempt = 0; + int numberOfAttempts = 0; + + /** + * There are two known cases that lead to IllegalStateException and we should retry: + * (1) number of brokers is less than minISR + * (2) producer is closed in generateSendFailure by another thread + * For either condition, we should retry as broker comes back healthy or producer is recreated + */ + while (retry) { try { - ++numberOfAttempt; - maybeGetKafkaProducer(task).ifPresent(p -> p.send(producerRecord, (metadata, exception) -> { - if (exception == null) { - onComplete.onCompletion(metadata, null); - } else { - onComplete.onCompletion(metadata, generateSendFailure(exception)); - } - })); + ++numberOfAttempts; + + maybeGetKafkaProducer(task).ifPresent(p -> doSend(p, producerRecord, onComplete)); retry = false; } catch (IllegalStateException e) { @@ -220,12 +219,12 @@ void send(DatastreamTask task, ProducerRecord producerRecord, Callback onC cause = cause.getCause(); } // Set a max_send_attempts for KafkaException as it may be non-recoverable - if (numberOfAttempt > MAX_SEND_ATTEMPTS || ((cause instanceof Error || cause instanceof RuntimeException))) { + if (numberOfAttempts > MAX_SEND_ATTEMPTS || ((cause instanceof 
Error || cause instanceof RuntimeException))) { _log.error("Send failed for partition {} with a non retriable exception", producerRecord.partition(), e); throw generateSendFailure(e); } else { _log.warn("Send failed for partition {} with retriable exception, retry {} out of {} in {} ms.", - producerRecord.partition(), numberOfAttempt, MAX_SEND_ATTEMPTS, _sendFailureRetryWaitTimeMs, e); + producerRecord.partition(), numberOfAttempts, MAX_SEND_ATTEMPTS, _sendFailureRetryWaitTimeMs, e); Thread.sleep(_sendFailureRetryWaitTimeMs); } } catch (Exception e) { @@ -235,7 +234,17 @@ void send(DatastreamTask task, ProducerRecord producerRecord, Callback onC } } - private synchronized void shutdownProducer() { + void doSend(Producer producer, ProducerRecord record, Callback callback) { + producer.send(record, (metadata, exception) -> { + if (exception == null) { + callback.onCompletion(metadata, null); + } else { + callback.onCompletion(metadata, generateSendFailure(exception)); + } + }); + } + + synchronized void shutdownProducer() { Producer producer = _kafkaProducer; // Nullify first to prevent subsequent send() to use // the current producer which is being shutdown. 
@@ -246,7 +255,7 @@ private synchronized void shutdownProducer() { } } - private DatastreamRuntimeException generateSendFailure(Exception exception) { + DatastreamRuntimeException generateSendFailure(Exception exception) { _dynamicMetricsManager.createOrUpdateMeter(_metricsNamesPrefix, AGGREGATE, PRODUCER_ERROR, 1); if (exception instanceof IllegalStateException) { _log.warn("sent failure transiently, exception: ", exception); diff --git a/datastream-utils/src/main/java/com/linkedin/datastream/common/CompletableFutureUtils.java b/datastream-utils/src/main/java/com/linkedin/datastream/common/CompletableFutureUtils.java new file mode 100644 index 000000000..f7d649b5c --- /dev/null +++ b/datastream-utils/src/main/java/com/linkedin/datastream/common/CompletableFutureUtils.java @@ -0,0 +1,57 @@ +/** + * Copyright 2020 LinkedIn Corporation. All rights reserved. + * Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information. + * See the NOTICE file in the project root for additional information regarding copyright ownership. 
+ */
+package com.linkedin.datastream.common;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+import org.apache.commons.lang.NullArgumentException;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Utilities for working with CompletableFutures
+ */
+public class CompletableFutureUtils {
+  private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
+      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("failAfter-%d").build());
+
+  /**
+   * Returns a CompletableFuture which is completed exceptionally with a TimeoutException after the given interval
+   * @param duration Duration after which to fail
+   */
+  public static <T> CompletableFuture<T> failAfter(Duration duration) {
+    final CompletableFuture<T> promise = new CompletableFuture<>();
+    SCHEDULER.schedule(() -> {
+      TimeoutException ex = new TimeoutException(String.format("Timeout after %dms", duration.toMillis()));
+      return promise.completeExceptionally(ex);
+    }, duration.toMillis(), TimeUnit.MILLISECONDS);
+    return promise;
+  }
+
+  /**
+   * Returns a {@link CompletableFuture} which completes with the outcome of the given future, or fails with a
+   * {@link TimeoutException} if the given future has not completed within the given duration
+   * NOTE(review): the timeout task is not cancelled if {@code future} completes first; it stays scheduled until due
+   * @param future Future whose outcome is awaited
+   * @param duration Timeout duration
+   * @throws NullArgumentException if {@code future} is null
+   */
+  public static <T> CompletableFuture<T> within(CompletableFuture<T> future, Duration duration) throws
+      NullArgumentException {
+    if (future == null) {
+      throw new NullArgumentException("future");
+    }
+
+    CompletableFuture<T> timeout = failAfter(duration);
+    return future.applyToEither(timeout, Function.identity());
+  }
+}
\ No newline at end of file
diff --git a/datastream-utils/src/test/java/com/linkedin/datastream/common/TestCompletableFutureUtils.java
b/datastream-utils/src/test/java/com/linkedin/datastream/common/TestCompletableFutureUtils.java
new file mode 100644
index 000000000..9dd1cfb21
--- /dev/null
+++ b/datastream-utils/src/test/java/com/linkedin/datastream/common/TestCompletableFutureUtils.java
@@ -0,0 +1,69 @@
+/**
+ * Copyright 2020 LinkedIn Corporation. All rights reserved.
+ * Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
+ * See the NOTICE file in the project root for additional information regarding copyright ownership.
+ */
+package com.linkedin.datastream.common;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang.NullArgumentException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link CompletableFutureUtils}
+ */
+public class TestCompletableFutureUtils {
+
+  /** within() fails with a TimeoutException when the wrapped future does not complete in time */
+  @Test
+  public void withinTimesOutAfterDuration() {
+    // A future that is never completed stands in for work that outlives the timeout,
+    // without parking a common-pool thread in a 10 second sleep
+    CompletableFuture<Object> never = new CompletableFuture<>();
+
+    try {
+      CompletableFutureUtils.within(never, Duration.ofMillis(10)).join();
+      Assert.fail("Expected within() to time out");
+    } catch (CompletionException ex) {
+      Assert.assertTrue(ex.getCause() instanceof TimeoutException);
+    }
+  }
+
+  /** within() passes through the result of a future that completes before the timeout */
+  @Test
+  public void withinReturnsResultWhenFutureCompletesInTime() {
+    CompletableFuture<String> done = CompletableFuture.completedFuture("done");
+
+    Assert.assertEquals(CompletableFutureUtils.within(done, Duration.ofSeconds(10)).join(), "done");
+  }
+
+  /** failAfter() completes exceptionally with a TimeoutException once the duration elapses */
+  @Test
+  public void failAfterActuallyFailsAfterDuration() {
+    CompletableFuture<Object> future = CompletableFutureUtils.failAfter(Duration.ofMillis(10));
+
+    try {
+      future.join();
+      Assert.fail("Expected failAfter() to time out");
+    } catch (CompletionException ex) {
+      Assert.assertTrue(ex.getCause() instanceof TimeoutException);
+    }
+  }
+
+  /** within() rejects a null future */
+  @Test
+  public void withinThrowsNullArgumentExceptionIfNoFutureProvided() {
+    try {
+      CompletableFutureUtils.within(null, Duration.ofMillis(100));
+      Assert.fail("Expected NullArgumentException");
+    } catch (NullArgumentException ex) {
+      // expected
+    }
+  }
+}
\ No newline at end of file