From fe19a5068219e51c1c03ff6f71420fe0dbe59597 Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan <george.zakaryan@hotmail.com>
Date: Mon, 23 Mar 2020 17:03:51 -0700
Subject: [PATCH 1/5] Added timeouts on send and flush calls in
 KafkaProducerWrapper

---
 .../kafka/KafkaClientException.java           |  23 ++++
 .../kafka/KafkaProducerWrapper.java           | 100 ++++++++++++------
 .../common/CompletableFutureUtils.java        |  57 ++++++++++
 .../common/TestCompletableFutureUtils.java    |  72 +++++++++++++
 4 files changed, 218 insertions(+), 34 deletions(-)
 create mode 100644 datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaClientException.java
 create mode 100644 datastream-utils/src/main/java/com/linkedin/datastream/common/CompletableFutureUtils.java
 create mode 100644 datastream-utils/src/test/java/com/linkedin/datastream/common/TestCompletableFutureUtils.java

diff --git a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaClientException.java b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaClientException.java
new file mode 100644
index 000000000..8c22c8ee7
--- /dev/null
+++ b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaClientException.java
@@ -0,0 +1,23 @@
+/**
+ *  Copyright 2020 LinkedIn Corporation. All rights reserved.
+ *  Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
+ *  See the NOTICE file in the project root for additional information regarding copyright ownership.
+ */
+
+package com.linkedin.datastream.kafka;
+import org.apache.kafka.clients.producer.RecordMetadata;
+
+class KafkaClientException extends Exception {
+  private static final long serialVersionUID = 1;
+
+  private final RecordMetadata _metadata;
+
+  public RecordMetadata getMetadata() {
+    return _metadata;
+  }
+
+  public KafkaClientException(RecordMetadata metadata, Throwable innerException) {
+    super(innerException);
+    _metadata = metadata;
+  }
+}
\ No newline at end of file
diff --git a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaProducerWrapper.java b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaProducerWrapper.java
index b48e1d208..ff513d09c 100644
--- a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaProducerWrapper.java
+++ b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaProducerWrapper.java
@@ -12,15 +12,18 @@
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
+import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
@@ -31,6 +34,7 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.RateLimiter;
 
+import com.linkedin.datastream.common.CompletableFutureUtils;
 import com.linkedin.datastream.common.DatastreamRuntimeException;
 import com.linkedin.datastream.common.DatastreamTransientException;
 import com.linkedin.datastream.common.ReflectionUtils;
@@ -59,6 +63,9 @@ class KafkaProducerWrapper<K, V> {
 
   private static final int TIME_OUT = 2000;
   private static final int MAX_SEND_ATTEMPTS = 10;
+  private static final int SEND_TIME_OUT = 5000;
+  private static final int FLUSH_TIME_OUT = 10 * 60 * 1000;
+
   private final Logger _log;
   private final long _sendFailureRetryWaitTimeMs;
 
@@ -180,55 +187,79 @@ private synchronized Producer<K, V> initializeProducer(DatastreamTask task) {
     return _kafkaProducer;
   }
 
+  /**
+   * There are two known cases that lead to IllegalStateException and we should retry:
+   *   (1) number of brokers is less than minISR
+   *   (2) producer is closed in generateSendFailure by another thread
+   * For either condition, we should retry as the broker comes back healthy or the producer is recreated
+   */
   void send(DatastreamTask task, ProducerRecord<K, V> producerRecord, Callback onComplete)
       throws InterruptedException {
-    // There are two known cases that lead to IllegalStateException and we should retry:
-    //  1) number of brokers is less than minISR
-    //  2) producer is closed in generateSendFailure by another thread
-    // For either condition, we should retry as broker comes back healthy or producer is recreated
     boolean retry = true;
-    int numberOfAttempt = 0;
+    int numberOfAttempts = 0;
+
     while (retry) {
       try {
-        ++numberOfAttempt;
-        maybeGetKafkaProducer(task).ifPresent(p -> p.send(producerRecord, (metadata, exception) -> {
-          if (exception == null) {
-            onComplete.onCompletion(metadata, null);
-          } else {
-            onComplete.onCompletion(metadata, generateSendFailure(exception));
-          }
-        }));
+        numberOfAttempts++;
+
+        maybeGetKafkaProducer(task).ifPresent(
+            p -> CompletableFutureUtils.within(produceMessage(p, producerRecord), Duration.ofMillis(SEND_TIME_OUT))
+                .thenAccept(m -> onComplete.onCompletion(m, null))
+                .exceptionally(completionEx -> {
+                  Throwable cause = completionEx.getCause();
+                  if (cause instanceof KafkaClientException) {
+                    KafkaClientException ex = (KafkaClientException) cause;
+                    onComplete.onCompletion(ex.getMetadata(), (Exception) ex.getCause());
+                  } else if (cause instanceof java.util.concurrent.TimeoutException) {
+                    _log.warn("KafkaProducerWrapper send timed out. The destination topic may be unavailable.");
+                    onComplete.onCompletion(null, (java.util.concurrent.TimeoutException) cause);
+                  }
+                  return null;
+                }));
 
         retry = false;
-      } catch (IllegalStateException e) {
-        //The following exception should be quite rare as most exceptions will be throw async callback
-        _log.warn("Either send is called on a closed producer or broker count is less than minISR, retry in {} ms.",
-            _sendFailureRetryWaitTimeMs, e);
-        Thread.sleep(_sendFailureRetryWaitTimeMs);
-      } catch (TimeoutException e) {
-        _log.warn("Kafka producer buffer is full, retry in {} ms.", _sendFailureRetryWaitTimeMs, e);
+      } catch (TimeoutException ex) {
+        _log.warn("Kafka producer buffer is full, retry in {} ms.", _sendFailureRetryWaitTimeMs, ex);
         Thread.sleep(_sendFailureRetryWaitTimeMs);
-      } catch (KafkaException e) {
-        Throwable cause = e.getCause();
-        while (cause instanceof KafkaException) {
-          cause = cause.getCause();
-        }
-        // Set a max_send_attempts for KafkaException as it may be non-recoverable
-        if (numberOfAttempt > MAX_SEND_ATTEMPTS || ((cause instanceof Error || cause instanceof RuntimeException))) {
-          _log.error("Send failed for partition {} with a non retriable exception", producerRecord.partition(), e);
-          throw generateSendFailure(e);
+      } catch (IllegalStateException ex) {
+        // The following exception should be quite rare as most exceptions will be thrown in the async callback
+        _log.warn("Either send is called on a closed producer or broker count is less than minISR, retry in {} ms.",
+            _sendFailureRetryWaitTimeMs, ex);
+        Thread.sleep(_sendFailureRetryWaitTimeMs);
+      } catch (KafkaException ex) {
+        Throwable rootCause = ExceptionUtils.getRootCause(ex);
+        if (numberOfAttempts > MAX_SEND_ATTEMPTS ||
+            (rootCause instanceof Error || rootCause instanceof RuntimeException)) {
+          // Set a max_send_attempts for KafkaException as it may be non-recoverable
+          _log.error("Send failed for partition {} with a non retriable exception", producerRecord.partition(), ex);
+          throw generateSendFailure(ex);
         } else {
+          // The exception might be recoverable. Retry will be attempted
           _log.warn("Send failed for partition {} with retriable exception, retry {} out of {} in {} ms.",
-              producerRecord.partition(), numberOfAttempt, MAX_SEND_ATTEMPTS, _sendFailureRetryWaitTimeMs, e);
+              producerRecord.partition(), numberOfAttempts, MAX_SEND_ATTEMPTS, _sendFailureRetryWaitTimeMs, ex);
           Thread.sleep(_sendFailureRetryWaitTimeMs);
         }
-      } catch (Exception e) {
-        _log.error("Send failed for partition {} with an exception", producerRecord.partition(), e);
-        throw generateSendFailure(e);
+      } catch (Exception ex) {
+        _log.error("Send failed for partition {} with an exception", producerRecord.partition(), ex);
+        throw generateSendFailure(ex);
       }
     }
   }
 
+  private CompletableFuture<RecordMetadata> produceMessage(Producer<K, V> producer, ProducerRecord<K, V> record) {
+    CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
+
+    producer.send(record, (metadata, exception) -> {
+      if (exception == null) {
+        future.complete(metadata);
+      } else {
+        future.completeExceptionally(new KafkaClientException(metadata, exception));
+      }
+    });
+
+    return future;
+  }
+
   private synchronized void shutdownProducer() {
     Producer<K, V> producer = _kafkaProducer;
     // Nullify first to prevent subsequent send() to use
@@ -254,7 +285,8 @@ private DatastreamRuntimeException generateSendFailure(Exception exception) {
 
   synchronized void flush() {
     if (_kafkaProducer != null) {
-      _kafkaProducer.flush();
+      CompletableFutureUtils.within(CompletableFuture.runAsync(() -> _kafkaProducer.flush()),
+          Duration.ofMillis(FLUSH_TIME_OUT)).join();
     }
   }
 
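A note on the pattern this hunk adopts: producer.send() is callback-based, so produceMessage() first adapts it into a CompletableFuture, which CompletableFutureUtils.within() (introduced in the next file) can race against a timer. Below is a minimal, self-contained sketch of the same adapt-then-bound pattern; slowSend and the class name are illustrative stand-ins, not part of the patch.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

import com.linkedin.datastream.common.CompletableFutureUtils;

public class SendTimeoutSketch {
  // Stand-in for producer.send(record, callback); simulates a broker that never acks.
  static void slowSend(BiConsumer<String, Exception> callback) {
    // Never invokes the callback, so the adapted future stays incomplete.
  }

  public static void main(String[] args) {
    CompletableFuture<String> pending = new CompletableFuture<>();
    slowSend((metadata, error) -> {
      if (error == null) {
        pending.complete(metadata);
      } else {
        pending.completeExceptionally(error);
      }
    });

    // Race the adapted future against a 50ms timer; the timer wins here, so the
    // exceptionally branch observes a TimeoutException as the wrapped cause.
    CompletableFutureUtils.within(pending, Duration.ofMillis(50))
        .exceptionally(ex -> {
          System.out.println("timed out: " + (ex.getCause() instanceof TimeoutException));
          return null;
        })
        .join();
  }
}
```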
diff --git a/datastream-utils/src/main/java/com/linkedin/datastream/common/CompletableFutureUtils.java b/datastream-utils/src/main/java/com/linkedin/datastream/common/CompletableFutureUtils.java
new file mode 100644
index 000000000..f7d649b5c
--- /dev/null
+++ b/datastream-utils/src/main/java/com/linkedin/datastream/common/CompletableFutureUtils.java
@@ -0,0 +1,57 @@
+/**
+ *  Copyright 2020 LinkedIn Corporation. All rights reserved.
+ *  Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
+ *  See the NOTICE file in the project root for additional information regarding copyright ownership.
+ */
+package com.linkedin.datastream.common;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.function.Function;
+
+import org.apache.commons.lang.NullArgumentException;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Utilities for working with CompletableFutures
+ */
+public class CompletableFutureUtils {
+  private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
+      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("failAfter-%d").build());
+
+  /**
+   * Returns a CompletableFuture which fails with a TimeoutException after the given interval
+   * @param duration Duration after which to fail
+   */
+  public static <T> CompletableFuture<T> failAfter(Duration duration) {
+    final CompletableFuture<T> promise = new CompletableFuture<>();
+    SCHEDULER.schedule(() -> {
+      TimeoutException ex = new TimeoutException(String.format("Timeout after %d ms", duration.toMillis()));
+      return promise.completeExceptionally(ex);
+    }, duration.toMillis(), TimeUnit.MILLISECONDS);
+    return promise;
+  }
+
+  /**
+   * Returns a {@link CompletableFuture} which either successfully executes the given future, or fails with timeout
+   * after the given duration
+   * @param future Future to execute
+   * @param duration Timeout duration
+   * @throws NullArgumentException if {@code future} is null
+   */
+  public static <T> CompletableFuture<T> within(CompletableFuture<T> future, Duration duration)
+      throws NullArgumentException {
+    if (future == null) {
+      throw new NullArgumentException("future");
+    }
+
+    CompletableFuture<T> timeout = failAfter(duration);
+    return future.applyToEither(timeout, Function.identity());
+  }
+
+}
\ No newline at end of file
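
A quick usage sketch of these helpers (values are hypothetical): within() races the supplied future against failAfter()'s timer, and join() surfaces a lost race as a CompletionException wrapping the TimeoutException.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeoutException;

import com.linkedin.datastream.common.CompletableFutureUtils;

public class WithinUsage {
  public static void main(String[] args) {
    CompletableFuture<Integer> slow = CompletableFuture.supplyAsync(() -> {
      try {
        Thread.sleep(5_000); // simulate a stalled operation
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
      return 42;
    });

    try {
      Integer answer = CompletableFutureUtils.within(slow, Duration.ofMillis(100)).join();
      System.out.println("completed in time: " + answer);
    } catch (CompletionException e) {
      // The 100ms timer fired first
      System.out.println("timed out: " + (e.getCause() instanceof TimeoutException));
    }
  }
}
```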
diff --git a/datastream-utils/src/test/java/com/linkedin/datastream/common/TestCompletableFutureUtils.java b/datastream-utils/src/test/java/com/linkedin/datastream/common/TestCompletableFutureUtils.java
new file mode 100644
index 000000000..9dd1cfb21
--- /dev/null
+++ b/datastream-utils/src/test/java/com/linkedin/datastream/common/TestCompletableFutureUtils.java
@@ -0,0 +1,72 @@
+/**
+ *  Copyright 2020 LinkedIn Corporation. All rights reserved.
+ *  Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
+ *  See the NOTICE file in the project root for additional information regarding copyright ownership.
+ */
+package com.linkedin.datastream.common;
+
+import java.time.Duration;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.lang.NullArgumentException;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests for {@link CompletableFutureUtils}
+ */
+public class TestCompletableFutureUtils {
+
+  @Test
+  public void withinTimesOutAfterDuration() {
+    CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
+      try {
+        Thread.sleep(10000);
+      } catch (Exception ex) {
+        throw new RuntimeException();
+      }
+      return null;
+    });
+
+    boolean exception = false;
+    try {
+      CompletableFutureUtils.within(future, Duration.ofMillis(10)).join();
+    } catch (Exception ex) {
+      Throwable cause = ex.getCause();
+      Assert.assertTrue(cause instanceof TimeoutException);
+      exception = true;
+    }
+
+    Assert.assertTrue(exception);
+  }
+
+  @Test
+  public void failAfterActuallyFailsAfterDuration() {
+    CompletableFuture<String> future = CompletableFutureUtils.failAfter(Duration.ofMillis(10));
+
+    boolean exception = false;
+    try {
+      future.join();
+    } catch (Exception ex) {
+      Throwable cause = ex.getCause();
+      Assert.assertTrue(cause instanceof TimeoutException);
+      exception = true;
+    }
+    Assert.assertTrue(exception);
+  }
+
+  @Test
+  public void withinThrowsNullArgumentExceptionIfNoFutureProvided() {
+    boolean exception = false;
+
+    try {
+      CompletableFutureUtils.within(null, Duration.ofMillis(100));
+    } catch (NullArgumentException ex) {
+      exception = true;
+    }
+
+    Assert.assertTrue(exception);
+  }
+}
\ No newline at end of file

From 3134d6fbb01808cedd35d344c429dc31cc71e47a Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan <george.zakaryan@hotmail.com>
Date: Tue, 24 Mar 2020 15:24:51 -0700
Subject: [PATCH 2/5] Fixed conflicts and error handling logic in flush after
 merge

---
 .../kafka/KafkaProducerWrapper.java           | 19 +++++++++++++------
 1 file changed, 13 insertions(+), 6 deletions(-)

diff --git a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaProducerWrapper.java b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaProducerWrapper.java
index 0578e7138..0da289c08 100644
--- a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaProducerWrapper.java
+++ b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaProducerWrapper.java
@@ -13,6 +13,7 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -192,7 +193,7 @@ private synchronized Producer<K, V> initializeProducer(DatastreamTask task) {
   Producer<K, V> createKafkaProducer() {
     return _producerFactory.createProducer(_props);
   }
-  
+
  /**
   * There are two known cases that lead to IllegalStateException and we should retry:
   *   (1) number of brokers is less than minISR
@@ -294,11 +295,17 @@ synchronized void flush() {
       try {
         CompletableFutureUtils.within(CompletableFuture.runAsync(() -> _kafkaProducer.flush()),
           Duration.ofMillis(FLUSH_TIME_OUT)).join();
-      } catch (InterruptException e) {
-        // The KafkaProducer object should not be reused on an interrupted flush
-        _log.warn("Kafka producer flush interrupted, closing producer {}.", _kafkaProducer);
-        shutdownProducer();
-        throw e;
+      } catch (CompletionException e) {
+        Throwable cause = e.getCause();
+
+        if (cause instanceof InterruptException) {
+          // The KafkaProducer object should not be reused on an interrupted flush
+          _log.warn("Kafka producer flush interrupted, closing producer {}.", _kafkaProducer);
+          shutdownProducer();
+          throw (InterruptException) cause;
+        } else if (cause instanceof java.util.concurrent.TimeoutException) {
+          _log.warn("Kafka producer flush timed out after {}ms. Destination topic may be unavailable.", FLUSH_TIME_OUT);
+        }
       }
     }
   }

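The reworked catch clause reflects how CompletableFuture propagates failures: an exception thrown inside runAsync() reaches the joining thread wrapped in a CompletionException, so the original InterruptException from KafkaProducer.flush() must be recovered via getCause(). A small illustration, with IllegalStateException standing in for Kafka's InterruptException:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class JoinWrappingSketch {
  public static void main(String[] args) {
    CompletableFuture<Void> failing = CompletableFuture.runAsync(() -> {
      throw new IllegalStateException("flush interrupted"); // stand-in for InterruptException
    });

    try {
      failing.join();
    } catch (CompletionException e) {
      // join() rethrows the failure wrapped; the original exception is the cause
      System.out.println(e.getCause().getMessage()); // prints "flush interrupted"
    }
  }
}
```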
From 988d8a56be0cdbe94fe44d25d95657c31e66be52 Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan <george.zakaryan@hotmail.com>
Date: Mon, 30 Mar 2020 12:28:48 -0700
Subject: [PATCH 3/5] Made a separate abstraction for
 BoundedKafkaProducerWrapper. Added configurable timeouts

---
 config/server.properties                      |  4 +
 .../kafka/BoundedKafkaProducerWrapper.java    | 97 +++++++++++++++++++
 .../kafka/KafkaProducerWrapper.java           | 62 +++---------
 3 files changed, 116 insertions(+), 47 deletions(-)
 create mode 100644 datastream-kafka/src/main/java/com/linkedin/datastream/kafka/BoundedKafkaProducerWrapper.java

diff --git a/config/server.properties b/config/server.properties
index d99979995..8c9f72a51 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -31,3 +31,7 @@ brooklin.server.connector.test.strategy.TasksPerDatastream = 4
 
 brooklin.server.connector.kafkaMirroringConnector.factoryClassName=com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorFactory
 brooklin.server.connector.kafkaMirroringConnector.assignmentStrategyFactory=com.linkedin.datastream.server.assignment.BroadcastStrategyFactory
+
+########################### Kafka Producer Wrapper Configs ####################
+brooklin.server.kafkaProducerWrapper.sendTimeout=50000
+brooklin.server.kafkaProducerWrapper.flushTimeout=3600000
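
Both values are in milliseconds, so this sample config bounds send at 50 seconds and flush at one hour. A sketch of how such keys are read with VerifiableProperties, mirroring the constructor in the new wrapper below (the fallback defaults are illustrative):

```java
import java.util.Properties;

import com.linkedin.datastream.common.VerifiableProperties;

public class TimeoutConfigSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("brooklin.server.kafkaProducerWrapper.sendTimeout", "50000");
    props.put("brooklin.server.kafkaProducerWrapper.flushTimeout", "3600000");

    VerifiableProperties vp = new VerifiableProperties(props);
    // Fall back to hard-coded defaults when the keys are absent
    int sendTimeoutMs = vp.getInt("brooklin.server.kafkaProducerWrapper.sendTimeout", 5000);
    int flushTimeoutMs = vp.getInt("brooklin.server.kafkaProducerWrapper.flushTimeout", 10 * 60 * 1000);
    System.out.println(sendTimeoutMs + " " + flushTimeoutMs);
  }
}
```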
diff --git a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/BoundedKafkaProducerWrapper.java b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/BoundedKafkaProducerWrapper.java
new file mode 100644
index 000000000..4b34ba7c3
--- /dev/null
+++ b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/BoundedKafkaProducerWrapper.java
@@ -0,0 +1,97 @@
+/**
+ *  Copyright 2020 LinkedIn Corporation. All rights reserved.
+ *  Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
+ *  See the NOTICE file in the project root for additional information regarding copyright ownership.
+ */
+package com.linkedin.datastream.kafka;
+
+import java.time.Duration;
+import java.util.Properties;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.errors.InterruptException;
+
+import com.linkedin.datastream.common.CompletableFutureUtils;
+import com.linkedin.datastream.common.VerifiableProperties;
+
+/**
+ * An extension of {@link KafkaProducerWrapper} with bounded calls for flush and send
+ */
+class BoundedKafkaProducerWrapper<K, V> extends KafkaProducerWrapper<K, V> {
+  private static final int DEFAULT_SEND_TIME_OUT = 5000;
+  private static final int DEFAULT_FLUSH_TIME_OUT = 10 * 60 * 1000;
+
+  private static final String SEND_TIMEOUT_CONFIG_KEY = "brooklin.server.kafkaProducerWrapper.sendTimeout";
+  private static final String FLUSH_TIMEOUT_CONFIG_KEY = "brooklin.server.kafkaProducerWrapper.flushTimeout";
+
+  private int _sendTimeout;
+  private int _flushTimeout;
+
+  BoundedKafkaProducerWrapper(String logSuffix, Properties props, String metricsNamesPrefix) {
+    super(logSuffix, props, metricsNamesPrefix);
+
+    VerifiableProperties properties = new VerifiableProperties(props);
+    _sendTimeout = properties.getInt(SEND_TIMEOUT_CONFIG_KEY, DEFAULT_SEND_TIME_OUT);
+    _flushTimeout = properties.getInt(FLUSH_TIMEOUT_CONFIG_KEY, DEFAULT_FLUSH_TIME_OUT);
+  }
+
+  @Override
+  void doSend(Producer<K, V> producer, ProducerRecord<K, V> record, Callback callback) {
+    CompletableFutureUtils.within(produceMessage(producer, record), Duration.ofMillis(_sendTimeout))
+        .thenAccept(m -> callback.onCompletion(m, null))
+        .exceptionally(completionEx -> {
+          Throwable cause = completionEx.getCause();
+          if (cause instanceof KafkaClientException) {
+            KafkaClientException ex = (KafkaClientException) cause;
+            callback.onCompletion(ex.getMetadata(), (Exception) ex.getCause());
+          } else if (cause instanceof java.util.concurrent.TimeoutException) {
+            _log.warn("KafkaProducerWrapper send timed out. The destination topic may be unavailable.");
+            callback.onCompletion(null, (java.util.concurrent.TimeoutException) cause);
+          }
+          return null;
+        });
+  }
+
+  private CompletableFuture<RecordMetadata> produceMessage(Producer<K, V> producer, ProducerRecord<K, V> record) {
+    CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
+
+    producer.send(record, (metadata, exception) -> {
+      if (exception == null) {
+        future.complete(metadata);
+      } else {
+        future.completeExceptionally(new KafkaClientException(metadata, exception));
+      }
+    });
+
+    return future;
+  }
+
+  @Override
+  synchronized void flush() {
+    if (_kafkaProducer != null) {
+      try {
+        CompletableFutureUtils.within(CompletableFuture.runAsync(() -> _kafkaProducer.flush()),
+            Duration.ofMillis(_flushTimeout)).join();
+      } catch (CompletionException e) {
+        Throwable cause = e.getCause();
+
+        if (cause instanceof InterruptException) {
+          // The KafkaProducer object should not be reused on an interrupted flush
+          _log.warn("Kafka producer flush interrupted, closing producer {}.", _kafkaProducer);
+          shutdownProducer();
+          throw (InterruptException) cause;
+        } else if (cause instanceof java.util.concurrent.TimeoutException) {
+          _log.warn("Kafka producer flush timed out after {}ms. Destination topic may be unavailable.", _flushTimeout);
+        }
+
+        throw e;
+      }
+    }
+  }
+}
diff --git a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaProducerWrapper.java b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaProducerWrapper.java
index 0da289c08..06a37fddc 100644
--- a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaProducerWrapper.java
+++ b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaProducerWrapper.java
@@ -12,8 +12,6 @@
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -24,7 +22,6 @@
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
@@ -36,7 +33,6 @@
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.RateLimiter;
 
-import com.linkedin.datastream.common.CompletableFutureUtils;
 import com.linkedin.datastream.common.DatastreamRuntimeException;
 import com.linkedin.datastream.common.DatastreamTransientException;
 import com.linkedin.datastream.common.ReflectionUtils;
@@ -65,10 +61,8 @@ class KafkaProducerWrapper<K, V> {
 
   private static final int TIME_OUT = 2000;
   private static final int MAX_SEND_ATTEMPTS = 10;
-  private static final int SEND_TIME_OUT = 5000;
-  private static final int FLUSH_TIME_OUT = 10 * 60 * 1000;
 
-  private final Logger _log;
+  protected final Logger _log;
   private final long _sendFailureRetryWaitTimeMs;
 
   private final String _clientId;
@@ -80,8 +74,7 @@ class KafkaProducerWrapper<K, V> {
   // Producer is lazily initialized during the first send call.
   // Also, can be nullified in case of exceptions, and recreated by subsequent send calls.
   // Mark as volatile as it is mutable and used by different threads
-  private volatile Producer<K, V> _kafkaProducer;
-
+  protected volatile Producer<K, V> _kafkaProducer;
   private final KafkaProducerFactory<K, V> _producerFactory;
 
   // Limiter to control how fast producers are re-created after failures.
@@ -150,7 +143,7 @@ private void populateDefaultProducerConfigs() {
         DEFAULT_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_VALUE);
   }
 
-  private Optional<Producer<K, V>> maybeGetKafkaProducer(DatastreamTask task) {
+  protected Optional<Producer<K, V>> maybeGetKafkaProducer(DatastreamTask task) {
     Producer<K, V> producer = _kafkaProducer;
     if (producer == null) {
       producer = initializeProducer(task);
@@ -209,20 +202,7 @@ void send(DatastreamTask task, ProducerRecord<K, V> producerRecord, Callback onC
       try {
         numberOfAttempts++;
 
-        maybeGetKafkaProducer(task).ifPresent(
-            p -> CompletableFutureUtils.within(produceMessage(p, producerRecord), Duration.ofMillis(SEND_TIME_OUT))
-                .thenAccept(m -> onComplete.onCompletion(m, null))
-                .exceptionally(completionEx -> {
-                  Throwable cause = completionEx.getCause();
-                  if (cause instanceof KafkaClientException) {
-                    KafkaClientException ex = (KafkaClientException) cause;
-                    onComplete.onCompletion(ex.getMetadata(), (Exception) ex.getCause());
-                  } else if (cause instanceof java.util.concurrent.TimeoutException) {
-                    _log.warn("KafkaProducerWrapper send timed out. The destination topic may be unavailable.");
-                    onComplete.onCompletion(null, (java.util.concurrent.TimeoutException) cause);
-                  }
-                  return null;
-                }));
+        maybeGetKafkaProducer(task).ifPresent(p -> doSend(p, producerRecord, onComplete));
 
         retry = false;
       } catch (TimeoutException ex) {
@@ -252,22 +232,17 @@ void send(DatastreamTask task, ProducerRecord<K, V> producerRecord, Callback onC
     }
   }
 
-  private CompletableFuture<RecordMetadata> produceMessage(Producer<K, V> producer, ProducerRecord<K, V> record) {
-    CompletableFuture<RecordMetadata> future = new CompletableFuture<>();
-
+  void doSend(Producer<K, V> producer, ProducerRecord<K, V> record, Callback callback) {
     producer.send(record, (metadata, exception) -> {
       if (exception == null) {
-        future.complete(metadata);
+        callback.onCompletion(metadata, null);
       } else {
-        future.completeExceptionally(new KafkaClientException(metadata, exception));
+        callback.onCompletion(metadata, generateSendFailure(exception));
       }
     });
-
-    return future;
   }
 
-
-  private synchronized void shutdownProducer() {
+  protected synchronized void shutdownProducer() {
     Producer<K, V> producer = _kafkaProducer;
     // Nullify first to prevent subsequent send() to use
     // the current producer which is being shutdown.
@@ -278,7 +253,7 @@ private synchronized void shutdownProducer() {
     }
   }
 
-  private DatastreamRuntimeException generateSendFailure(Exception exception) {
+  protected DatastreamRuntimeException generateSendFailure(Exception exception) {
     _dynamicMetricsManager.createOrUpdateMeter(_metricsNamesPrefix, AGGREGATE, PRODUCER_ERROR, 1);
     if (exception instanceof IllegalStateException) {
       _log.warn("sent failure transiently, exception: ", exception);
@@ -293,19 +268,12 @@ private DatastreamRuntimeException generateSendFailure(Exception exception) {
   synchronized void flush() {
     if (_kafkaProducer != null) {
       try {
-        CompletableFutureUtils.within(CompletableFuture.runAsync(() -> _kafkaProducer.flush()),
-          Duration.ofMillis(FLUSH_TIME_OUT)).join();
-      } catch (CompletionException e) {
-        Throwable cause = e.getCause();
-
-        if (cause instanceof InterruptException) {
-          // The KafkaProducer object should not be reused on an interrupted flush
-          _log.warn("Kafka producer flush interrupted, closing producer {}.", _kafkaProducer);
-          shutdownProducer();
-          throw (InterruptException) cause;
-        } else if (cause instanceof java.util.concurrent.TimeoutException) {
-          _log.warn("Kafka producer flush timed out after {}ms. Destination topic may be unavailable.", FLUSH_TIME_OUT);
-        }
+        _kafkaProducer.flush();
+      } catch (InterruptException e) {
+        // The KafkaProducer object should not be reused on an interrupted flush
+        _log.warn("Kafka producer flush interrupted, closing producer {}.", _kafkaProducer);
+        shutdownProducer();
+        throw e;
       }
     }
   }

From 1b8571e4bf1d0e741e36323249c5e38cb1a6dccb Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan <george.zakaryan@hotmail.com>
Date: Tue, 7 Apr 2020 15:06:27 -0700
Subject: [PATCH 4/5] Fixed access modifier issues. Handled flush cancellation
 better

---
 .../kafka/BoundedKafkaProducerWrapper.java    | 66 +++++++++--------
 .../kafka/KafkaProducerWrapper.java           | 72 ++++++++++---------
 2 files changed, 74 insertions(+), 64 deletions(-)

diff --git a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/BoundedKafkaProducerWrapper.java b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/BoundedKafkaProducerWrapper.java
index 4b34ba7c3..9c6e1eb6d 100644
--- a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/BoundedKafkaProducerWrapper.java
+++ b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/BoundedKafkaProducerWrapper.java
@@ -7,9 +7,13 @@
 
 import java.time.Duration;
 import java.util.Properties;
-
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
@@ -21,29 +25,29 @@
 import com.linkedin.datastream.common.VerifiableProperties;
 
 /**
- * An extension of {@link KafkaProducerWrapper} with bounded calls for flush and send
+ * An extension of {@link KafkaProducerWrapper} with configurable timeouts for flush and send calls
  */
 class BoundedKafkaProducerWrapper<K, V> extends KafkaProducerWrapper<K, V> {
-  private static final int DEFAULT_SEND_TIME_OUT = 5000;
-  private static final int DEFAULT_FLUSH_TIME_OUT = 10 * 60 * 1000;
+  private static final long DEFAULT_SEND_TIME_OUT_MS = Duration.ofSeconds(5).toMillis();
+  private static final long DEFAULT_FLUSH_TIME_OUT_MS = Duration.ofMinutes(10).toMillis();
 
-  private static final String SEND_TIMEOUT_CONFIG_KEY = "brooklin.server.kafkaProducerWrapper.sendTimeout";
-  private static final String FLUSH_TIMEOUT_CONFIG_KEY = "brooklin.server.kafkaProducerWrapper.flushTimeout";
+  private static final String SEND_TIMEOUT_CONFIG_KEY = "sendTimeout";
+  private static final String FLUSH_TIMEOUT_CONFIG_KEY = "flushTimeout";
 
-  private int _sendTimeout;
-  private int _flushTimeout;
+  private long _sendTimeoutMs;
+  private long _flushTimeoutMs;
 
   BoundedKafkaProducerWrapper(String logSuffix, Properties props, String metricsNamesPrefix) {
     super(logSuffix, props, metricsNamesPrefix);
 
     VerifiableProperties properties = new VerifiableProperties(props);
-    _sendTimeout = properties.getInt(SEND_TIMEOUT_CONFIG_KEY, DEFAULT_SEND_TIME_OUT);
-    _flushTimeout = properties.getInt(FLUSH_TIMEOUT_CONFIG_KEY, DEFAULT_FLUSH_TIME_OUT);
+    _sendTimeoutMs = properties.getLong(SEND_TIMEOUT_CONFIG_KEY, DEFAULT_SEND_TIME_OUT_MS);
+    _flushTimeoutMs = properties.getLong(FLUSH_TIMEOUT_CONFIG_KEY, DEFAULT_FLUSH_TIME_OUT_MS);
   }
 
   @Override
   void doSend(Producer<K, V> producer, ProducerRecord<K, V> record, Callback callback) {
-    CompletableFutureUtils.within(produceMessage(producer, record), Duration.ofMillis(_sendTimeout))
+    CompletableFutureUtils.within(produceMessage(producer, record), Duration.ofMillis(_sendTimeoutMs))
         .thenAccept(m -> callback.onCompletion(m, null))
         .exceptionally(completionEx -> {
           Throwable cause = completionEx.getCause();
@@ -74,24 +78,28 @@ private CompletableFuture<RecordMetadata> produceMessage(Producer<K, V> producer
 
   @Override
   synchronized void flush() {
-    if (_kafkaProducer != null) {
-      try {
-        CompletableFutureUtils.within(CompletableFuture.runAsync(() -> _kafkaProducer.flush()),
-            Duration.ofMillis(_flushTimeout)).join();
-      } catch (CompletionException e) {
-        Throwable cause = e.getCause();
-
-        if (cause instanceof InterruptException) {
-          // The KafkaProducer object should not be reused on an interrupted flush
-          _log.warn("Kafka producer flush interrupted, closing producer {}.", _kafkaProducer);
-          shutdownProducer();
-          throw (InterruptException) cause;
-        } else if (cause instanceof java.util.concurrent.TimeoutException) {
-          _log.warn("Kafka producer flush timed out after {}ms. Destination topic may be unavailable.", _flushTimeout);
-        }
-
-        throw e;
+    ExecutorService executor = Executors.newSingleThreadExecutor();
+    Future<?> future = executor.submit(super::flush);
+
+    try {
+      future.get(_flushTimeoutMs, TimeUnit.MILLISECONDS);
+    } catch (TimeoutException ex) {
+      _log.warn("Flush call timed out after {}ms. Cancelling flush", _flushTimeoutMs);
+      future.cancel(true);
+    } catch (ExecutionException ex) {
+      Throwable cause = ex.getCause();
+
+      if (cause instanceof InterruptException) {
+        throw (InterruptException) cause;
+      } else {
+        // This shouldn't happen
+        _log.warn("Flush failed.", cause);
       }
+    } catch (InterruptedException ex) {
+      // This shouldn't happen either, because KafkaProducer.flush() signals interruption
+      // with Kafka's own unchecked InterruptException
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(ex);
+    } finally {
+      // Shut down the per-call executor so its worker thread does not leak
+      executor.shutdown();
     }
   }
 }
+
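
The new flush bound uses the classic Future.get-with-timeout idiom instead of a CompletableFuture race, because a timed-out flush should be actively cancelled (interrupted) rather than merely abandoned. A standalone sketch of the same pattern; the sleep stands in for a blocking flush():

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedFlushSketch {
  public static void main(String[] args) throws InterruptedException {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future<?> flush = executor.submit(() -> {
      try {
        Thread.sleep(10_000); // stand-in for a blocking KafkaProducer.flush()
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // observe cancel(true)
      }
    });

    try {
      flush.get(100, TimeUnit.MILLISECONDS);
    } catch (TimeoutException ex) {
      System.out.println("flush timed out; cancelling");
      flush.cancel(true); // interrupts the worker so it stops promptly
    } catch (ExecutionException ex) {
      System.out.println("flush failed: " + ex.getCause());
    } finally {
      executor.shutdown(); // release the worker thread
    }
  }
}
```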
diff --git a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaProducerWrapper.java b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaProducerWrapper.java
index 06a37fddc..1da38d0f0 100644
--- a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaProducerWrapper.java
+++ b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaProducerWrapper.java
@@ -17,7 +17,6 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
-import org.apache.commons.lang.exception.ExceptionUtils;
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -62,7 +61,6 @@ class KafkaProducerWrapper<K, V> {
   private static final int TIME_OUT = 2000;
   private static final int MAX_SEND_ATTEMPTS = 10;
 
-  protected final Logger _log;
   private final long _sendFailureRetryWaitTimeMs;
 
   private final String _clientId;
@@ -70,11 +68,6 @@ class KafkaProducerWrapper<K, V> {
 
   // Set of datastream tasks assigned to the producer
   private final Set<DatastreamTask> _tasks = ConcurrentHashMap.newKeySet();
-
-  // Producer is lazily initialized during the first send call.
-  // Also, can be nullified in case of exceptions, and recreated by subsequent send calls.
-  // Mark as volatile as it is mutable and used by different threads
-  protected volatile Producer<K, V> _kafkaProducer;
   private final KafkaProducerFactory<K, V> _producerFactory;
 
   // Limiter to control how fast producers are re-created after failures.
@@ -99,6 +92,12 @@ class KafkaProducerWrapper<K, V> {
   private final DynamicMetricsManager _dynamicMetricsManager;
   private final String _metricsNamesPrefix;
 
+  // Producer is lazily initialized during the first send call.
+  // Also, can be nullified in case of exceptions, and recreated by subsequent send calls.
+  // Mark as volatile as it is mutable and used by different threads
+  volatile Producer<K, V> _kafkaProducer;
+  final Logger _log;
+
   KafkaProducerWrapper(String logSuffix, Properties props) {
     this(logSuffix, props, null);
   }
@@ -143,7 +142,7 @@ private void populateDefaultProducerConfigs() {
         DEFAULT_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_VALUE);
   }
 
-  protected Optional<Producer<K, V>> maybeGetKafkaProducer(DatastreamTask task) {
+  private Optional<Producer<K, V>> maybeGetKafkaProducer(DatastreamTask task) {
     Producer<K, V> producer = _kafkaProducer;
     if (producer == null) {
       producer = initializeProducer(task);
@@ -187,47 +186,50 @@ Producer<K, V> createKafkaProducer() {
     return _producerFactory.createProducer(_props);
   }
 
- /**
-  * There are two known cases that lead to IllegalStateException and we should retry:
-  *   (1) number of brokers is less than minISR
-  *   (2) producer is closed in generateSendFailure by another thread
-  * For either condition, we should retry as the broker comes back healthy or the producer is recreated
-  */
   void send(DatastreamTask task, ProducerRecord<K, V> producerRecord, Callback onComplete)
       throws InterruptedException {
     boolean retry = true;
     int numberOfAttempts = 0;
 
+    /*
+     * There are two known cases that lead to IllegalStateException and we should retry:
+     *   (1) number of brokers is less than minISR
+     *   (2) producer is closed in generateSendFailure by another thread
+     * For either condition, we should retry as the broker comes back healthy or the producer is recreated
+     */
+
     while (retry) {
       try {
-        numberOfAttempts++;
+        ++numberOfAttempts;
 
         maybeGetKafkaProducer(task).ifPresent(p -> doSend(p, producerRecord, onComplete));
 
         retry = false;
-      } catch (TimeoutException ex) {
-        _log.warn("Kafka producer buffer is full, retry in {} ms.", _sendFailureRetryWaitTimeMs, ex);
-        Thread.sleep(_sendFailureRetryWaitTimeMs);
-      } catch (IllegalStateException ex) {
-        // The following exception should be quite rare as most exceptions will be thrown in the async callback
+      } catch (IllegalStateException e) {
+        // The following exception should be quite rare as most exceptions will be thrown in the async callback
         _log.warn("Either send is called on a closed producer or broker count is less than minISR, retry in {} ms.",
-            _sendFailureRetryWaitTimeMs, ex);
-        Thread.sleep(_sendFailureRetryWaitTimeMs);
-      } catch (KafkaException ex) {
-        Throwable rootCause = ExceptionUtils.getRootCause(ex);
-        if (numberOfAttempts > MAX_SEND_ATTEMPTS ||
-            (rootCause instanceof Error || rootCause instanceof RuntimeException)) {
-          // Set a max_send_attempts for KafkaException as it may be non-recoverable
-          _log.error("Send failed for partition {} with a non retriable exception", producerRecord.partition(), ex);
-          throw generateSendFailure(ex);
+            _sendFailureRetryWaitTimeMs, e);
+        Thread.sleep(_sendFailureRetryWaitTimeMs);
+      } catch (TimeoutException e) {
+        _log.warn("Kafka producer buffer is full, retry in {} ms.", _sendFailureRetryWaitTimeMs, e);
+        Thread.sleep(_sendFailureRetryWaitTimeMs);
+      } catch (KafkaException e) {
+        Throwable cause = e.getCause();
+        while (cause instanceof KafkaException) {
+          cause = cause.getCause();
+        }
+        // Set a max_send_attempts for KafkaException as it may be non-recoverable
+        if (numberOfAttempts > MAX_SEND_ATTEMPTS || ((cause instanceof Error || cause instanceof RuntimeException))) {
+          _log.error("Send failed for partition {} with a non retriable exception", producerRecord.partition(), e);
+          throw generateSendFailure(e);
         } else {
-          // The exception might be recoverable. Retry will be attempted
           _log.warn("Send failed for partition {} with retriable exception, retry {} out of {} in {} ms.",
-              producerRecord.partition(), numberOfAttempts, MAX_SEND_ATTEMPTS, _sendFailureRetryWaitTimeMs, ex);
+              producerRecord.partition(), numberOfAttempts, MAX_SEND_ATTEMPTS, _sendFailureRetryWaitTimeMs, e);
           Thread.sleep(_sendFailureRetryWaitTimeMs);
         }
-      } catch (Exception ex) {
-        _log.error("Send failed for partition {} with an exception", producerRecord.partition(), ex);
-        throw generateSendFailure(ex);
+      } catch (Exception e) {
+        _log.error("Send failed for partition {} with an exception", producerRecord.partition(), e);
+        throw generateSendFailure(e);
       }
     }
   }
@@ -242,7 +244,7 @@ void doSend(Producer<K, V> producer, ProducerRecord<K, V> record, Callback callb
     });
   }
 
-  protected synchronized void shutdownProducer() {
+  synchronized void shutdownProducer() {
     Producer<K, V> producer = _kafkaProducer;
     // Nullify first to prevent subsequent send() to use
     // the current producer which is being shutdown.
@@ -253,7 +255,7 @@ protected synchronized void shutdownProducer() {
     }
   }
 
-  protected DatastreamRuntimeException generateSendFailure(Exception exception) {
+  DatastreamRuntimeException generateSendFailure(Exception exception) {
     _dynamicMetricsManager.createOrUpdateMeter(_metricsNamesPrefix, AGGREGATE, PRODUCER_ERROR, 1);
     if (exception instanceof IllegalStateException) {
       _log.warn("sent failure transiently, exception: ", exception);

From 9a11735524aebb65a31cfa753997a990c939c4ed Mon Sep 17 00:00:00 2001
From: Jhora Zakaryan <george.zakaryan@hotmail.com>
Date: Tue, 7 Apr 2020 17:35:17 -0700
Subject: [PATCH 5/5] Removed KafkaClientException class. Minor fixes in
 BoundedKafkaProducerWrapper

---
 config/server.properties                      |  5 ++--
 .../kafka/BoundedKafkaProducerWrapper.java    | 16 ++++++-------
 .../kafka/KafkaClientException.java           | 23 -------------------
 3 files changed, 9 insertions(+), 35 deletions(-)
 delete mode 100644 datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaClientException.java

diff --git a/config/server.properties b/config/server.properties
index 8c9f72a51..6249c1d92 100644
--- a/config/server.properties
+++ b/config/server.properties
@@ -14,6 +14,8 @@ brooklin.server.transportProvider.kafkaTransportProvider.factoryClassName=com.li
 brooklin.server.transportProvider.kafkaTransportProvider.bootstrap.servers=localhost:9092
 brooklin.server.transportProvider.kafkaTransportProvider.zookeeper.connect=localhost:2181
 brooklin.server.transportProvider.kafkaTransportProvider.client.id=datastream-producer
+brooklin.server.transportProvider.kafkaTransportProvider.sendTimeout=50000
+brooklin.server.transportProvider.kafkaTransportProvider.flushTimeout=3600000
 
 ########################### File connector Configs ######################
 
@@ -32,6 +34,3 @@ brooklin.server.connector.test.strategy.TasksPerDatastream = 4
 brooklin.server.connector.kafkaMirroringConnector.factoryClassName=com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorFactory
 brooklin.server.connector.kafkaMirroringConnector.assignmentStrategyFactory=com.linkedin.datastream.server.assignment.BroadcastStrategyFactory
 
-########################### Kafka Producer Wrapper Configs ####################
-brooklin.server.kafkaProducerWrapper.sendTimeout=50000
-brooklin.server.kafkaProducerWrapper.flushTimeout=3600000
diff --git a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/BoundedKafkaProducerWrapper.java b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/BoundedKafkaProducerWrapper.java
index 9c6e1eb6d..6aea54e76 100644
--- a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/BoundedKafkaProducerWrapper.java
+++ b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/BoundedKafkaProducerWrapper.java
@@ -34,8 +34,8 @@ class BoundedKafkaProducerWrapper<K, V> extends KafkaProducerWrapper<K, V> {
   private static final String SEND_TIMEOUT_CONFIG_KEY = "sendTimeout";
   private static final String FLUSH_TIMEOUT_CONFIG_KEY = "flushTimeout";
 
-  private long _sendTimeoutMs;
-  private long _flushTimeoutMs;
+  private final long _sendTimeoutMs;
+  private final long _flushTimeoutMs;
 
   BoundedKafkaProducerWrapper(String logSuffix, Properties props, String metricsNamesPrefix) {
     super(logSuffix, props, metricsNamesPrefix);
@@ -50,14 +50,12 @@ void doSend(Producer<K, V> producer, ProducerRecord<K, V> record, Callback callb
     CompletableFutureUtils.within(produceMessage(producer, record), Duration.ofMillis(_sendTimeoutMs))
         .thenAccept(m -> callback.onCompletion(m, null))
         .exceptionally(completionEx -> {
-          Throwable cause = completionEx.getCause();
-          if (cause instanceof KafkaClientException) {
-            KafkaClientException ex = (KafkaClientException) cause;
-            callback.onCompletion(ex.getMetadata(), (Exception) ex.getCause());
-          } else if (cause instanceof java.util.concurrent.TimeoutException) {
+          Exception cause = (Exception) completionEx.getCause();
+          if (cause instanceof java.util.concurrent.TimeoutException) {
             _log.warn("KafkaProducerWrapper send timed out. The destination topic may be unavailable.");
-            callback.onCompletion(null, (java.util.concurrent.TimeoutException) cause);
           }
+
+          callback.onCompletion(null, cause);
           return null;
         });
   }
@@ -69,7 +67,7 @@ private CompletableFuture<RecordMetadata> produceMessage(Producer<K, V> producer
       if (exception == null) {
         future.complete(metadata);
       } else {
-        future.completeExceptionally(new KafkaClientException(metadata, exception));
+        future.completeExceptionally(exception);
       }
     });
 
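With KafkaClientException gone, produceMessage() now fails the future with the producer's own exception. By the time it reaches exceptionally() it has crossed the dependent stages created by within() and thenAccept(), so it arrives wrapped in a CompletionException, which is why the getCause() unwrap is still required. A tiny illustration of that wrapping:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class CauseUnwrapSketch {
  public static void main(String[] args) {
    CompletableFuture<Object> f = new CompletableFuture<>();
    f.completeExceptionally(new RuntimeException("broker error"));

    f.thenApply(x -> x) // crossing a dependent stage wraps the failure
        .exceptionally(ex -> {
          System.out.println(ex instanceof CompletionException); // true
          System.out.println(ex.getCause().getMessage());        // broker error
          return null;
        });
  }
}
```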
diff --git a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaClientException.java b/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaClientException.java
deleted file mode 100644
index 8c22c8ee7..000000000
--- a/datastream-kafka/src/main/java/com/linkedin/datastream/kafka/KafkaClientException.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- *  Copyright 2020 LinkedIn Corporation. All rights reserved.
- *  Licensed under the BSD 2-Clause License. See the LICENSE file in the project root for license information.
- *  See the NOTICE file in the project root for additional information regarding copyright ownership.
- */
-
-package com.linkedin.datastream.kafka;
-import org.apache.kafka.clients.producer.RecordMetadata;
-
-class KafkaClientException extends Exception {
-  private static final long serialVersionUID = 1;
-
-  private final RecordMetadata _metadata;
-
-  public RecordMetadata getMetadata() {
-    return _metadata;
-  }
-
-  public KafkaClientException(RecordMetadata metadata, Throwable innerException) {
-    super(innerException);
-    _metadata = metadata;
-  }
-}
\ No newline at end of file