[LI-HOTFIX] Fix flush behavior
TICKET = KAFKA-8021

LI_DESCRIPTION = The KafkaProducer#flush() API is expected to evaluate all record futures previously obtained via KafkaProducer#send(). Each record future may complete successfully or throw an exception, depending on the broker's produce response, but all of them should be evaluated. This behavior is broken when a produce batch is split into smaller batches (because of RecordTooLargeException) and rescheduled to be sent to the broker: after flush() returns successfully, some record futures may still not have been evaluated. This change checks whether any produce batch failed with RecordTooLargeException and, in that case, retries the flush over all incomplete batches. Note that although this change does not break the flush() API contract, it does change behavior in that more batches may get flushed than before.

Rejected alternative: chaining the produce batch futures. That approach fails to evaluate record futures as soon as their corresponding produce batch futures are evaluated. It also introduces an erroneous scenario: if one split batch completes successfully and the other split batch fails with an exception, we end up failing all record futures, including the ones that belonged to the successful split batch.

EXIT_CRITERIA = MANUAL ["When the hotfix is pushed to apache/kafka"]
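
For context, below is a minimal caller-side sketch (not part of this commit; the class name FlushContractSketch, the topic name, and the bootstrap address are illustrative assumptions) of the KafkaProducer#flush() contract described above: every future returned by send() must be complete by the time flush() returns, whether it completed successfully or with an error.

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class FlushContractSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // illustrative broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            List<Future<RecordMetadata>> futures = new ArrayList<>();
            for (int i = 0; i < 100; i++) {
                // Each send() returns a future for that record's produce result.
                futures.add(producer.send(new ProducerRecord<>("test-topic", new byte[256])));
            }

            producer.flush();

            // The contract: once flush() has returned, every record future is done,
            // whether it succeeded or failed with an exception. The bug fixed by this
            // commit is that a batch split caused by RecordTooLargeException could leave
            // some of these futures still pending after flush() returned.
            for (Future<RecordMetadata> future : futures) {
                if (!future.isDone()) {
                    throw new IllegalStateException("flush() returned but a record future is still pending");
                }
            }
        }
    }
}
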
viswamy committed Jun 21, 2019
1 parent eae1c4c commit 0ba99b9
Showing 2 changed files with 80 additions and 9 deletions.
clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -37,6 +37,7 @@
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.header.Header;
@@ -667,6 +668,13 @@ boolean flushInProgress() {
         return flushesInProgress.get() > 0;
     }
 
+    /**
+     * This method should be used only for testing.
+     */
+    IncompleteBatches incompleteBatches() {
+        return incomplete;
+    }
+
     /* Visible for testing */
     Map<TopicPartition, Deque<ProducerBatch>> batches() {
         return Collections.unmodifiableMap(batches);
@@ -691,17 +699,29 @@ private boolean appendsInProgress() {
      */
     public void awaitFlushCompletion(long timeoutMs) throws InterruptedException {
         try {
+            boolean retry;
             Long expireMs = System.currentTimeMillis() + timeoutMs;
-            for (ProducerBatch batch : this.incomplete.copyAll()) {
-                Long currentMs = System.currentTimeMillis();
-                if (currentMs > expireMs) {
-                    throw new TimeoutException("Failed to flush accumulated records within " + timeoutMs + " milliseconds.");
-                }
-                boolean completed = batch.produceFuture.await(Math.max(expireMs - currentMs, 0), TimeUnit.MILLISECONDS);
-                if (!completed) {
-                    throw new TimeoutException("Failed to flush accumulated records within " + timeoutMs + " milliseconds.");
-                }
-            }
+            do {
+                retry = false;
+                for (ProducerBatch batch : this.incomplete.copyAll()) {
+                    Long currentMs = System.currentTimeMillis();
+                    if (currentMs > expireMs) {
+                        throw new TimeoutException("Failed to flush accumulated records within " + timeoutMs + " milliseconds.");
+                    }
+                    boolean completed = batch.produceFuture.await(Math.max(expireMs - currentMs, 0), TimeUnit.MILLISECONDS);
+                    if (!completed) {
+                        throw new TimeoutException("Failed to flush accumulated records within " + timeoutMs + " milliseconds.");
+                    }
+                    // If the produceFuture failed with RecordBatchTooLargeException, the batch was split
+                    // into smaller batches and re-enqueued into the RecordAccumulator by the Sender thread.
+                    // This check makes sure we retry and wait for all of the split batches as well.
+                    // Note that more records may get flushed than strictly necessary, because the retry
+                    // pass also waits on records newly added via the KafkaProducer#send() API.
+                    if (batch.produceFuture.error() instanceof RecordBatchTooLargeException) {
+                        retry = true;
+                    }
+                }
+            } while (retry);
         } finally {
             this.flushesInProgress.decrementAndGet();
         }
clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.clients.producer.internals;
 
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.producer.Callback;
@@ -44,6 +46,7 @@
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.nio.ByteBuffer;
@@ -417,6 +420,54 @@ public void run() {
         t.start();
     }
 
+    @Test
+    public void testSplitAwaitFlushComplete() throws Exception {
+        RecordAccumulator accum = createTestRecordAccumulator(1024, 10 * 1024, CompressionType.GZIP, 10);
+
+        // Create a big batch
+        byte[] value = new byte[256];
+        // Create a batch such that it fails with RecordBatchTooLargeException
+        accum.append(new TopicPartition(topic, 0), 0L, null, value, null, null, maxBlockTimeMs);
+        accum.append(new TopicPartition(topic, 0), 0L, null, value, null, null, maxBlockTimeMs);
+
+        CountDownLatch flushInProgress = new CountDownLatch(1);
+        Iterator<ProducerBatch> incompleteBatches = accum.incompleteBatches().copyAll().iterator();
+
+        // Assert that there is only one batch
+        Assert.assertTrue(incompleteBatches.hasNext());
+        ProducerBatch producerBatch = incompleteBatches.next();
+        Assert.assertFalse(incompleteBatches.hasNext());
+
+        AtomicBoolean timedOut = new AtomicBoolean(false);
+        Thread thread = new Thread(() -> {
+            Assert.assertTrue(accum.hasIncomplete());
+            accum.beginFlush();
+            Assert.assertTrue(accum.flushInProgress());
+            try {
+                flushInProgress.countDown();
+                accum.awaitFlushCompletion(2000);
+            } catch (TimeoutException timeoutException) {
+                // Catch it and set the timedOut flag.
+                // This is the only valid path for this thread.
+                timedOut.set(true);
+            } catch (InterruptedException e) {
+            }
+        });
+        thread.start();
+        flushInProgress.await();
+        // Wait for 100ms to make sure that the flush is actually in progress
+        Thread.sleep(100);
+
+        // Split the big batch and re-enqueue
+        accum.splitAndReenqueue(producerBatch);
+        accum.deallocate(producerBatch);
+
+        thread.join();
+        // The thread should have failed with a TimeoutException because the child (split) batches
+        // are never completed, so it waits the full 2 seconds before timing out.
+        Assert.assertTrue("The thread should have timed out", timedOut.get());
+    }
+
     @Test
     public void testAwaitFlushComplete() throws Exception {
         RecordAccumulator accum = createTestRecordAccumulator(
