Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@
<!-- coordinator-common -->
<suppress checks="NPathComplexity"
files="CoordinatorRuntime.java"/>
<suppress checks="JavaNCSS"
files="CoordinatorRuntimeTest.java"/>

<!-- share coordinator -->
<suppress checks="NPathComplexity"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -835,7 +835,7 @@ private void flushCurrentBatch() {
*/
private void maybeFlushCurrentBatch(long currentTimeMs) {
if (currentBatch != null) {
if (currentBatch.builder.isTransactional() || (currentBatch.appendTimeMs - currentTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) {
if (currentBatch.builder.isTransactional() || (currentTimeMs - currentBatch.appendTimeMs) >= appendLingerMs || !currentBatch.builder.hasRoomFor(0)) {
flushCurrentBatch();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@SuppressWarnings({"checkstyle:JavaNCSS", "checkstyle:ClassDataAbstractionCoupling"})
@SuppressWarnings("checkstyle:ClassDataAbstractionCoupling")
public class CoordinatorRuntimeTest {
private static final TopicPartition TP = new TopicPartition("__consumer_offsets", 0);
private static final Duration DEFAULT_WRITE_TIMEOUT = Duration.ofMillis(5);
Expand Down Expand Up @@ -5280,6 +5280,7 @@ public void testCompleteTransactionEventCompletesOnlyOnce() throws Exception {
assertTrue(write1.isCompletedExceptionally());
verify(runtimeMetrics, times(1)).recordEventPurgatoryTime(writeTimeout.toMillis() + 1L);
}

@Test
public void testCoordinatorExecutor() {
Duration writeTimeout = Duration.ofMillis(1000);
Expand Down Expand Up @@ -5367,6 +5368,77 @@ public void testCoordinatorExecutor() {
assertTrue(write1.isDone());
}

@Test
public void testLingerTimeComparisonInMaybeFlushCurrentBatch() throws Exception {
// Provides the runtime clock; we will advance it.
MockTimer clockTimer = new MockTimer();
// Used for scheduling timer tasks; we won't advance it to avoid a timer-triggered batch flush.
MockTimer schedulerTimer = new MockTimer();

MockPartitionWriter writer = new MockPartitionWriter();

CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(clockTimer.time())
.withTimer(schedulerTimer)
.withDefaultWriteTimeOut(Duration.ofMillis(20))
.withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(writer)
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withAppendLingerMs(10)
.withExecutorService(mock(ExecutorService.class))
.build();

// Schedule the loading.
runtime.scheduleLoadOperation(TP, 10);

// Verify the initial state.
CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(ACTIVE, ctx.state);
assertNull(ctx.currentBatch);

// Write #1.
CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(List.of("record1"), "response1")
);
assertFalse(write1.isDone());
assertNotNull(ctx.currentBatch);
assertEquals(0, writer.entries(TP).size());

// Verify that the linger timeout task is created; there will also be a default write timeout task.
assertEquals(2, schedulerTimer.size());

// Advance past the linger time.
clockTimer.advanceClock(11);

// Verify that the linger task is not cancelled.
assertFalse(schedulerTimer.taskQueue().peek().cancelled());

// Write #2.
CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20),
state -> new CoordinatorResult<>(List.of("record2"), "response2")
);

// The batch should have been flushed.
assertEquals(1, writer.entries(TP).size());

// Verify that the linger timeout task is cancelled.
assertTrue(schedulerTimer.taskQueue().peek().cancelled());

// Verify batch contains both two records
MemoryRecords batch = writer.entries(TP).get(0);
assertEquals(2, batch.firstBatch().countOrNull());

// Commit and verify that writes are completed.
writer.commit(TP);
assertTrue(write1.isDone());
assertTrue(write2.isDone());
}

private static <S extends CoordinatorShard<U>, U> ArgumentMatcher<CoordinatorPlayback<U>> coordinatorMatcher(
CoordinatorRuntime<S, U> runtime,
TopicPartition tp
Expand Down
Loading