diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java index 32abe805f1..8ac3929cb9 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedLeakDetector.java @@ -49,7 +49,7 @@ public static LeakDetector getLeakDetector() { private ReferenceCountedLeakDetector() { } - static synchronized void enable(boolean advanced) { + public static synchronized void enable(boolean advanced) { FACTORY.set(advanced ? Mode.ADVANCED : Mode.SIMPLE); } diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java index b2c53182d3..1fc72c3445 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/ReferenceCountedObject.java @@ -182,13 +182,4 @@ static ReferenceCountedObject wrap(V value, Runnable retainMethod, Consum static ReferenceCountedObject wrap(V value, Runnable retainMethod, Runnable releaseMethod) { return wrap(value, retainMethod, ignored -> releaseMethod.run()); } - - static void enableLeakDetection() { - ReferenceCountedLeakDetector.enable(false); - } - - static void enableAdvancedLeakDetection() { - ReferenceCountedLeakDetector.enable(true); - } - } diff --git a/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java index 190f758589..ea3962c088 100644 --- a/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java +++ b/ratis-examples/src/test/java/org/apache/ratis/TestMultiRaftGroup.java @@ -22,25 +22,20 @@ import org.apache.ratis.examples.arithmetic.ArithmeticStateMachine; import org.apache.ratis.examples.arithmetic.TestArithmetic; import org.apache.ratis.protocol.RaftGroup; -import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.impl.GroupManagementBaseTest; import org.apache.ratis.server.impl.MiniRaftCluster; -import org.apache.ratis.util.Slf4jUtils; import org.apache.ratis.util.function.CheckedBiConsumer; +import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; -import org.slf4j.event.Level; import java.io.IOException; import java.util.Collection; import java.util.concurrent.atomic.AtomicInteger; +@Timeout(value = 300) public class TestMultiRaftGroup extends BaseTest { - static { - Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG); - } - - public static Collection data() throws IOException { + public static Collection data() { return ParameterizedBaseTest.getMiniRaftClusters(ArithmeticStateMachine.class, 0); } diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java index 47f9e1d4b6..2abde84244 100644 --- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java +++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java @@ -32,7 +32,6 @@ import org.apache.ratis.server.impl.MiniRaftCluster; import org.apache.ratis.server.impl.RaftServerTestUtil; import org.apache.ratis.util.NetUtils; -import org.apache.ratis.util.ReferenceCountedObject; import org.junit.Assert; import java.util.Optional; @@ -50,10 +49,6 @@ public MiniRaftClusterWithGrpc newCluster(String[] ids, String[] listenerIds, Ra } }; - static { - ReferenceCountedObject.enableLeakDetection(); - } - public interface FactoryGet extends Factory.Get { @Override default Factory getFactory() { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java index f75a80f825..8ec6c19db1 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java @@ -59,23 +59,15 @@ public Comparator getCallIdComparator() { /** Send an appendEntries RPC; retry indefinitely. */ private AppendEntriesReplyProto sendAppendEntriesWithRetries(AtomicLong requestFirstIndex) throws InterruptedException, InterruptedIOException, RaftLogIOException { - int retry = 0; - - ReferenceCountedObject request = nextAppendEntriesRequest( - CallId.getAndIncrement(), false); - while (isRunning()) { // keep retrying for IOException + for(int retry = 0; isRunning(); retry++) { + final ReferenceCountedObject request = nextAppendEntriesRequest( + CallId.getAndIncrement(), false); + if (request == null) { + LOG.trace("{} no entries to send now, wait ...", this); + return null; + } try { - if (request == null || request.get().getEntriesCount() == 0) { - if (request != null) { - request.release(); - } - request = nextAppendEntriesRequest(CallId.getAndIncrement(), false); - } - - if (request == null) { - LOG.trace("{} no entries to send now, wait ...", this); - return null; - } else if (!isRunning()) { + if (!isRunning()) { LOG.info("{} is stopped. Skip appendEntries.", this); return null; } @@ -84,17 +76,19 @@ private AppendEntriesReplyProto sendAppendEntriesWithRetries(AtomicLong requestF final AppendEntriesReplyProto reply = sendAppendEntries(proto); final long first = proto.getEntriesCount() > 0 ? proto.getEntries(0).getIndex() : RaftLog.INVALID_LOG_INDEX; requestFirstIndex.set(first); - request.release(); return reply; } catch (InterruptedIOException | RaftLogIOException e) { throw e; } catch (IOException ioe) { // TODO should have more detailed retry policy here. - if (retry++ % 10 == 0) { // to reduce the number of messages + if (retry % 10 == 0) { // to reduce the number of messages LOG.warn("{}: Failed to appendEntries (retry={})", this, retry, ioe); } handleException(ioe); + } finally { + request.release(); } + if (isRunning()) { getServer().properties().rpcSleepTime().sleep(); } diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java index 2aac6c1b1f..f4b6dc452e 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/memory/MemoryRaftLog.java @@ -45,15 +45,10 @@ */ public class MemoryRaftLog extends RaftLogBase { static class EntryList { - private final List> entries = new ArrayList<>(); - - ReferenceCountedObject getRef(int i) { - return i >= 0 && i < entries.size() ? entries.get(i) : null; - } + private final List entries = new ArrayList<>(); LogEntryProto get(int i) { - final ReferenceCountedObject ref = getRef(i); - return ref != null ? ref.get() : null; + return i >= 0 && i < entries.size() ? entries.get(i) : null; } TermIndex getTermIndex(int i) { @@ -81,13 +76,10 @@ void purge(int index) { } void clear(int from, int to) { - List> subList = entries.subList(from, to); - subList.forEach(ReferenceCountedObject::release); - subList.clear(); + entries.subList(from, to).clear(); } - void add(ReferenceCountedObject entryRef) { - entryRef.retain(); + void add(LogEntryProto entryRef) { entries.add(entryRef); } } @@ -128,7 +120,8 @@ public LogEntryProto get(long index) throws RaftLogIOException { public ReferenceCountedObject retainLog(long index) { checkLogState(); try (AutoCloseableLock readLock = readLock()) { - ReferenceCountedObject ref = entries.getRef(Math.toIntExact(index)); + final LogEntryProto entry = entries.get(Math.toIntExact(index)); + final ReferenceCountedObject ref = ReferenceCountedObject.wrap(entry); ref.retain(); return ref; } @@ -205,7 +198,7 @@ protected CompletableFuture appendEntryImpl(ReferenceCountedObject> appendImpl(ReferenceCountedObject extends BaseTest implements MiniRaftCluster.Factory.Get { { - Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG); - RaftServerTestUtil.setStateMachineUpdaterLogLevel(Level.DEBUG); - RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(5, TimeUnit.SECONDS)); } diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java index be8739ad8e..7339937014 100644 --- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java +++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java @@ -187,8 +187,11 @@ static boolean logEntriesContains(RaftLog log, long startIndex, long endIndex, S log.get(termIndices[idxEntries].getIndex()).getStateMachineLogEntry().getLogData().toByteArray())) { ++idxExpected; } - } catch (IOException e) { - throw new RuntimeException(e); + } catch (Exception e) { + throw new IllegalStateException("Failed logEntriesContains: startIndex=" + startIndex + + ", endIndex=" + endIndex + + ", #expectedMessages=" + expectedMessages.length + + ", log=" + log, e); } ++idxEntries; } diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java index 9a54700ecd..ade6110edc 100644 --- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java +++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java @@ -86,6 +86,10 @@ public abstract class MiniRaftCluster implements Closeable { public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class); + static { + ReferenceCountedLeakDetector.enable(false); + } + public static final String CLASS_NAME = JavaUtils.getClassSimpleName(MiniRaftCluster.class); public static final String STATEMACHINE_CLASS_KEY = CLASS_NAME + ".statemachine.class"; private static final StateMachine.Registry STATEMACHINE_REGISTRY_DEFAULT = gid -> new BaseStateMachine();