diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AwaitForSignal.java b/ratis-common/src/main/java/org/apache/ratis/util/AwaitForSignal.java index 88808c52b3..5ae0bbc7fa 100644 --- a/ratis-common/src/main/java/org/apache/ratis/util/AwaitForSignal.java +++ b/ratis-common/src/main/java/org/apache/ratis/util/AwaitForSignal.java @@ -59,6 +59,9 @@ public boolean await(long time, TimeUnit unit) throws InterruptedException { } lock.lock(); try { + if (signaled.get().get()) { + return true; + } return condition.await(time, unit); } finally { lock.unlock(); diff --git a/ratis-common/src/main/java/org/apache/ratis/util/AwaitToRun.java b/ratis-common/src/main/java/org/apache/ratis/util/AwaitToRun.java new file mode 100644 index 0000000000..d93bb9a985 --- /dev/null +++ b/ratis-common/src/main/java/org/apache/ratis/util/AwaitToRun.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ratis.util; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicReference; + +/** + * Use a {@link Daemon}, + * which repeatedly waits for a signal to run a method. + *

+ * This class is threadsafe. + * + * @see AwaitForSignal + */ +public class AwaitToRun implements AutoCloseable { + public static final Logger LOG = LoggerFactory.getLogger(AwaitToRun.class); + + private class RunnableImpl implements Runnable { + private final Runnable runMethod; + + private RunnableImpl(Runnable runMethod) { + this.runMethod = runMethod; + } + + @Override + public void run() { + for (; ; ) { + try { + awaitForSignal.await(); + } catch (InterruptedException e) { + LOG.info("{} is interrupted", awaitForSignal); + Thread.currentThread().interrupt(); + return; + } + + try { + runMethod.run(); + } catch (Throwable t) { + LOG.error(name + ": runMethod failed", t); + } + } + } + } + + private final String name; + private final AwaitForSignal awaitForSignal; + private final AtomicReference daemon; + + public AwaitToRun(Object namePrefix, Runnable runMethod) { + this.name = namePrefix + "-" + JavaUtils.getClassSimpleName(getClass()); + this.awaitForSignal = new AwaitForSignal(name); + this.daemon = new AtomicReference<>(Daemon.newBuilder() + .setName(name) + .setRunnable(new RunnableImpl(runMethod)) + .build()); + } + + /** Similar to {@link Thread#start()}. */ + public AwaitToRun start() { + final Daemon d = daemon.get(); + if (d != null) { + d.start(); + LOG.info("{} started", d); + } else { + LOG.warn("{} is already closed", name); + } + return this; + } + + /** Signal to run. */ + public void signal() { + awaitForSignal.signal(); + } + + @Override + public void close() { + final Daemon d = daemon.getAndSet(null); + if (d == null) { + return; + } + + d.interrupt(); + try { + d.join(); + } catch (InterruptedException e) { + LOG.warn(d + ": join is interrupted", e); + Thread.currentThread().interrupt(); + } + } + + @Override + public String toString() { + return name; + } +} diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java index 985652437e..109dc81e0c 100644 --- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java +++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java @@ -36,6 +36,7 @@ import org.apache.ratis.statemachine.StateMachine; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.AutoCloseableLock; +import org.apache.ratis.util.AwaitToRun; import org.apache.ratis.util.JavaUtils; import org.apache.ratis.util.Preconditions; import org.apache.ratis.util.StringUtils; @@ -183,6 +184,7 @@ public void notifyTruncatedLogEntry(TermIndex ti) { private final RaftStorage storage; private final StateMachine stateMachine; private final SegmentedRaftLogCache cache; + private final AwaitToRun cacheEviction; private final SegmentedRaftLogWorker fileLogWorker; private final long segmentMaxSize; private final boolean stateMachineCachingEnabled; @@ -200,6 +202,7 @@ public SegmentedRaftLog(RaftGroupMemberId memberId, RaftServer.Division server, this.stateMachine = stateMachine; segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize(); this.cache = new SegmentedRaftLogCache(memberId, storage, properties, getRaftLogMetrics()); + this.cacheEviction = new AwaitToRun(memberId + "-cacheEviction", this::checkAndEvictCache).start(); this.fileLogWorker = new SegmentedRaftLogWorker(memberId, stateMachine, submitUpdateCommitEvent, server, storage, properties, getRaftLogMetrics()); stateMachineCachingEnabled = RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(properties); @@ -278,7 +281,7 @@ record = segment.getLogRecord(index); // the entry is not in the segment's cache. Load the cache without holding the lock. getRaftLogMetrics().onRaftLogCacheMiss(); - checkAndEvictCache(); + cacheEviction.signal(); return segment.loadCache(record); } @@ -399,8 +402,7 @@ protected CompletableFuture appendEntryImpl(LogEntryProto entry) { fileLogWorker.rollLogSegment(currentOpenSegment); } - //TODO(runzhiwang): If there is performance problem, start a daemon thread to checkAndEvictCache - checkAndEvictCache(); + cacheEviction.signal(); // If the entry has state machine data, then the entry should be inserted // to statemachine first and then to the cache. Not following the order @@ -505,6 +507,7 @@ public CompletableFuture onSnapshotInstalled(long lastSnapshotIndex) { public void close() throws IOException { try(AutoCloseableLock writeLock = writeLock()) { super.close(); + cacheEviction.close(); cache.close(); } fileLogWorker.close();