Added timeouts on send and flush calls in KafkaProducerWrapper #696
base: master
Conversation
This is only a partial review, let's address these and I'll look deeper into the other parts of the newer code.
/**
 * There are two known cases that lead to IllegalStateException and we should retry:
 * (1) number of brokers is less than minISR
 * (2) producer is closed in generateSendFailure by another thread
 * (3) For either condition, we should retry as broker comes back healthy or producer is recreated
 */
Any reason why you moved these comments out? Also, I see that you've made the last line into (3), whereas it just talks about the above two points?
+1
To Sonam's first point: I think those comments are closely related to the logic within the method since they discuss impl details like exception types.
Thread.sleep(_sendFailureRetryWaitTimeMs);
} catch (TimeoutException e) {
  _log.warn("Kafka producer buffer is full, retry in {} ms.", _sendFailureRetryWaitTimeMs, e);
} catch (TimeoutException ex) {
Was there some reason why you had to reorder the catches here? In the previous code, we catch IllegalStateException, and then catch TimeoutException, and then catch KafkaException. Let's not reorder these unless there is a good reason to. It also becomes harder to review since I can't easily see the actual changes vs. reordering.
Also you renamed all exception 'e' to 'ex', is that necessary?
} catch (IllegalStateException ex) {
  // The following exception should be quite rare as most exceptions will be throw async callback
  _log.warn("Either send is called on a closed producer or broker count is less than minISR, retry in {} ms.",
      _sendFailureRetryWaitTimeMs, ex);
Looks like the earlier IllegalStateException block would sleep and you've removed it. why?
Thread.sleep(_sendFailureRetryWaitTimeMs);
Good catch. This is definitely a bug I introduced. Will revert the entire exception handling piece in send(). Better leave it as it is.
Throwable rootCause = ExceptionUtils.getRootCause(ex);
if (numberOfAttempts > MAX_SEND_ATTEMPTS ||
    (rootCause instanceof Error || rootCause instanceof RuntimeException)) {
  // Set a max_send_attempts for KafkaException as it may be non-recoverable
This comment doesn't make sense here, move it back to outside the if condition?
    onComplete.onCompletion(metadata, generateSendFailure(exception));
  }
}));
numberOfAttempts++;
change this back to ++numberOfAttempts?
+1
I like the rename (added s) but prefer the prefix form.
In fact, it wouldn't be such a bad idea to turn that loop into a for loop:
for (int numberOfAttempts = 1; retry; ++numberOfAttempts) {
...
}
or even eliminate retry entirely:
for (int numberOfAttempts = 1;; ++numberOfAttempts) {
try {
maybeGetKafkaProducer(task).ifPresent(p -> doSend(p, producerRecord, onComplete));
return;
} catch (...) {
...
}
}
    }
  }
}

private synchronized void shutdownProducer() {
void doSend(Producer<K, V> producer, ProducerRecord<K, V> record, Callback callback) {
is this intended to be package private? If so, add a comment, otherwise add the appropriate public/protected/private
@somandal I think that's okay; this method's intended to be overridden by another class in the same package. If it's marked protected, it'll be more visible than it needs to be, private wouldn't allow it to be overridden, and public is too permissive.
Sure, makes sense. Just keep things consistent, i.e. if you need a more visible scope for multiple methods, declare them all as package private, and the rest as private. Don't mix protected and package private without having a very good reason to.
config/server.properties
brooklin.server.kafkaProducerWrapper.sendTimeout=50000
brooklin.server.kafkaProducerWrapper.flushTimeout=3600000
This wouldn't be the right way to pass configs to your KafkaProducerWrapper. Let's discuss how to do this offline.
private static final String SEND_TIMEOUT_CONFIG_KEY = "brooklin.server.kafkaProducerWrapper.sendTimeout";
private static final String FLUSH_TIMEOUT_CONFIG_KEY = "brooklin.server.kafkaProducerWrapper.flushTimeout";
This isn't the right way to pass configs. Any configs you want to pass here need to be scoped under the transport provider configs. The KafkaProducerWrapper receives all of the transport provider configs, so you'd want to add these under that scope. We don't need to access the full property name, because as the configs pass through the layers, the relevant prefixes are removed. You can see how other configs are accessed in KafkaProducerWrapper, you'll see they don't have that whole "brooklin.server" prefix. You can access these the same way.
@somandal I've discussed this with Ahmed. Will push the fix soon.
@@ -142,7 +143,7 @@ private void populateDefaultProducerConfigs() {
       DEFAULT_MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_VALUE);
   }

-  private Optional<Producer<K, V>> maybeGetKafkaProducer(DatastreamTask task) {
+  protected Optional<Producer<K, V>> maybeGetKafkaProducer(DatastreamTask task) {
I don't see you overriding this in your Bounded implementation, why make this protected?
@@ -246,7 +253,7 @@ private synchronized void shutdownProducer() {
     }
   }

-  private DatastreamRuntimeException generateSendFailure(Exception exception) {
+  protected DatastreamRuntimeException generateSendFailure(Exception exception) {
I don't see you overriding this in your Bounded implementation, why make this protected?
@@ -60,7 +61,8 @@

   private static final int TIME_OUT = 2000;
   private static final int MAX_SEND_ATTEMPTS = 10;
-  private final Logger _log;
+  protected final Logger _log;
Move this with other protected member variables
@@ -72,8 +74,7 @@
   // Producer is lazily initialized during the first send call.
   // Also, can be nullified in case of exceptions, and recreated by subsequent send calls.
   // Mark as volatile as it is mutable and used by different threads
-  private volatile Producer<K, V> _kafkaProducer;
+  protected volatile Producer<K, V> _kafkaProducer;
It's not necessary to mark member fields/methods protected if the extenders of this class live in the same package. This is only useful if you want to make them accessible to extenders in different packages. Since KafkaProducerWrapper and BoundedKafkaProducerWrapper both live in the same package (com.linkedin.datastream.kafka), all package-private (no modifier) fields/methods in the former are accessible/overridable to/by the latter.
@@ -17,6 +17,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;

+import org.apache.commons.lang.exception.ExceptionUtils;
Nothing wrong with this but I'd recommend lang3; it generally offers more modern facilities (better support for more recent versions of Java) and it's the one we have an explicit dependency on. This would also entail adding an explicit dependency for this module on it in build.gradle:
project(':datastream-kafka') {
  dependencies {
    ...
    compile "org.apache.commons:commons-lang3:$commonslang3Version"
  }
}
private CompletableFuture<RecordMetadata> produceMessage(Producer<K, V> producer, ProducerRecord<K, V> record) {
  CompletableFuture<RecordMetadata> future = new CompletableFuture<>();

  producer.send(record, (metadata, exception) -> {
    if (exception == null) {
      future.complete(metadata);
    } else {
      future.completeExceptionally(new KafkaClientException(metadata, exception));
    }
  });

  return future;
}
Thinking out loud:
- This method is doing almost everything we need: it creates a CompletableFuture that is completed if the callback is called (send success/failure).
- The only missing bit is canceling future after the timeout elapses iff future.isDone() is false, which can be accomplished with a ScheduledExecutorService. I know this is exactly what CompletableFutureUtils.failAfter() is doing, but I think the logic over there is more than what's absolutely necessary; we don't really need the other CompletableFuture that failAfter() creates or the additional logic in within(). We can just cancel this same future if it isn't done when the timeout elapses:

scheduler.schedule(() -> { if (!future.isDone()) { future.cancel(); } }, _sendTimeout, TimeUnit.MILLISECONDS);

- future.cancel() causes a java.util.concurrent.CancellationException to be thrown, which means we don't have to construct a TimeoutException ourselves, because a CancellationException can only mean we cancelled it after the timeout elapsed.
This seems like something this method can do with a private executor service. I am not sure we really need a utils class just for this purpose.
@ahmedahamid I just think that cancellation and timeout are semantically different. We may want to cancel the future after the timeout in our case, but that's not necessarily true in general. Also, something may be cancelled without waiting for a timeout (based on user input or other external factors). Just thinking out loud. Will see whether I can get rid of the utils.
I just think that cancellation and timeout are semantically different.

I didn't mean to suggest we should propagate CancellationException to callback. I was assuming doSend() will still construct a TimeoutException in case future is cancelled. That does sound a bit roundabout though; it would certainly be better to do future.completeExceptionally(new TimeoutException(...)) instead of future.cancel() if future.isDone() is false after the timeout.
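As a concrete sketch of that last suggestion (the class name, the withTimeout helper, and the scheduler field are illustrative, not code from the PR), a pending future can simply be failed with a TimeoutException once the deadline passes:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SendTimeoutSketch {
    // One daemon thread is enough: the scheduler only completes futures and never blocks.
    private static final ScheduledExecutorService SCHEDULER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "send-timeout-scheduler");
            t.setDaemon(true);
            return t;
        });

    // Fails the future with a TimeoutException if it is still pending when the timeout
    // elapses. completeExceptionally() is a no-op on an already-completed future,
    // so there is no race to handle.
    static <T> CompletableFuture<T> withTimeout(CompletableFuture<T> future, long timeoutMs) {
        SCHEDULER.schedule(() -> {
            future.completeExceptionally(new TimeoutException("timed out after " + timeoutMs + " ms"));
        }, timeoutMs, TimeUnit.MILLISECONDS);
        return future;
    }
}
```

A send that completes before the deadline keeps its result; only a stuck send observes the TimeoutException.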
if (exception == null) {
  future.complete(metadata);
} else {
  future.completeExceptionally(new KafkaClientException(metadata, exception));
I don't think we need to propagate metadata when exception isn't null. The javadocs on Callback.onCompletion() state that metadata is null if an error occurred. This would also spare us having to introduce KafkaClientException.
There were tons of checks up the callback chain that are dealing with the metadata and exception. You're probably right; I need to see if it's safe to do so and remove KafkaClientException. I was kind of forced to introduce it in the first place.
synchronized void flush() {
  if (_kafkaProducer != null) {
    try {
      CompletableFutureUtils.within(CompletableFuture.runAsync(() -> _kafkaProducer.flush()),
CompletableFuture.runAsync() uses the common pool, which isn't the best option for a potentially long blocking call like Producer.flush(). Even if we provide our own thread pool to runAsync(), the CompletableFuture we'll get won't give us a way to interrupt a Producer.flush() call that exceeds the allowed timeout, which is necessary to free up the thread-pool thread in question. This is because calling cancel(true) on a CompletableFuture returned by runAsync() only causes a cancellation exception to be propagated without interrupting the blocked thread-pool thread.
I'm afraid our only option here seems to be using an ExecutorService directly:
// It's okay to use a single thread executor since flush() is synchronized
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<?> future = executor.submit(() -> super.flush());
try {
  // Block until timeout elapses
  future.get(_flushTimeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
  ...
  // Interrupt the Producer.flush() call to free up the blocked thread
  future.cancel(true);
  ...
}
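Fleshing that suggestion out into a self-contained sketch (the class and method names here are illustrative, and a plain Runnable stands in for the blocking Producer.flush() call):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class FlushTimeoutSketch {
    // One worker is enough if callers synchronize flush() externally; a daemon
    // thread keeps this sketch from pinning the JVM on exit.
    private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "bounded-flush");
        t.setDaemon(true);
        return t;
    });

    // Runs the potentially blocking flush action, waiting at most timeoutMs.
    // Returns true if it finished in time; on timeout, interrupts the worker
    // thread via cancel(true) so it can be reused, and returns false.
    static boolean flushWithTimeout(Runnable flushAction, long timeoutMs) {
        Future<?> future = EXECUTOR.submit(flushAction);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            future.cancel(true); // interrupt the blocked flush call
            return false;
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }
}
```

Note that cancel(true) only helps if the blocked call responds to interruption, which is the reason runAsync() was ruled out above.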
private static final int DEFAULT_SEND_TIME_OUT = 5000;
private static final int DEFAULT_FLUSH_TIME_OUT = 10 * 60 * 1000;
- Please encode time units into variable/config names, e.g. DEFAULT_SEND_TIME_OUT_MS
- Use longs for timeout configs (e.g. see existing timeout configs)
- If you like, you can use Duration methods to initialize (e.g. Duration.ofSeconds(5).toMillis())
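For instance, the defaults quoted earlier (5 seconds and 10 minutes) could be written with Duration so the units are self-evident; the field names below are only suggestions:

```java
import java.time.Duration;

class TimeoutDefaults {
    // Same values as DEFAULT_SEND_TIME_OUT / DEFAULT_FLUSH_TIME_OUT above,
    // but the unit is explicit in both the name and the initializer.
    static final long DEFAULT_SEND_TIMEOUT_MS = Duration.ofSeconds(5).toMillis();
    static final long DEFAULT_FLUSH_TIMEOUT_MS = Duration.ofMinutes(10).toMillis();
}
```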
private int _sendTimeout;
private int _flushTimeout;
- final
- Add time unit suffixes (e.g. _sendTimeoutMs)
/**
 * An extension of {@link KafkaProducerWrapper} with bounded calls for flush and send
 */
class BoundedKafkaProducerWrapper<K, V> extends KafkaProducerWrapper<K, V> {
Bounded is a little vague because it's easy to confuse with buffering. I realize a better name won't be easy. If you can't think of one, just make sure the Javadoc is unambiguous (e.g. with timeouts for flush and send).
@jzakaryan Do we still need this PR?
The changes in this PR address the issue of the kafka client being stuck on send and flush in cases when the destination topic gets dropped. Since kafka treats missing topic metadata as an eventual-consistency issue and keeps retrying to send, hoping that the topic will be available again, we have to make our calls to those methods bounded (i.e. have them time out after a certain amount of time).

I use CompletableFutures to have non-blocking timeouts on those calls. That seems like the best way to do it in Java without having to resort to third-party libraries.
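The overall idea can be illustrated with a small sketch (the adapter shape and all names below are assumptions for illustration, not the PR's actual code): a callback-based send is wrapped in a CompletableFuture, which is then failed if no callback arrives before the deadline.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

public class BoundedSendSketch {
    // Daemon timer thread used only to expire stuck sends.
    private static final ScheduledExecutorService SCHEDULER =
        Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "bounded-send-timer");
            t.setDaemon(true);
            return t;
        });

    // Adapts a callback-style send (like Kafka's Producer.send(record, callback))
    // into a CompletableFuture, and fails that future with a TimeoutException if
    // the callback never fires within timeoutMs.
    static <T> CompletableFuture<T> boundedSend(Consumer<BiConsumer<T, Exception>> asyncSend, long timeoutMs) {
        CompletableFuture<T> future = new CompletableFuture<>();
        asyncSend.accept((result, error) -> {
            if (error == null) {
                future.complete(result);
            } else {
                future.completeExceptionally(error);
            }
        });
        SCHEDULER.schedule(() -> {
            future.completeExceptionally(new TimeoutException("send timed out after " + timeoutMs + " ms"));
        }, timeoutMs, TimeUnit.MILLISECONDS);
        return future;
    }
}
```

Because the timeout is enforced by completing the future rather than by blocking a caller thread, the send path itself stays non-blocking, which matches the intent described above.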