From 32745c3c4ecb9777a6d44e703134ef1181ad2dd2 Mon Sep 17 00:00:00 2001 From: Haibo Sun <7675577+sunhaibotb@users.noreply.github.com> Date: Tue, 2 Jul 2024 14:42:21 +0800 Subject: [PATCH] RATIS-2116. Fix the issue where RaftServerImpl.appendEntries may be blocked indefinitely (#1116) --- .../apache/ratis/util/DataBlockingQueue.java | 7 ++ .../java/org/apache/ratis/util/DataQueue.java | 5 ++ .../segmented/SegmentedRaftLogWorker.java | 6 +- .../segmented/TestSegmentedRaftLog.java | 84 +++++++++++++++++++ 4 files changed, 101 insertions(+), 1 deletion(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java index 842b8f1549..e905893e5b 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java @@ -162,4 +162,11 @@ public List pollList(long timeoutM return results; } } + + @Override + public E peek() { + try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) { + return super.peek(); + } + } } diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java index 3db06f56e6..38762caa17 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/DataQueue.java @@ -154,6 +154,11 @@ public E poll() { return polled; } + /** Peek the head element from this queue. */ + public E peek() { + return q.peek(); + } + /** The same as {@link java.util.Collection#remove(Object)}. */ public boolean remove(E e) { final boolean removed = q.remove(e); diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java index b9d1442a0d..dedba26905 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java @@ -63,6 +63,9 @@ class SegmentedRaftLogWorker { static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1, TimeUnit.SECONDS); + private static final String CLASS_NAME = JavaUtils.getClassSimpleName(SegmentedRaftLogWorker.class); + static final String RUN_WORKER = CLASS_NAME + ".runWorker"; + static class StateMachineDataPolicy { private final boolean sync; private final TimeDuration syncTimeout; @@ -298,6 +301,7 @@ private void run() { // if and when a log task encounters an exception RaftLogIOException logIOException = null; + CodeInjectionForTesting.execute(RUN_WORKER, server == null ? null : server.getId(), null, queue); while (running) { try { Task task = queue.poll(ONE_SECOND); @@ -356,7 +360,7 @@ private boolean shouldFlush() { } else if (pendingFlushNum >= forceSyncNum) { return true; } - return pendingFlushNum > 0 && queue.isEmpty(); + return pendingFlushNum > 0 && !(queue.peek() instanceof WriteLog); } private void flushIfNecessary() throws IOException { diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java index 5779a9347f..52942279b0 100644 --- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java +++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java @@ -41,6 +41,8 @@ import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing; import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.statemachine.impl.BaseStateMachine; +import org.apache.ratis.util.CodeInjectionForTesting; +import org.apache.ratis.util.DataBlockingQueue; import org.apache.ratis.util.LifeCycle; import org.apache.ratis.util.ReferenceCountedObject; import org.apache.ratis.util.Slf4jUtils; @@ -57,8 +59,13 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.LongSupplier; import java.util.function.Supplier; import java.util.stream.Stream; @@ -74,6 +81,7 @@ import static java.lang.Boolean.FALSE; import static java.lang.Boolean.TRUE; +import static org.apache.ratis.server.raftlog.segmented.SegmentedRaftLogWorker.RUN_WORKER; import static org.apache.ratis.server.storage.RaftStorageTestUtils.getLogUnsafe; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.params.provider.Arguments.arguments; @@ -396,6 +404,82 @@ public void testAppendAndRoll(Boolean useAsyncFlush, Boolean smSyncFlush) throws } } + @ParameterizedTest + @MethodSource("data") + public void testPurgeAfterAppendEntry(Boolean useAsyncFlush, Boolean smSyncFlush) throws Exception { + RaftServerConfigKeys.Log.setAsyncFlushEnabled(properties, useAsyncFlush); + RaftServerConfigKeys.Log.StateMachineData.setSync(properties, smSyncFlush); + RaftServerConfigKeys.Log.setPurgeGap(properties, 1); + RaftServerConfigKeys.Log.setForceSyncNum(properties, 128); + + int startTerm = 0; + int endTerm = 2; + int segmentSize = 10; + long endIndexOfClosedSegment = segmentSize * (endTerm - startTerm - 1); + long nextStartIndex = segmentSize * (endTerm - startTerm); + + // append entries and roll logSegment for later purge operation + List ranges0 = prepareRanges(startTerm, endTerm, segmentSize, 0); + List entries0 = prepareLogEntries(ranges0, null); + try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) { + raftLog.open(RaftLog.INVALID_LOG_INDEX, null); + entries0.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join); + } + + // test the pattern in the task queue of SegmentedRaftLogWorker: (WriteLog, ..., PurgeLog) + List ranges = prepareRanges(endTerm - 1, endTerm, 1, nextStartIndex); + List entries = prepareLogEntries(ranges, null); + + try (SegmentedRaftLog raftLog = newSegmentedRaftLog()) { + final CountDownLatch raftLogOpened = new CountDownLatch(1); + final CountDownLatch tasksAdded = new CountDownLatch(1); + + // inject test code to make the pattern (WriteLog, PurgeLog) + final ConcurrentLinkedQueue> appendFutures = new ConcurrentLinkedQueue<>(); + final AtomicReference> purgeFuture = new AtomicReference<>(); + final AtomicInteger tasksCount = new AtomicInteger(0); + CodeInjectionForTesting.put(RUN_WORKER, (localId, remoteId, args) -> { + // wait for raftLog to be opened + try { + if(!raftLogOpened.await(FIVE_SECONDS.getDuration(), FIVE_SECONDS.getUnit())) { + throw new TimeoutException(); + } + } catch (InterruptedException | TimeoutException e) { + LOG.error("an exception occurred", e); + throw new RuntimeException(e); + } + + // add WriteLog and PurgeLog tasks + entries.stream().map(raftLog::appendEntry).forEach(appendFutures::add); + purgeFuture.set(raftLog.purge(endIndexOfClosedSegment)); + + tasksCount.set(((DataBlockingQueue) args[0]).getNumElements()); + tasksAdded.countDown(); + return true; + }); + + // open raftLog + raftLog.open(RaftLog.INVALID_LOG_INDEX, null); + raftLogOpened.countDown(); + + // wait for all tasks to be added + if(!tasksAdded.await(FIVE_SECONDS.getDuration(), FIVE_SECONDS.getUnit())) { + throw new TimeoutException(); + } + Assertions.assertEquals(entries.size() + 1, tasksCount.get()); + + // check if the purge task is executed + final Long purged = purgeFuture.get().get(); + LOG.info("purgeIndex = {}, purged = {}", endIndexOfClosedSegment, purged); + Assertions.assertEquals(endIndexOfClosedSegment, raftLog.getRaftLogCache().getStartIndex()); + + // check if the appendEntry futures are done + JavaUtils.allOf(appendFutures).get(FIVE_SECONDS.getDuration(), FIVE_SECONDS.getUnit()); + } finally { + CodeInjectionForTesting.put(RUN_WORKER, (localId, remoteId, args) -> false); + } + } + @ParameterizedTest @MethodSource("data") public void testTruncate(Boolean useAsyncFlush, Boolean smSyncFlush) throws Exception {