diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaInstrumentedConsumerImpl.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaInstrumentedConsumerImpl.java index cf7e3bf..7cab1ab 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaInstrumentedConsumerImpl.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/consumer/LiKafkaInstrumentedConsumerImpl.java @@ -707,9 +707,12 @@ public void close() { return; } try { - delegate.close(); + Consumer delegate = this.delegate; + if (delegate != null) { + delegate.close(); + } } finally { - delegate = null; + this.delegate = null; closeMdsClient(); } } @@ -723,9 +726,12 @@ public void close(long timeout, TimeUnit unit) { return; } try { - delegate.close(timeout, unit); + Consumer delegate = this.delegate; + if (delegate != null) { + delegate.close(timeout, unit); + } } finally { - delegate = null; + this.delegate = null; closeMdsClient(); } } @@ -738,9 +744,12 @@ public void close(Duration timeout) { return; } try { - delegate.close(timeout); + Consumer delegate = this.delegate; + if (delegate != null) { + delegate.close(timeout); + } } finally { - delegate = null; + this.delegate = null; closeMdsClient(); } } diff --git a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerImpl.java b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerImpl.java index 2c4bf92..a24bd8b 100644 --- a/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerImpl.java +++ b/li-apache-kafka-clients/src/main/java/com/linkedin/kafka/clients/producer/LiKafkaInstrumentedProducerImpl.java @@ -407,9 +407,12 @@ public void close() { return; } try { - delegate.close(); + Producer delegate = this.delegate; + if (delegate != null) { + delegate.close(); + } } finally { - delegate = null; + this.delegate = null; closeMdsClient(); } } @@ -426,10 +429,13 @@ public void close(Duration timeout) { return; } try { - //TODO - fix back after bumping up kafka - delegate.close(timeout.toMillis(), TimeUnit.MILLISECONDS); + Producer delegate = this.delegate; + if (delegate != null) { + //TODO - fix back after bumping up kafka + delegate.close(timeout.toMillis(), TimeUnit.MILLISECONDS); + } } finally { - delegate = null; + this.delegate = null; closeMdsClient(); } }