Skip to content

Commit

Permalink
RATIS-1879. Handle RaftLog corruption when unsafe flush is enabled.
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed Aug 28, 2023
1 parent b06f82a commit eacc2e9
Show file tree
Hide file tree
Showing 10 changed files with 151 additions and 67 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.ratis.server.raftlog;

import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.SizeInBytes;

import java.util.function.Supplier;

/**
* {@link RaftLog} related {@link RaftServerConfigKeys}.
*/
public final class RaftLogConf {
private static final Supplier<RaftLogConf> DEFAULT = MemoizedSupplier.valueOf(() -> get(new RaftProperties()));

/** @return the default conf. */
public static RaftLogConf get() {
return DEFAULT.get();
}

/** @return the conf from the given properties. */
public static RaftLogConf get(RaftProperties properties) {
return new RaftLogConf(properties);
}

private final long segmentMaxSize;

private final int maxCachedSegments;
private final long maxSegmentCacheSize;
private final SizeInBytes maxOpSize;
private final boolean unsafeFlush;

private RaftLogConf(RaftProperties properties) {
this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
this.maxCachedSegments = RaftServerConfigKeys.Log.segmentCacheNumMax(properties);
this.maxSegmentCacheSize = RaftServerConfigKeys.Log.segmentCacheSizeMax(properties).getSize();
this.unsafeFlush = RaftServerConfigKeys.Log.unsafeFlushEnabled(properties);

this.maxOpSize = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
}

public long getSegmentMaxSize() {
return segmentMaxSize;
}

public int getMaxCachedSegments() {
return maxCachedSegments;
}

public long getMaxSegmentCacheSize() {
return maxSegmentCacheSize;
}

public SizeInBytes getMaxOpSize() {
return maxOpSize;
}

public boolean isUnsafeFlush() {
return unsafeFlush;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLogConf;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader;
import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.ratis.util.FileUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -103,31 +103,31 @@ long getOffset() {
}
}

static LogSegment newOpenSegment(RaftStorage storage, long start, SizeInBytes maxOpSize,
static LogSegment newOpenSegment(RaftStorage storage, long start, RaftLogConf conf,
SegmentedRaftLogMetrics raftLogMetrics) {
Preconditions.assertTrue(start >= 0);
return new LogSegment(storage, true, start, start - 1, maxOpSize, raftLogMetrics);
return new LogSegment(storage, true, start, start - 1, conf, raftLogMetrics);
}

@VisibleForTesting
static LogSegment newCloseSegment(RaftStorage storage,
long start, long end, SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics) {
long start, long end, RaftLogConf conf, SegmentedRaftLogMetrics raftLogMetrics) {
Preconditions.assertTrue(start >= 0 && end >= start);
return new LogSegment(storage, false, start, end, maxOpSize, raftLogMetrics);
return new LogSegment(storage, false, start, end, conf, raftLogMetrics);
}

static LogSegment newLogSegment(RaftStorage storage, LogSegmentStartEnd startEnd, SizeInBytes maxOpSize,
static LogSegment newLogSegment(RaftStorage storage, LogSegmentStartEnd startEnd, RaftLogConf conf,
SegmentedRaftLogMetrics metrics) {
return startEnd.isOpen()? newOpenSegment(storage, startEnd.getStartIndex(), maxOpSize, metrics)
: newCloseSegment(storage, startEnd.getStartIndex(), startEnd.getEndIndex(), maxOpSize, metrics);
return startEnd.isOpen()? newOpenSegment(storage, startEnd.getStartIndex(), conf, metrics)
: newCloseSegment(storage, startEnd.getStartIndex(), startEnd.getEndIndex(), conf, metrics);
}

public static int readSegmentFile(File file, LogSegmentStartEnd startEnd, SizeInBytes maxOpSize,
public static int readSegmentFile(File file, LogSegmentStartEnd startEnd, RaftLogConf conf,
CorruptionPolicy corruptionPolicy, SegmentedRaftLogMetrics raftLogMetrics, Consumer<LogEntryProto> entryConsumer)
throws IOException {
int count = 0;
try (SegmentedRaftLogInputStream in = new SegmentedRaftLogInputStream(
file, startEnd.getStartIndex(), startEnd.getEndIndex(), startEnd.isOpen(), maxOpSize, raftLogMetrics)) {
file, startEnd.getStartIndex(), startEnd.getEndIndex(), startEnd.isOpen(), conf, raftLogMetrics)) {
for(LogEntryProto prev = null, next; (next = in.nextEntry()) != null; prev = next) {
if (prev != null) {
Preconditions.assertTrue(next.getIndex() == prev.getIndex() + 1,
Expand Down Expand Up @@ -155,13 +155,14 @@ public static int readSegmentFile(File file, LogSegmentStartEnd startEnd, SizeIn
return count;
}

static LogSegment loadSegment(RaftStorage storage, File file, LogSegmentStartEnd startEnd, SizeInBytes maxOpSize,
static LogSegment loadSegment(RaftStorage storage, File file, LogSegmentStartEnd startEnd, RaftLogConf conf,
boolean keepEntryInCache, Consumer<LogEntryProto> logConsumer, SegmentedRaftLogMetrics raftLogMetrics)
throws IOException {
final LogSegment segment = newLogSegment(storage, startEnd, maxOpSize, raftLogMetrics);
final LogSegment segment = newLogSegment(storage, startEnd, conf, raftLogMetrics);
final CorruptionPolicy corruptionPolicy = CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy);
final boolean isOpen = startEnd.isOpen();
final int entryCount = readSegmentFile(file, startEnd, maxOpSize, corruptionPolicy, raftLogMetrics, entry -> {
final int entryCount = readSegmentFile(file, startEnd, conf, corruptionPolicy, raftLogMetrics,
entry -> {
segment.append(keepEntryInCache || isOpen, entry, Op.LOAD_SEGMENT_FILE);
if (logConsumer != null) {
logConsumer.accept(entry);
Expand Down Expand Up @@ -233,7 +234,7 @@ public LogEntryProto load(LogRecord key) throws IOException {
// the on-disk log file should be truncated but has not been done yet.
final AtomicReference<LogEntryProto> toReturn = new AtomicReference<>();
final LogSegmentStartEnd startEnd = LogSegmentStartEnd.valueOf(startIndex, endIndex, isOpen);
readSegmentFile(file, startEnd, maxOpSize,
readSegmentFile(file, startEnd, conf,
getLogCorruptionPolicy(), raftLogMetrics, entry -> {
final TermIndex ti = TermIndex.valueOf(entry);
putEntryCache(ti, entry, Op.LOAD_SEGMENT_FILE);
Expand All @@ -258,7 +259,7 @@ File getFile() {
/** Segment end index, inclusive. */
private volatile long endIndex;
private RaftStorage storage;
private final SizeInBytes maxOpSize;
private final RaftLogConf conf;
private final LogEntryLoader cacheLoader;
/** later replace it with a metric */
private final AtomicInteger loadingTimes = new AtomicInteger();
Expand All @@ -272,13 +273,13 @@ File getFile() {
*/
private final Map<TermIndex, LogEntryProto> entryCache = new ConcurrentHashMap<>();

private LogSegment(RaftStorage storage, boolean isOpen, long start, long end, SizeInBytes maxOpSize,
private LogSegment(RaftStorage storage, boolean isOpen, long start, long end, RaftLogConf conf,
SegmentedRaftLogMetrics raftLogMetrics) {
this.storage = storage;
this.isOpen = isOpen;
this.startIndex = start;
this.endIndex = end;
this.maxOpSize = maxOpSize;
this.conf = conf;
this.cacheLoader = new LogEntryLoader(raftLogMetrics);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLogBase;
import org.apache.ratis.server.raftlog.RaftLogConf;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.server.storage.RaftStorageMetadata;
import org.apache.ratis.server.storage.RaftStorage;
Expand Down Expand Up @@ -182,9 +183,9 @@ public void notifyTruncatedLogEntry(TermIndex ti) {
private final ServerLogMethods server;
private final RaftStorage storage;
private final StateMachine stateMachine;
private final RaftLogConf conf;
private final SegmentedRaftLogCache cache;
private final SegmentedRaftLogWorker fileLogWorker;
private final long segmentMaxSize;
private final boolean stateMachineCachingEnabled;
private final SegmentedRaftLogMetrics metrics;

Expand All @@ -198,8 +199,8 @@ public SegmentedRaftLog(RaftGroupMemberId memberId, RaftServer.Division server,
this.server = newServerLogMethods(server, notifyTruncatedLogEntry);
this.storage = storage;
this.stateMachine = stateMachine;
segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
this.cache = new SegmentedRaftLogCache(memberId, storage, properties, getRaftLogMetrics());
this.conf = RaftLogConf.get(properties);
this.cache = new SegmentedRaftLogCache(memberId, storage, conf, getRaftLogMetrics());
this.fileLogWorker = new SegmentedRaftLogWorker(memberId, stateMachine,
submitUpdateCommitEvent, server, storage, properties, getRaftLogMetrics());
stateMachineCachingEnabled = RaftServerConfigKeys.Log.StateMachineData.cachingEnabled(properties);
Expand Down Expand Up @@ -423,6 +424,7 @@ protected CompletableFuture<Long> appendEntryImpl(LogEntryProto entry) {
}

private boolean isSegmentFull(LogSegment segment, LogEntryProto entry) {
final long segmentMaxSize = conf.getSegmentMaxSize();
if (segment.getTotalFileSize() >= segmentMaxSize) {
return true;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.ratis.server.raftlog.LogEntryHeader;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogConf;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.server.raftlog.segmented.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
import org.apache.ratis.server.raftlog.segmented.LogSegment.LogRecord;
Expand Down Expand Up @@ -351,35 +352,32 @@ public String toString() {
private volatile LogSegment openSegment;
private final LogSegmentList closedSegments;
private final RaftStorage storage;
private final SizeInBytes maxOpSize;
private final RaftLogConf conf;
private final SegmentedRaftLogMetrics raftLogMetrics;

private final int maxCachedSegments;
private final CacheInvalidationPolicy evictionPolicy = new CacheInvalidationPolicyDefault();
private final long maxSegmentCacheSize;

SegmentedRaftLogCache(Object name, RaftStorage storage, RaftProperties properties,
SegmentedRaftLogCache(Object name, RaftStorage storage, RaftLogConf conf,
SegmentedRaftLogMetrics raftLogMetrics) {
this.name = name + "-" + JavaUtils.getClassSimpleName(getClass());
this.closedSegments = new LogSegmentList(name);
this.storage = storage;
this.conf = conf;

this.raftLogMetrics = raftLogMetrics;
this.raftLogMetrics.addClosedSegmentsNum(this::getCachedSegmentNum);
this.raftLogMetrics.addClosedSegmentsSizeInBytes(this::getClosedSegmentsSizeInBytes);
this.raftLogMetrics.addOpenSegmentSizeInBytes(this::getOpenSegmentSizeInBytes);
this.maxCachedSegments = RaftServerConfigKeys.Log.segmentCacheNumMax(properties);
this.maxSegmentCacheSize = RaftServerConfigKeys.Log.segmentCacheSizeMax(properties).getSize();
this.maxOpSize = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
}

int getMaxCachedSegments() {
return maxCachedSegments;
return conf.getMaxCachedSegments();
}

void loadSegment(LogSegmentPath pi, boolean keepEntryInCache,
Consumer<LogEntryProto> logConsumer) throws IOException {
final LogSegment logSegment = LogSegment.loadSegment(storage, pi.getPath().toFile(), pi.getStartEnd(),
maxOpSize, keepEntryInCache, logConsumer, raftLogMetrics);
conf, keepEntryInCache, logConsumer, raftLogMetrics);
if (logSegment != null) {
addSegment(logSegment);
}
Expand All @@ -403,12 +401,13 @@ private long getTotalCacheSize() {
}

boolean shouldEvict() {
return closedSegments.countCached() > maxCachedSegments || getTotalCacheSize() > maxSegmentCacheSize;
return closedSegments.countCached() > getMaxCachedSegments()
|| getTotalCacheSize() > conf.getMaxSegmentCacheSize();
}

void evictCache(long[] followerIndices, long safeEvictIndex, long lastAppliedIndex) {
List<LogSegment> toEvict = evictionPolicy.evict(followerIndices,
safeEvictIndex, lastAppliedIndex, closedSegments, maxCachedSegments);
safeEvictIndex, lastAppliedIndex, closedSegments, getMaxCachedSegments());
for (LogSegment s : toEvict) {
s.evictCache();
}
Expand Down Expand Up @@ -437,7 +436,7 @@ void addSegment(LogSegment segment) {
}

void addOpenSegment(long startIndex) {
setOpenSegment(LogSegment.newOpenSegment(storage, startIndex, maxOpSize, raftLogMetrics));
setOpenSegment(LogSegment.newOpenSegment(storage, startIndex, conf, raftLogMetrics));
}

private void setOpenSegment(LogSegment openSegment) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@

import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
import org.apache.ratis.server.raftlog.RaftLogConf;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.OpenCloseState;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -67,12 +67,12 @@ boolean hasCorruptHeader() {
private final boolean isOpen;
private final OpenCloseState state;
private SegmentedRaftLogReader reader;
private final SizeInBytes maxOpSize;
private final RaftLogConf conf;
private final SegmentedRaftLogMetrics raftLogMetrics;

SegmentedRaftLogInputStream(File log, long startIndex, long endIndex, boolean isOpen,
SizeInBytes maxOpSize, SegmentedRaftLogMetrics raftLogMetrics) {
this.maxOpSize = maxOpSize;
RaftLogConf conf, SegmentedRaftLogMetrics raftLogMetrics) {
this.conf = conf;
if (isOpen) {
Preconditions.assertTrue(endIndex == INVALID_LOG_INDEX);
} else {
Expand All @@ -91,7 +91,7 @@ private void init() throws IOException {
state.open();
boolean initSuccess = false;
try {
reader = new SegmentedRaftLogReader(logFile, maxOpSize, raftLogMetrics);
reader = new SegmentedRaftLogReader(logFile, conf, raftLogMetrics);
initSuccess = reader.verifyHeader();
} finally {
if (!initSuccess) {
Expand Down Expand Up @@ -190,11 +190,11 @@ public String toString() {
* @return Result of the validation
* @throws IOException
*/
static LogValidation scanEditLog(File file, long maxTxIdToScan, SizeInBytes maxOpSize)
static LogValidation scanEditLog(File file, long maxTxIdToScan, RaftLogConf conf)
throws IOException {
SegmentedRaftLogInputStream in;
try {
in = new SegmentedRaftLogInputStream(file, INVALID_LOG_INDEX, INVALID_LOG_INDEX, false, maxOpSize, null);
in = new SegmentedRaftLogInputStream(file, INVALID_LOG_INDEX, INVALID_LOG_INDEX, false, conf, null);
// read the header, initialize the inputstream
in.init();
} catch (EOFException e) {
Expand Down
Loading

0 comments on commit eacc2e9

Please sign in to comment.