Skip to content

Commit

Permalink
RATIS-2173. Fix zero-copy bugs for non-gRPC cases.
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed Oct 11, 2024
1 parent df1d38a commit d0f2579
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public static LeakDetector getLeakDetector() {
private ReferenceCountedLeakDetector() {
}

static synchronized void enable(boolean advanced) {
public static synchronized void enable(boolean advanced) {
FACTORY.set(advanced ? Mode.ADVANCED : Mode.SIMPLE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,4 @@ static <V> ReferenceCountedObject<V> wrap(V value, Runnable retainMethod, Consum
static <V> ReferenceCountedObject<V> wrap(V value, Runnable retainMethod, Runnable releaseMethod) {
return wrap(value, retainMethod, ignored -> releaseMethod.run());
}

static void enableLeakDetection() {
ReferenceCountedLeakDetector.enable(false);
}

static void enableAdvancedLeakDetection() {
ReferenceCountedLeakDetector.enable(true);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,20 @@
import org.apache.ratis.examples.arithmetic.ArithmeticStateMachine;
import org.apache.ratis.examples.arithmetic.TestArithmetic;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.GroupManagementBaseTest;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.function.CheckedBiConsumer;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.slf4j.event.Level;

import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicInteger;

@Timeout(value = 300)
public class TestMultiRaftGroup extends BaseTest {
static {
Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
}

public static Collection<Object[]> data() throws IOException {
public static Collection<Object[]> data() {
return ParameterizedBaseTest.getMiniRaftClusters(ArithmeticStateMachine.class, 0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.util.NetUtils;
import org.apache.ratis.util.ReferenceCountedObject;
import org.junit.Assert;

import java.util.Optional;
Expand All @@ -50,10 +49,6 @@ public MiniRaftClusterWithGrpc newCluster(String[] ids, String[] listenerIds, Ra
}
};

static {
ReferenceCountedObject.enableLeakDetection();
}

public interface FactoryGet extends Factory.Get<MiniRaftClusterWithGrpc> {
@Override
default Factory<MiniRaftClusterWithGrpc> getFactory() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,23 +59,15 @@ public Comparator<Long> getCallIdComparator() {
/** Send an appendEntries RPC; retry indefinitely. */
private AppendEntriesReplyProto sendAppendEntriesWithRetries(AtomicLong requestFirstIndex)
throws InterruptedException, InterruptedIOException, RaftLogIOException {
int retry = 0;

ReferenceCountedObject<AppendEntriesRequestProto> request = nextAppendEntriesRequest(
CallId.getAndIncrement(), false);
while (isRunning()) { // keep retrying for IOException
for(int retry = 0; isRunning(); retry++) {
final ReferenceCountedObject<AppendEntriesRequestProto> request = nextAppendEntriesRequest(
CallId.getAndIncrement(), false);
if (request == null) {
LOG.trace("{} no entries to send now, wait ...", this);
return null;
}
try {
if (request == null || request.get().getEntriesCount() == 0) {
if (request != null) {
request.release();
}
request = nextAppendEntriesRequest(CallId.getAndIncrement(), false);
}

if (request == null) {
LOG.trace("{} no entries to send now, wait ...", this);
return null;
} else if (!isRunning()) {
if (!isRunning()) {
LOG.info("{} is stopped. Skip appendEntries.", this);
return null;
}
Expand All @@ -84,17 +76,19 @@ private AppendEntriesReplyProto sendAppendEntriesWithRetries(AtomicLong requestF
final AppendEntriesReplyProto reply = sendAppendEntries(proto);
final long first = proto.getEntriesCount() > 0 ? proto.getEntries(0).getIndex() : RaftLog.INVALID_LOG_INDEX;
requestFirstIndex.set(first);
request.release();
return reply;
} catch (InterruptedIOException | RaftLogIOException e) {
throw e;
} catch (IOException ioe) {
// TODO should have more detailed retry policy here.
if (retry++ % 10 == 0) { // to reduce the number of messages
if (retry % 10 == 0) { // to reduce the number of messages
LOG.warn("{}: Failed to appendEntries (retry={})", this, retry, ioe);
}
handleException(ioe);
} finally {
request.release();
}

if (isRunning()) {
getServer().properties().rpcSleepTime().sleep();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,10 @@
*/
public class MemoryRaftLog extends RaftLogBase {
static class EntryList {
private final List<ReferenceCountedObject<LogEntryProto>> entries = new ArrayList<>();

ReferenceCountedObject<LogEntryProto> getRef(int i) {
return i >= 0 && i < entries.size() ? entries.get(i) : null;
}
private final List<LogEntryProto> entries = new ArrayList<>();

LogEntryProto get(int i) {
final ReferenceCountedObject<LogEntryProto> ref = getRef(i);
return ref != null ? ref.get() : null;
return i >= 0 && i < entries.size() ? entries.get(i) : null;
}

TermIndex getTermIndex(int i) {
Expand Down Expand Up @@ -81,13 +76,10 @@ void purge(int index) {
}

void clear(int from, int to) {
List<ReferenceCountedObject<LogEntryProto>> subList = entries.subList(from, to);
subList.forEach(ReferenceCountedObject::release);
subList.clear();
entries.subList(from, to).clear();
}

void add(ReferenceCountedObject<LogEntryProto> entryRef) {
entryRef.retain();
void add(LogEntryProto entryRef) {
entries.add(entryRef);
}
}
Expand Down Expand Up @@ -128,7 +120,8 @@ public LogEntryProto get(long index) throws RaftLogIOException {
public ReferenceCountedObject<LogEntryProto> retainLog(long index) {
checkLogState();
try (AutoCloseableLock readLock = readLock()) {
ReferenceCountedObject<LogEntryProto> ref = entries.getRef(Math.toIntExact(index));
final LogEntryProto entry = entries.get(Math.toIntExact(index));
final ReferenceCountedObject<LogEntryProto> ref = ReferenceCountedObject.wrap(entry);
ref.retain();
return ref;
}
Expand Down Expand Up @@ -205,7 +198,7 @@ protected CompletableFuture<Long> appendEntryImpl(ReferenceCountedObject<LogEntr
LogEntryProto entry = entryRef.retain();
try (AutoCloseableLock writeLock = writeLock()) {
validateLogEntry(entry);
entries.add(entryRef);
entries.add(entry);
} finally {
entryRef.release();
}
Expand Down Expand Up @@ -253,7 +246,7 @@ public List<CompletableFuture<Long>> appendImpl(ReferenceCountedObject<List<LogE
}
for (int i = index; i < logEntryProtos.size(); i++) {
LogEntryProto logEntryProto = logEntryProtos.get(i);
entries.add(entriesRef.delegate(logEntryProto));
entries.add(LogProtoUtils.copy(logEntryProto));
futures.add(CompletableFuture.completedFuture(logEntryProto.getIndex()));
}
return futures;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,18 @@
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.RetryCacheTestUtil;
import org.apache.ratis.server.metrics.ServerMetricsTestUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.util.ExitUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Slf4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.slf4j.Logger;
import org.slf4j.event.Level;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -75,9 +72,6 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
{
Slf4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
RaftServerTestUtil.setStateMachineUpdaterLogLevel(Level.DEBUG);

RaftServerConfigKeys.RetryCache.setExpiryTime(getProperties(), TimeDuration.valueOf(5, TimeUnit.SECONDS));
}

Expand Down
7 changes: 5 additions & 2 deletions ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -187,8 +187,11 @@ static boolean logEntriesContains(RaftLog log, long startIndex, long endIndex, S
log.get(termIndices[idxEntries].getIndex()).getStateMachineLogEntry().getLogData().toByteArray())) {
++idxExpected;
}
} catch (IOException e) {
throw new RuntimeException(e);
} catch (Exception e) {
throw new IllegalStateException("Failed logEntriesContains: startIndex=" + startIndex
+ ", endIndex=" + endIndex
+ ", #expectedMessages=" + expectedMessages.length
+ ", log=" + log, e);
}
++idxEntries;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@
public abstract class MiniRaftCluster implements Closeable {
public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class);

static {
ReferenceCountedLeakDetector.enable(false);
}

public static final String CLASS_NAME = JavaUtils.getClassSimpleName(MiniRaftCluster.class);
public static final String STATEMACHINE_CLASS_KEY = CLASS_NAME + ".statemachine.class";
private static final StateMachine.Registry STATEMACHINE_REGISTRY_DEFAULT = gid -> new BaseStateMachine();
Expand Down

0 comments on commit d0f2579

Please sign in to comment.