Skip to content

Commit

Permalink
RATIS-2151. TestRaftWithGrpc keeps failing with zero-copy.
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed Oct 7, 2024
1 parent da3f90b commit b84e277
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.ToLongFunction;

/**
Expand All @@ -46,6 +47,8 @@ public class DataBlockingQueue<E> extends DataQueue<E> {
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();

private boolean closed = false;

public DataBlockingQueue(Object name, SizeInBytes byteLimit, int elementLimit, ToLongFunction<E> getNumBytes) {
super(name, byteLimit, elementLimit, getNumBytes);
}
Expand All @@ -72,10 +75,34 @@ public void clear() {
}
}

/** Apply the given handler to each element and then {@link #clear()}. */
public void clear(Consumer<E> handler) {
try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
for(E e : this) {
handler.accept(e);
}
super.clear();
}
}

/**
* Close this queue to stop accepting new elements, i.e. the offer(…) methods always return false.
* Note that closing the queue will not clear the existing elements.
* The existing elements can be peeked, polled or cleared after close.
*/
public void close() {
try(AutoCloseableLock ignored = AutoCloseableLock.acquire(lock)) {
closed = true;
}
}

@Override
public boolean offer(E element) {
Objects.requireNonNull(element, "element == null");
try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
if (closed) {
return false;
}
if (super.offer(element)) {
notEmpty.signal();
return true;
Expand All @@ -95,6 +122,9 @@ public boolean offer(E element, TimeDuration timeout) throws InterruptedExceptio
long nanos = timeout.toLong(TimeUnit.NANOSECONDS);
try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) {
for(;;) {
if (closed) {
return false;
}
if (super.offer(element)) {
notEmpty.signal();
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.JavaUtils;
Expand Down Expand Up @@ -323,18 +322,19 @@ private class UnorderedRequestStreamObserver extends RequestStreamObserver {

@Override
void processClientRequest(ReferenceCountedObject<RaftClientRequest> requestRef) {
final RaftClientRequest request = requestRef.retain();
final long callId = request.getCallId();
final SlidingWindowEntry slidingWindowEntry = request.getSlidingWindowEntry();
final CompletableFuture<Void> f = processClientRequest(requestRef, reply -> {
if (!reply.isSuccess()) {
LOG.info("Failed request cid={}, {}, reply={}", callId, slidingWindowEntry, reply);
}
final RaftClientReplyProto proto = ClientProtoUtils.toRaftClientReplyProto(reply);
responseNext(proto);
});

requestRef.release();
final long callId = requestRef.retain().getCallId();
final CompletableFuture<Void> f;
try {
f = processClientRequest(requestRef, reply -> {
if (!reply.isSuccess()) {
LOG.info("Failed request cid={}, reply={}", callId, reply);
}
final RaftClientReplyProto proto = ClientProtoUtils.toRaftClientReplyProto(reply);
responseNext(proto);
});
} finally {
requestRef.release();
}

put(callId, f);
f.thenAccept(dummy -> remove(callId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ public Comparator<Long> getCallIdComparator() {
}

private void appendLog(boolean heartbeat) throws IOException {
ReferenceCountedObject<AppendEntriesRequestProto> pending = null;
final ReferenceCountedObject<AppendEntriesRequestProto> pending;
final AppendEntriesRequest request;
try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) {
// Prepare and send the append request.
Expand All @@ -388,18 +388,18 @@ private void appendLog(boolean heartbeat) throws IOException {
if (pending == null) {
return;
}
request = new AppendEntriesRequest(pending.get(), getFollowerId(), grpcServerMetrics);
pendingRequests.put(request);
increaseNextIndex(pending.get());
if (appendLogRequestObserver == null) {
appendLogRequestObserver = new StreamObservers(
getClient(), new AppendLogResponseHandler(), useSeparateHBChannel, getWaitTimeMin());
}
} catch(Exception e) {
if (pending != null) {
try {
request = new AppendEntriesRequest(pending.get(), getFollowerId(), grpcServerMetrics);
pendingRequests.put(request);
increaseNextIndex(pending.get());
if (appendLogRequestObserver == null) {
appendLogRequestObserver = new StreamObservers(
getClient(), new AppendLogResponseHandler(), useSeparateHBChannel, getWaitTimeMin());
}
} catch (Exception e) {
pending.release();
throw e;
}
throw e;
}

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.ReferenceCountedObject;
import org.apache.ratis.util.SizeInBytes;
Expand All @@ -38,10 +39,10 @@
import java.io.File;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down Expand Up @@ -283,47 +284,103 @@ public ReferenceCountedObject<LogEntryProto> load(LogRecord key) throws IOExcept
final TermIndex ti = TermIndex.valueOf(entry);
putEntryCache(ti, entryRef, Op.LOAD_SEGMENT_FILE);
if (ti.equals(key.getTermIndex())) {
entryRef.retain();
toReturn.set(entryRef);
} else {
entryRef.release();
}
entryRef.release();
});
loadingTimes.incrementAndGet();
return Objects.requireNonNull(toReturn.get());
}
}

static class EntryCache {
private final Map<TermIndex, ReferenceCountedObject<LogEntryProto>> map = new ConcurrentHashMap<>();
private static class Item {
private final AtomicReference<ReferenceCountedObject<LogEntryProto>> ref;
private final long serializedSize;

Item(ReferenceCountedObject<LogEntryProto> obj, long serializedSize) {
this.ref = new AtomicReference<>(obj);
this.serializedSize = serializedSize;
}

ReferenceCountedObject<LogEntryProto> get() {
return ref.get();
}

long release() {
final ReferenceCountedObject<LogEntryProto> entry = ref.getAndSet(null);
if (entry == null) {
return 0;
}
entry.release();
return serializedSize;
}
}

class EntryCache {
private Map<TermIndex, Item> map = new HashMap<>();
private final AtomicLong size = new AtomicLong();

@Override
public String toString() {
return JavaUtils.getClassSimpleName(getClass()) + "-" + LogSegment.this;
}

long size() {
return size.get();
}

ReferenceCountedObject<LogEntryProto> get(TermIndex ti) {
return map.get(ti);
synchronized ReferenceCountedObject<LogEntryProto> get(TermIndex ti) {
if (map == null) {
return null;
}
final Item ref = map.get(ti);
return ref == null? null: ref.get();
}

void clear() {
map.values().forEach(ReferenceCountedObject::release);
map.clear();
size.set(0);
/** After close(), the cache CANNOT be used again. */
synchronized void close() {
if (map == null) {
return;
}
evict();
map = null;
LOG.info("Successfully closed {}", this);
}

void put(TermIndex key, ReferenceCountedObject<LogEntryProto> valueRef, Op op) {
/** After evict(), the cache can be used again. */
synchronized void evict() {
if (map == null) {
return;
}
for (Iterator<Map.Entry<TermIndex, Item>> i = map.entrySet().iterator(); i.hasNext(); i.remove()) {
release(i.next().getValue());
}
}

synchronized void put(TermIndex key, ReferenceCountedObject<LogEntryProto> valueRef, Op op) {
if (map == null) {
return;
}
valueRef.retain();
Optional.ofNullable(map.put(key, valueRef)).ifPresent(this::release);
size.getAndAdd(getEntrySize(valueRef.get(), op));
final long serializedSize = getEntrySize(valueRef.get(), op);
release(map.put(key, new Item(valueRef, serializedSize)));
size.getAndAdd(serializedSize);
}

private void release(ReferenceCountedObject<LogEntryProto> entry) {
size.getAndAdd(-getEntrySize(entry.get(), Op.REMOVE_CACHE));
entry.release();
private void release(Item ref) {
if (ref == null) {
return;
}
final long serializedSize = ref.release();
size.getAndAdd(-serializedSize);
}

void remove(TermIndex key) {
Optional.ofNullable(map.remove(key)).ifPresent(this::release);
synchronized void remove(TermIndex key) {
if (map == null) {
return;
}
release(map.remove(key));
}
}

Expand Down Expand Up @@ -433,7 +490,13 @@ ReferenceCountedObject<LogEntryProto> getEntryFromCache(TermIndex ti) {
synchronized ReferenceCountedObject<LogEntryProto> loadCache(LogRecord record) throws RaftLogIOException {
ReferenceCountedObject<LogEntryProto> entry = entryCache.get(record.getTermIndex());
if (entry != null) {
return entry;
try {
entry.retain();
return entry;
} catch (IllegalStateException ignored) {
// The entry could be removed from the cache and released.
// The exception can be safely ignored since it is the same as cache miss.
}
}
try {
return cacheLoader.load(record);
Expand Down Expand Up @@ -505,7 +568,7 @@ private int compareTo(Long l) {

synchronized void clear() {
records.clear();
evictCache();
entryCache.close();
endIndex = startIndex - 1;
}

Expand All @@ -514,7 +577,7 @@ int getLoadingTimes() {
}

void evictCache() {
entryCache.clear();
entryCache.evict();
}

void putEntryCache(TermIndex key, ReferenceCountedObject<LogEntryProto> valueRef, Op op) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,9 +306,14 @@ public ReferenceCountedObject<LogEntryProto> retainLog(long index) throws RaftLo
}
final ReferenceCountedObject<LogEntryProto> entry = segment.getEntryFromCache(record.getTermIndex());
if (entry != null) {
getRaftLogMetrics().onRaftLogCacheHit();
entry.retain();
return entry;
try {
entry.retain();
getRaftLogMetrics().onRaftLogCacheHit();
return entry;
} catch (IllegalStateException ignored) {
// The entry could be removed from the cache and released.
// The exception can be safely ignored since it is the same as cache miss.
}
}

// the entry is not in the segment's cache. Load the cache without holding the lock.
Expand Down Expand Up @@ -346,6 +351,7 @@ public ReferenceCountedObject<EntryWithData> retainEntryWithData(long index) thr
} catch (Exception e) {
final String err = getName() + ": Failed readStateMachineData for " + toLogEntryString(entry);
LOG.error(err, e);
entryRef.release();
throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(e));
}
}
Expand Down Expand Up @@ -558,13 +564,15 @@ public CompletableFuture<Long> onSnapshotInstalled(long lastSnapshotIndex) {
@Override
public void close() throws IOException {
try(AutoCloseableLock writeLock = writeLock()) {
LOG.info("Start closing {}", this);
super.close();
cacheEviction.close();
cache.close();
}
fileLogWorker.close();
storage.close();
getRaftLogMetrics().unregister();
LOG.info("Successfully closed {}", this);
}

SegmentedRaftLogCache getRaftLogCache() {
Expand Down
Loading

0 comments on commit b84e277

Please sign in to comment.