RATIS-1893. In SegmentedRaftLogCache, start a daemon thread to checkAndEvictCache. #924

Merged (4 commits) on Sep 24, 2023
Changes from 3 commits
ratis-common/src/main/java/org/apache/ratis/util/AwaitForSignal.java

@@ -59,6 +59,9 @@ public boolean await(long time, TimeUnit unit) throws InterruptedException {
     }
     lock.lock();
     try {
+      if (signaled.get().get()) {
+        return true;
+      }
       return condition.await(time, unit);
     } finally {
       lock.unlock();
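
The added early return guards against a lost wake-up: if signal() ran before await() was entered, the flag is already set and the caller returns true without blocking. Below is a minimal, self-contained sketch of this check-before-wait pattern, for illustration only; the field names (lock, condition, signaled) mirror the diff, but everything outside the three added lines is assumed here rather than taken from AwaitForSignal.

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    class CheckBeforeWait {
      private final ReentrantLock lock = new ReentrantLock();
      private final Condition condition = lock.newCondition();
      // AtomicReference so a fresh flag can be swapped in for the next round of signaling.
      private final AtomicReference<AtomicBoolean> signaled = new AtomicReference<>(new AtomicBoolean());

      boolean await(long time, TimeUnit unit) throws InterruptedException {
        lock.lock();
        try {
          if (signaled.get().get()) {
            return true; // the signal already arrived; return without blocking
          }
          return condition.await(time, unit);
        } finally {
          lock.unlock();
        }
      }

      void signal() {
        lock.lock();
        try {
          signaled.get().set(true); // record the signal for callers that have not started waiting yet
          condition.signalAll();    // wake up anyone already blocked in await
        } finally {
          lock.unlock();
        }
      }
    }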
ratis-common/src/main/java/org/apache/ratis/util/AwaitToRun.java (new file: 113 additions, 0 deletions)
@@ -0,0 +1,113 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.util;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.atomic.AtomicReference;

/**
* Use a {@link Daemon},
* which repeatedly waits for a signal to run a method.
* <p>
* This class is threadsafe.
*
* @see AwaitForSignal
*/
public class AwaitToRun implements AutoCloseable {
  public static final Logger LOG = LoggerFactory.getLogger(AwaitToRun.class);

  private final class RunnableImpl implements Runnable {
    private final Runnable runMethod;

    private RunnableImpl(Runnable runMethod) {
      this.runMethod = runMethod;
    }

    @Override
    public void run() {
      for (; ; ) {
        try {
          awaitForSignal.await();
        } catch (InterruptedException e) {
          LOG.info("{} is interrupted", awaitForSignal);
          Thread.currentThread().interrupt();
          return;
        }

        try {
          runMethod.run();

Contributor:
It looks like the thread will never sleep after the first signal: even if there is no write, it will always be busy executing the run function. I'm not sure if I've got it wrong.

@ivandika3 (Contributor), Sep 20, 2023:
If I understand correctly, once the runMethod (e.g. checkAndEvictCache) finishes, it loops back to awaitForSignal.await(), which blocks until signal() is sent.

Also, I think this ensures that checkAndEvictCache runs sequentially: even if cacheEviction.signal() is sent multiple times, once per append entry, checkAndEvictCache only runs again after the previous runMethod invocation has finished. So this might avoid a checkAndEvictCache call for every append entry.

Please correct me if I'm wrong.

Contributor:
Sorry, it's my fault! I was wondering why getAndSet was used here; now I understand, very clever design!

Contributor:
> So this might avoid the call for checkAndEvictCache for every append entry.

Yes, it definitely reduces the number of calls ~ so that leaves the concurrency control part above to see how we can do it ~

        } catch (Throwable t) {
          LOG.error(name + ": runMethod failed", t);
        }
      }
    }
  }

  private final String name;
  private final AwaitForSignal awaitForSignal;
  private final AtomicReference<Daemon> daemon;

  public AwaitToRun(Object namePrefix, Runnable runMethod) {
    this.name = namePrefix + "-" + JavaUtils.getClassSimpleName(getClass());
    this.awaitForSignal = new AwaitForSignal(name);
    this.daemon = new AtomicReference<>(Daemon.newBuilder()
        .setName(name)
        .setRunnable(new RunnableImpl(runMethod))
        .build());
  }

  /** Similar to {@link Thread#start()}. */
  public AwaitToRun start() {
    final Daemon d = daemon.get();
    if (d != null) {
      d.start();
      LOG.info("{} started", d);
    } else {
      LOG.warn("{} is already closed", name);
    }
    return this;
  }

  /** Signal to run. */
  public void signal() {
    awaitForSignal.signal();
  }

  @Override
  public void close() {
    final Daemon d = daemon.getAndSet(null);
    if (d == null) {
      return;
    }

    d.interrupt();
    try {
      d.join();
    } catch (InterruptedException e) {
      LOG.warn(d + ": join is interrupted", e);
      Thread.currentThread().interrupt();
    }
  }

  @Override
  public String toString() {
    return name;
  }
}
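
A usage sketch for the class above, illustration only and not part of the patch; the example class name, the "example" prefix, the println runnable, and the sleep duration are all invented. The point raised in the review thread is that a burst of signal() calls arriving while runMethod is still running collapses into at most one extra run, which is what makes it cheap for SegmentedRaftLog to signal cache eviction on every cache miss or segment roll.

    import org.apache.ratis.util.AwaitToRun;

    public class AwaitToRunExample {
      public static void main(String[] args) throws Exception {
        // The runnable stands in for checkAndEvictCache(); here it just logs.
        try (AwaitToRun evictor = new AwaitToRun("example",
            () -> System.out.println("evicting on " + Thread.currentThread().getName())).start()) {
          // Burst of signals: per the discussion above, the daemon coalesces these
          // rather than running the runnable once per signal.
          for (int i = 0; i < 1000; i++) {
            evictor.signal();
          }
          Thread.sleep(100); // give the daemon a chance to run
        } // close() interrupts the daemon thread and joins it
      }
    }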
ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java

@@ -36,6 +36,7 @@
 import org.apache.ratis.statemachine.StateMachine;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.util.AutoCloseableLock;
+import org.apache.ratis.util.AwaitToRun;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.StringUtils;

@@ -183,6 +184,7 @@ public void notifyTruncatedLogEntry(TermIndex ti) {
   private final RaftStorage storage;
   private final StateMachine stateMachine;
   private final SegmentedRaftLogCache cache;
+  private final AwaitToRun cacheEviction;
   private final SegmentedRaftLogWorker fileLogWorker;
   private final long segmentMaxSize;
   private final boolean stateMachineCachingEnabled;

@@ -200,6 +202,7 @@ public SegmentedRaftLog(RaftGroupMemberId memberId, RaftServer.Division server,
     this.stateMachine = stateMachine;
     segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
     this.cache = new SegmentedRaftLogCache(memberId, storage, properties, getRaftLogMetrics());
+    this.cacheEviction = new AwaitToRun(memberId + "-cacheEviction", this::checkAndEvictCache).start();
     this.fileLogWorker = new SegmentedRaftLogWorker(memberId, stateMachine,
         submitUpdateCommitEvent, server, storage, properties, getRaftLogMetrics());
     stateMachineCachingEnabled = RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(properties);

@@ -278,7 +281,7 @@ record = segment.getLogRecord(index);

     // the entry is not in the segment's cache. Load the cache without holding the lock.
     getRaftLogMetrics().onRaftLogCacheMiss();
-    checkAndEvictCache();
+    cacheEviction.signal();
     return segment.loadCache(record);
   }

@@ -382,26 +385,29 @@ protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
       final Timekeeper.Context appendEntryTimerContext = getRaftLogMetrics().startAppendEntryTimer();
       validateLogEntry(entry);
       final LogSegment currentOpenSegment = cache.getOpenSegment();
+      boolean rollOpenSegment = false;

Contributor:
Do we need to check here for addOpenSegment as well?

Contributor:
Looking at the contents of shouldEvict, it seems that a check is not necessary because nothing will change.

    final CacheInfo closedSegmentsCacheInfo = closedSegments.getCacheInfo();
    if (closedSegmentsCacheInfo.getCount() > maxCachedSegments) {
      return true;
    }

    final long size = closedSegmentsCacheInfo.getSize()
        + Optional.ofNullable(openSegment).map(LogSegment::getTotalCacheSize).orElse(0L);
    return size > maxSegmentCacheSize;
  }

Contributor (Author):
For rollOpenSegment, we will close the open segment, so it increases the count. For addOpenSegment, nothing will change, as you mentioned.
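
(Illustration only, numbers invented: if maxCachedSegments is 6 and closedSegments already caches 6 segments, rolling the open segment turns it into a seventh closed segment, so closedSegmentsCacheInfo.getCount() > maxCachedSegments becomes true and there is something to evict. addOpenSegment only creates a new, empty open segment, leaving both the count and the cached size unchanged, which is why only the rollOpenSegment path signals cacheEviction.)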

       if (currentOpenSegment == null) {
         cache.addOpenSegment(entry.getIndex());
         fileLogWorker.startLogSegment(entry.getIndex());
       } else if (isSegmentFull(currentOpenSegment, entry)) {
+        rollOpenSegment = true;
+      } else {
+        final TermIndex last = currentOpenSegment.getLastTermIndex();
+        if (last != null && last.getTerm() != entry.getTerm()) {
+          // the term changes
+          Preconditions.assertTrue(last.getTerm() < entry.getTerm(),
+              "open segment's term %s is larger than the new entry's term %s",
+              last.getTerm(), entry.getTerm());
+          rollOpenSegment = true;
+        }
+      }
+
+      if (rollOpenSegment) {
         cache.rollOpenSegment(true);
         fileLogWorker.rollLogSegment(currentOpenSegment);
-      } else if (currentOpenSegment.numOfEntries() > 0 &&
-          currentOpenSegment.getLastTermIndex().getTerm() != entry.getTerm()) {
-        // the term changes
-        final long currentTerm = currentOpenSegment.getLastTermIndex().getTerm();
-        Preconditions.assertTrue(currentTerm < entry.getTerm(),
-            "open segment's term %s is larger than the new entry's term %s",
-            currentTerm, entry.getTerm());
-        cache.rollOpenSegment(true);
-        fileLogWorker.rollLogSegment(currentOpenSegment);
+        cacheEviction.signal();
       }

-      //TODO(runzhiwang): If there is performance problem, start a daemon thread to checkAndEvictCache
-      checkAndEvictCache();
-
       // If the entry has state machine data, then the entry should be inserted
       // to statemachine first and then to the cache. Not following the order
       // will leave a spurious entry in the cache.
@@ -505,6 +511,7 @@ public CompletableFuture<Long> onSnapshotInstalled(long lastSnapshotIndex) {
   public void close() throws IOException {
     try(AutoCloseableLock writeLock = writeLock()) {
       super.close();
+      cacheEviction.close();
       cache.close();

@OneSizeFitsQuorum (Contributor), Sep 20, 2023:
It looks like the cache is now being called by multiple threads concurrently. Should we add more concurrency control logic inside the cache? Otherwise, concurrent traversal and modification of collections such as closedSegments could fail.

Contributor (Author):
The cache is thread safe and it is already called by multiple threads; see RATIS-430.

Contributor:
got it

     }
     fileLogWorker.close();