From b84e277282972b223ac4fe0a7c806e4401df11d2 Mon Sep 17 00:00:00 2001 From: Tsz-Wo Nicholas Sze Date: Mon, 7 Oct 2024 13:23:38 -0700 Subject: [PATCH] RATIS-2151. TestRaftWithGrpc keeps failing with zero-copy. --- .../apache/ratis/util/DataBlockingQueue.java | 30 +++++ .../server/GrpcClientProtocolService.java | 26 ++--- .../ratis/grpc/server/GrpcLogAppender.java | 22 ++-- .../server/raftlog/segmented/LogSegment.java | 109 ++++++++++++++---- .../raftlog/segmented/SegmentedRaftLog.java | 14 ++- .../segmented/SegmentedRaftLogWorker.java | 25 ++-- .../impl/SimpleStateMachine4Testing.java | 21 ++-- 7 files changed, 175 insertions(+), 72 deletions(-) diff --git a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java index e905893e5b..fb0f0715c5 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/DataBlockingQueue.java @@ -29,6 +29,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import java.util.function.ToLongFunction; /** @@ -46,6 +47,8 @@ public class DataBlockingQueue extends DataQueue { private final Condition notFull = lock.newCondition(); private final Condition notEmpty = lock.newCondition(); + private boolean closed = false; + public DataBlockingQueue(Object name, SizeInBytes byteLimit, int elementLimit, ToLongFunction getNumBytes) { super(name, byteLimit, elementLimit, getNumBytes); } @@ -72,10 +75,34 @@ public void clear() { } } + /** Apply the given handler to each element and then {@link #clear()}. */ + public void clear(Consumer handler) { + try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) { + for(E e : this) { + handler.accept(e); + } + super.clear(); + } + } + + /** + * Close this queue to stop accepting new elements, i.e. the offer(…) methods always return false. + * Note that closing the queue will not clear the existing elements. + * The existing elements can be peeked, polled or cleared after close. + */ + public void close() { + try(AutoCloseableLock ignored = AutoCloseableLock.acquire(lock)) { + closed = true; + } + } + @Override public boolean offer(E element) { Objects.requireNonNull(element, "element == null"); try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) { + if (closed) { + return false; + } if (super.offer(element)) { notEmpty.signal(); return true; @@ -95,6 +122,9 @@ public boolean offer(E element, TimeDuration timeout) throws InterruptedExceptio long nanos = timeout.toLong(TimeUnit.NANOSECONDS); try(AutoCloseableLock auto = AutoCloseableLock.acquire(lock)) { for(;;) { + if (closed) { + return false; + } if (super.offer(element)) { notEmpty.signal(); return true; diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java index 80a9a439b9..b7548780cd 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcClientProtocolService.java @@ -29,7 +29,6 @@ import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver; import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto; import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto; -import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry; import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase; import org.apache.ratis.util.CollectionUtils; import org.apache.ratis.util.JavaUtils; @@ -323,18 +322,19 @@ private class UnorderedRequestStreamObserver extends RequestStreamObserver { @Override void processClientRequest(ReferenceCountedObject requestRef) { - final RaftClientRequest request = requestRef.retain(); - final long callId = request.getCallId(); - final SlidingWindowEntry slidingWindowEntry = request.getSlidingWindowEntry(); - final CompletableFuture f = processClientRequest(requestRef, reply -> { - if (!reply.isSuccess()) { - LOG.info("Failed request cid={}, {}, reply={}", callId, slidingWindowEntry, reply); - } - final RaftClientReplyProto proto = ClientProtoUtils.toRaftClientReplyProto(reply); - responseNext(proto); - }); - - requestRef.release(); + final long callId = requestRef.retain().getCallId(); + final CompletableFuture f; + try { + f = processClientRequest(requestRef, reply -> { + if (!reply.isSuccess()) { + LOG.info("Failed request cid={}, reply={}", callId, reply); + } + final RaftClientReplyProto proto = ClientProtoUtils.toRaftClientReplyProto(reply); + responseNext(proto); + }); + } finally { + requestRef.release(); + } put(callId, f); f.thenAccept(dummy -> remove(callId)); diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java index 7edb3ae0b7..18d4c62c61 100644 --- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java +++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java @@ -379,7 +379,7 @@ public Comparator getCallIdComparator() { } private void appendLog(boolean heartbeat) throws IOException { - ReferenceCountedObject pending = null; + final ReferenceCountedObject pending; final AppendEntriesRequest request; try (AutoCloseableLock writeLock = lock.writeLock(caller, LOG::trace)) { // Prepare and send the append request. @@ -388,18 +388,18 @@ private void appendLog(boolean heartbeat) throws IOException { if (pending == null) { return; } - request = new AppendEntriesRequest(pending.get(), getFollowerId(), grpcServerMetrics); - pendingRequests.put(request); - increaseNextIndex(pending.get()); - if (appendLogRequestObserver == null) { - appendLogRequestObserver = new StreamObservers( - getClient(), new AppendLogResponseHandler(), useSeparateHBChannel, getWaitTimeMin()); - } - } catch(Exception e) { - if (pending != null) { + try { + request = new AppendEntriesRequest(pending.get(), getFollowerId(), grpcServerMetrics); + pendingRequests.put(request); + increaseNextIndex(pending.get()); + if (appendLogRequestObserver == null) { + appendLogRequestObserver = new StreamObservers( + getClient(), new AppendLogResponseHandler(), useSeparateHBChannel, getWaitTimeMin()); + } + } catch (Exception e) { pending.release(); + throw e; } - throw e; } try { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java index c51464f9e9..a88ade5872 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/LogSegment.java @@ -29,6 +29,7 @@ import org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader; import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream; import org.apache.ratis.util.FileUtils; +import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.ReferenceCountedObject; import org.apache.ratis.util.SizeInBytes; @@ -38,10 +39,10 @@ import java.io.File; import java.io.IOException; import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Objects; -import java.util.Optional; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; @@ -283,47 +284,103 @@ public ReferenceCountedObject load(LogRecord key) throws IOExcept final TermIndex ti = TermIndex.valueOf(entry); putEntryCache(ti, entryRef, Op.LOAD_SEGMENT_FILE); if (ti.equals(key.getTermIndex())) { - entryRef.retain(); toReturn.set(entryRef); + } else { + entryRef.release(); } - entryRef.release(); }); loadingTimes.incrementAndGet(); return Objects.requireNonNull(toReturn.get()); } } - static class EntryCache { - private final Map> map = new ConcurrentHashMap<>(); + private static class Item { + private final AtomicReference> ref; + private final long serializedSize; + + Item(ReferenceCountedObject obj, long serializedSize) { + this.ref = new AtomicReference<>(obj); + this.serializedSize = serializedSize; + } + + ReferenceCountedObject get() { + return ref.get(); + } + + long release() { + final ReferenceCountedObject entry = ref.getAndSet(null); + if (entry == null) { + return 0; + } + entry.release(); + return serializedSize; + } + } + + class EntryCache { + private Map map = new HashMap<>(); private final AtomicLong size = new AtomicLong(); + @Override + public String toString() { + return JavaUtils.getClassSimpleName(getClass()) + "-" + LogSegment.this; + } + long size() { return size.get(); } - ReferenceCountedObject get(TermIndex ti) { - return map.get(ti); + synchronized ReferenceCountedObject get(TermIndex ti) { + if (map == null) { + return null; + } + final Item ref = map.get(ti); + return ref == null? null: ref.get(); } - void clear() { - map.values().forEach(ReferenceCountedObject::release); - map.clear(); - size.set(0); + /** After close(), the cache CANNOT be used again. */ + synchronized void close() { + if (map == null) { + return; + } + evict(); + map = null; + LOG.info("Successfully closed {}", this); } - void put(TermIndex key, ReferenceCountedObject valueRef, Op op) { + /** After evict(), the cache can be used again. */ + synchronized void evict() { + if (map == null) { + return; + } + for (Iterator> i = map.entrySet().iterator(); i.hasNext(); i.remove()) { + release(i.next().getValue()); + } + } + + synchronized void put(TermIndex key, ReferenceCountedObject valueRef, Op op) { + if (map == null) { + return; + } valueRef.retain(); - Optional.ofNullable(map.put(key, valueRef)).ifPresent(this::release); - size.getAndAdd(getEntrySize(valueRef.get(), op)); + final long serializedSize = getEntrySize(valueRef.get(), op); + release(map.put(key, new Item(valueRef, serializedSize))); + size.getAndAdd(serializedSize); } - private void release(ReferenceCountedObject entry) { - size.getAndAdd(-getEntrySize(entry.get(), Op.REMOVE_CACHE)); - entry.release(); + private void release(Item ref) { + if (ref == null) { + return; + } + final long serializedSize = ref.release(); + size.getAndAdd(-serializedSize); } - void remove(TermIndex key) { - Optional.ofNullable(map.remove(key)).ifPresent(this::release); + synchronized void remove(TermIndex key) { + if (map == null) { + return; + } + release(map.remove(key)); } } @@ -433,7 +490,13 @@ ReferenceCountedObject getEntryFromCache(TermIndex ti) { synchronized ReferenceCountedObject loadCache(LogRecord record) throws RaftLogIOException { ReferenceCountedObject entry = entryCache.get(record.getTermIndex()); if (entry != null) { - return entry; + try { + entry.retain(); + return entry; + } catch (IllegalStateException ignored) { + // The entry could be removed from the cache and released. + // The exception can be safely ignored since it is the same as cache miss. + } } try { return cacheLoader.load(record); @@ -505,7 +568,7 @@ private int compareTo(Long l) { synchronized void clear() { records.clear(); - evictCache(); + entryCache.close(); endIndex = startIndex - 1; } @@ -514,7 +577,7 @@ int getLoadingTimes() { } void evictCache() { - entryCache.clear(); + entryCache.evict(); } void putEntryCache(TermIndex key, ReferenceCountedObject valueRef, Op op) { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java index 44b9c7599b..485eb53d10 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java @@ -306,9 +306,14 @@ public ReferenceCountedObject retainLog(long index) throws RaftLo } final ReferenceCountedObject entry = segment.getEntryFromCache(record.getTermIndex()); if (entry != null) { - getRaftLogMetrics().onRaftLogCacheHit(); - entry.retain(); - return entry; + try { + entry.retain(); + getRaftLogMetrics().onRaftLogCacheHit(); + return entry; + } catch (IllegalStateException ignored) { + // The entry could be removed from the cache and released. + // The exception can be safely ignored since it is the same as cache miss. + } } // the entry is not in the segment's cache. Load the cache without holding the lock. @@ -346,6 +351,7 @@ public ReferenceCountedObject retainEntryWithData(long index) thr } catch (Exception e) { final String err = getName() + ": Failed readStateMachineData for " + toLogEntryString(entry); LOG.error(err, e); + entryRef.release(); throw new RaftLogIOException(err, JavaUtils.unwrapCompletionException(e)); } } @@ -558,6 +564,7 @@ public CompletableFuture onSnapshotInstalled(long lastSnapshotIndex) { @Override public void close() throws IOException { try(AutoCloseableLock writeLock = writeLock()) { + LOG.info("Start closing {}", this); super.close(); cacheEviction.close(); cache.close(); @@ -565,6 +572,7 @@ public void close() throws IOException { fileLogWorker.close(); storage.close(); getRaftLogMetrics().unregister(); + LOG.info("Successfully closed {}", this); } SegmentedRaftLogCache getRaftLogCache() { diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java index 9ed3a1b763..a3d13de9fd 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java @@ -51,6 +51,7 @@ import java.util.Optional; import java.util.Queue; import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Consumer; import java.util.function.Supplier; @@ -242,10 +243,11 @@ void start(long latestIndex, long evictIndex, File openSegmentFile) throws IOExc } void close() { + queue.close(); this.running = false; + ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_MINUTE, workerThreadExecutor, + timeout -> LOG.warn("{}: shutdown timeout in {}", name, timeout)); Optional.ofNullable(flushExecutor).ifPresent(ExecutorService::shutdown); - ConcurrentUtils.shutdownAndWait(TimeDuration.ONE_SECOND.multiply(3), - workerThreadExecutor, timeout -> LOG.warn("{}: shutdown timeout in " + timeout, name)); IOUtils.cleanup(LOG, out); PlatformDependent.freeDirectBuffer(writeBuffer); LOG.info("{} close()", name); @@ -341,7 +343,7 @@ private void run() { LOG.info(Thread.currentThread().getName() + " was interrupted, exiting. There are " + queue.getNumElements() + " tasks remaining in the queue."); - return; + break; } catch (Exception e) { if (!running) { LOG.info("{} got closed and hit exception", @@ -352,6 +354,8 @@ private void run() { } } } + + queue.clear(Task::discard); } private boolean shouldFlush() { @@ -494,7 +498,7 @@ private class WriteLog extends Task { private final LogEntryProto entry; private final CompletableFuture stateMachineFuture; private final CompletableFuture combined; - private final ReferenceCountedObject ref; + private final AtomicReference> ref = new AtomicReference<>(); WriteLog(ReferenceCountedObject entryRef, LogEntryProto removedStateMachineData, TransactionContext context) { @@ -512,7 +516,7 @@ private class WriteLog extends Task { this.stateMachineFuture = null; } entryRef.retain(); - this.ref = entryRef; + this.ref.set(entryRef); } else { try { // this.entry != origEntry if it has state machine data @@ -522,7 +526,6 @@ private class WriteLog extends Task { + ", entry=" + LogProtoUtils.toLogEntryString(origEntry, stateMachine::toStateMachineLogEntryString), e); throw e; } - this.ref = null; } this.combined = stateMachineFuture == null? super.getFuture() : super.getFuture().thenCombine(stateMachineFuture, (index, stateMachineResult) -> index); @@ -532,6 +535,7 @@ private class WriteLog extends Task { void failed(IOException e) { stateMachine.event().notifyLogFailed(e, entry); super.failed(e); + discard(); } @Override @@ -547,15 +551,14 @@ CompletableFuture getFuture() { @Override void done() { writeTasks.offerOrCompleteFuture(this); - if (ref != null) { - ref.release(); - } + discard(); } @Override void discard() { - if (ref != null) { - ref.release(); + final ReferenceCountedObject entryRef = ref.getAndSet(null); + if (entryRef != null) { + entryRef.release(); } } diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java index 18e4f2eca0..74bc0c5355 100644 --- a/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java +++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/impl/SimpleStateMachine4Testing.java @@ -83,7 +83,7 @@ public static SimpleStateMachine4Testing get(RaftServer.Division s) { return (SimpleStateMachine4Testing)s.getStateMachine(); } - private final SortedMap> indexMap = Collections.synchronizedSortedMap(new TreeMap<>()); + private final SortedMap indexMap = Collections.synchronizedSortedMap(new TreeMap<>()); private final SortedMap dataMap = Collections.synchronizedSortedMap(new TreeMap<>()); private final Daemon checkpointer; private final SimpleStateMachineStorage storage = new SimpleStateMachineStorage(); @@ -198,9 +198,8 @@ public RoleInfoProto getLeaderElectionTimeoutInfo() { return leaderElectionTimeoutInfo; } - private void put(ReferenceCountedObject entryRef) { - LogEntryProto entry = entryRef.retain(); - final ReferenceCountedObject previous = indexMap.put(entry.getIndex(), entryRef); + private void put(LogEntryProto entry) { + final LogEntryProto previous = indexMap.put(entry.getIndex(), entry); Preconditions.assertNull(previous, "previous"); final String s = entry.getStateMachineLogEntry().getLogData().toStringUtf8(); dataMap.put(s, entry); @@ -250,7 +249,7 @@ public CompletableFuture applyTransaction(TransactionContext trx) { LogEntryProto entry = entryRef.get(); LOG.info("applyTransaction for log index {}", entry.getIndex()); - put(entryRef); + put(LogProtoUtils.copy(entry)); updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex()); final SimpleMessage m = new SimpleMessage(entry.getIndex() + " OK"); @@ -270,8 +269,7 @@ public long takeSnapshot() { LOG.debug("Taking a snapshot with {}, file:{}", termIndex, snapshotFile); try (SegmentedRaftLogOutputStream out = new SegmentedRaftLogOutputStream(snapshotFile, false, segmentMaxSize, preallocatedSize, ByteBuffer.allocateDirect(bufferSize))) { - for (final ReferenceCountedObject entryRef : indexMap.values()) { - LogEntryProto entry = entryRef.get(); + for (final LogEntryProto entry : indexMap.values()) { if (entry.getIndex() > endIndex) { break; } else { @@ -306,7 +304,7 @@ private synchronized void loadSnapshot(SingleFileSnapshotInfo snapshot) throws I snapshot.getFile().getPath().toFile(), 0, endIndex, false)) { LogEntryProto entry; while ((entry = in.nextEntry()) != null) { - put(ReferenceCountedObject.wrap(entry)); + put(entry); updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex()); } } @@ -335,7 +333,7 @@ public CompletableFuture query(Message request) { LOG.info("query {}, all available: {}", string, dataMap.keySet()); final LogEntryProto entry = dataMap.get(string); if (entry != null) { - return CompletableFuture.completedFuture(Message.valueOf(entry.toByteString())); + return CompletableFuture.completedFuture(Message.valueOf(entry)); } exception = new IndexOutOfBoundsException(getId() + ": LogEntry not found for query " + string); } catch (Exception e) { @@ -381,11 +379,12 @@ public void close() { running = false; checkpointer.interrupt(); }); - indexMap.values().forEach(ReferenceCountedObject::release); + indexMap.clear(); + dataMap.clear(); } public LogEntryProto[] getContent() { - return indexMap.values().stream().map(ReferenceCountedObject::get).toArray(LogEntryProto[]::new); + return indexMap.values().toArray(new LogEntryProto[0]); } public void blockStartTransaction() {