From 106480b9b531cf2857542675264d0407db15363a Mon Sep 17 00:00:00 2001
From: Tsz-Wo Nicholas Sze
Date: Sat, 23 Sep 2023 19:23:16 -0700
Subject: [PATCH] RATIS-1893. In SegmentedRaftLogCache, start a daemon thread to checkAndEvictCache. (#924)

---
 .../org/apache/ratis/util/AwaitForSignal.java |   3 +
 .../org/apache/ratis/util/AwaitToRun.java     | 113 ++++++++++++++++++
 .../raftlog/segmented/SegmentedRaftLog.java   |  34 ++++--
 3 files changed, 137 insertions(+), 13 deletions(-)
 create mode 100644 ratis-common/src/main/java/org/apache/ratis/util/AwaitToRun.java

diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AwaitForSignal.java b/ratis-common/src/main/java/org/apache/ratis/util/AwaitForSignal.java
index 88808c52b3..5ae0bbc7fa 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/AwaitForSignal.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/AwaitForSignal.java
@@ -59,6 +59,9 @@ public boolean await(long time, TimeUnit unit) throws InterruptedException {
     }
     lock.lock();
     try {
+      if (signaled.get().get()) { // NOTE(review): signaled is not shown in this hunk; verify it can be true here
+        return true;
+      }
       return condition.await(time, unit);
     } finally {
       lock.unlock();
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AwaitToRun.java b/ratis-common/src/main/java/org/apache/ratis/util/AwaitToRun.java
new file mode 100644
index 0000000000..7eedef915c
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/AwaitToRun.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Use a {@link Daemon},
+ * which repeatedly waits for a signal to run a method.
+ * <p>
+ * This class is threadsafe.
+ *
+ * @see AwaitForSignal
+ */
+public class AwaitToRun implements AutoCloseable {
+  public static final Logger LOG = LoggerFactory.getLogger(AwaitToRun.class);
+
+  private final class RunnableImpl implements Runnable { // the body executed by the daemon
+    private final Runnable runMethod;
+
+    private RunnableImpl(Runnable runMethod) {
+      this.runMethod = runMethod;
+    }
+
+    @Override
+    public void run() {
+      for (; ; ) { // loop until this thread is interrupted
+        try {
+          awaitForSignal.await();
+        } catch (InterruptedException e) {
+          LOG.info("{} is interrupted", awaitForSignal);
+          Thread.currentThread().interrupt(); // restore the interrupt status before exiting
+          return;
+        }
+
+        try {
+          runMethod.run();
+        } catch (Throwable t) {
+          LOG.error(name + ": runMethod failed", t); // log only, so that later signals are still served
+        }
+      }
+    }
+  }
+
+  private final String name;
+  private final AwaitForSignal awaitForSignal;
+  private final AtomicReference<Daemon> daemon; // set to null by close()
+
+  public AwaitToRun(Object namePrefix, Runnable runMethod) { // builds the daemon without starting it
+    this.name = namePrefix + "-" + JavaUtils.getClassSimpleName(getClass());
+    this.awaitForSignal = new AwaitForSignal(name);
+    this.daemon = new AtomicReference<>(Daemon.newBuilder()
+        .setName(name)
+        .setRunnable(new RunnableImpl(runMethod))
+        .build());
+  }
+
+  /** Similar to {@link Thread#start()}; if this object is already closed, only log a warning. */
+  public AwaitToRun start() {
+    final Daemon d = daemon.get();
+    if (d != null) {
+      d.start();
+      LOG.info("{} started", d);
+    } else {
+      LOG.warn("{} is already closed", name);
+    }
+    return this; // returned for call chaining
+  }
+
+  /** Signal the daemon to run the method. */
+  public void signal() {
+    awaitForSignal.signal();
+  }
+
+  @Override
+  public void close() { // interrupt the daemon and wait for it to terminate
+    final Daemon d = daemon.getAndSet(null); // only the first close() gets a non-null daemon
+    if (d == null) {
+      return; // already closed
+    }
+
+    d.interrupt();
+    try {
+      d.join();
+    } catch (InterruptedException e) {
+      LOG.warn(d + ": join is interrupted", e);
+      Thread.currentThread().interrupt(); // preserve the interrupt status for the caller
+    }
+  }
+
+  @Override
+  public String toString() {
+    return name;
+  }
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index 985652437e..255bec2911 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -36,6 +36,7 @@
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.AutoCloseableLock;
+import org.apache.ratis.util.AwaitToRun;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.StringUtils;
@@ -183,6 +184,7 @@ public void notifyTruncatedLogEntry(TermIndex ti) {
   private final RaftStorage storage;
   private final StateMachine stateMachine;
   private final SegmentedRaftLogCache cache;
+  private final AwaitToRun cacheEviction;
   private final SegmentedRaftLogWorker fileLogWorker;
   private final long segmentMaxSize;
   private final boolean stateMachineCachingEnabled;
@@ -200,6 +202,7 @@ public SegmentedRaftLog(RaftGroupMemberId memberId, RaftServer.Division server,
     this.stateMachine = stateMachine;
     segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
     this.cache = new SegmentedRaftLogCache(memberId, storage, properties, getRaftLogMetrics());
+    this.cacheEviction = new AwaitToRun(memberId + "-cacheEviction", this::checkAndEvictCache).start();
     this.fileLogWorker = new
SegmentedRaftLogWorker(memberId, stateMachine,
         submitUpdateCommitEvent, server, storage, properties, getRaftLogMetrics());
     stateMachineCachingEnabled = RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(properties);
@@ -278,7 +281,7 @@ record = segment.getLogRecord(index);
 
     // the entry is not in the segment's cache. Load the cache without holding the lock.
     getRaftLogMetrics().onRaftLogCacheMiss();
-    checkAndEvictCache();
+    cacheEviction.signal(); // let the cacheEviction daemon run checkAndEvictCache instead of this thread
     return segment.loadCache(record);
   }
 
@@ -382,26 +385,29 @@ protected CompletableFuture appendEntryImpl(LogEntryProto entry) {
       final Timekeeper.Context appendEntryTimerContext = getRaftLogMetrics().startAppendEntryTimer();
       validateLogEntry(entry);
       final LogSegment currentOpenSegment = cache.getOpenSegment();
+      boolean rollOpenSegment = false; // set when the open segment is full or the term changes
       if (currentOpenSegment == null) {
         cache.addOpenSegment(entry.getIndex());
         fileLogWorker.startLogSegment(entry.getIndex());
       } else if (isSegmentFull(currentOpenSegment, entry)) {
+        rollOpenSegment = true;
+      } else {
+        final TermIndex last = currentOpenSegment.getLastTermIndex();
+        if (last != null && last.getTerm() != entry.getTerm()) {
+          // the term changes
+          Preconditions.assertTrue(last.getTerm() < entry.getTerm(),
+              "open segment's term %s is larger than the new entry's term %s",
+              last.getTerm(), entry.getTerm());
+          rollOpenSegment = true;
+        }
+      }
+
+      if (rollOpenSegment) {
         cache.rollOpenSegment(true);
         fileLogWorker.rollLogSegment(currentOpenSegment);
-      } else if (currentOpenSegment.numOfEntries() > 0 &&
-          currentOpenSegment.getLastTermIndex().getTerm() != entry.getTerm()) {
-        // the term changes
-        final long currentTerm = currentOpenSegment.getLastTermIndex().getTerm();
-        Preconditions.assertTrue(currentTerm < entry.getTerm(),
-            "open segment's term %s is larger than the new entry's term %s",
-            currentTerm, entry.getTerm());
-        cache.rollOpenSegment(true);
-        fileLogWorker.rollLogSegment(currentOpenSegment);
+        cacheEviction.signal(); // eviction is now requested only after the open segment is rolled
       }
 
-      //TODO(runzhiwang): If there is performance problem, start a daemon thread to checkAndEvictCache
-      checkAndEvictCache();
-
       // If the entry has state machine data, then the entry should be inserted
       // to statemachine first and then to the cache. Not following the order
       // will leave a spurious entry in the cache.
@@ -496,6 +502,7 @@ public CompletableFuture onSnapshotInstalled(long lastSnapshotIndex) {
       if (openSegment.getEndIndex() <= lastSnapshotIndex) {
         fileLogWorker.closeLogSegment(openSegment);
         cache.rollOpenSegment(false);
+        cacheEviction.signal(); // the open segment was just rolled; request an eviction check
       }
     }
     return purgeImpl(lastSnapshotIndex);
@@ -505,6 +512,7 @@ public CompletableFuture onSnapshotInstalled(long lastSnapshotIndex) {
   public void close() throws IOException {
     try(AutoCloseableLock writeLock = writeLock()) {
       super.close();
+      cacheEviction.close(); // NOTE(review): joins the daemon while holding writeLock - verify no deadlock
       cache.close();
     }
     fileLogWorker.close();