Skip to content

Commit

Permalink
Fix ParseRatisLog.
Browse files Browse the repository at this point in the history
  • Loading branch information
szetszwo committed Aug 28, 2023
1 parent eacc2e9 commit 45bdfa7
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,16 @@ public static RaftLogConf get(RaftProperties properties) {
}

private final long segmentMaxSize;

private final int maxCachedSegments;
private final long maxSegmentCacheSize;
private final SizeInBytes maxOpSize;
private final boolean unsafeFlush;
private final SizeInBytes maxOpSize;

private RaftLogConf(RaftProperties properties) {
this.segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
this.maxCachedSegments = RaftServerConfigKeys.Log.segmentCacheNumMax(properties);
this.maxSegmentCacheSize = RaftServerConfigKeys.Log.segmentCacheSizeMax(properties).getSize();
this.unsafeFlush = RaftServerConfigKeys.Log.unsafeFlushEnabled(properties);

this.maxOpSize = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
}

Expand All @@ -68,11 +66,11 @@ public long getMaxSegmentCacheSize() {
return maxSegmentCacheSize;
}

public SizeInBytes getMaxOpSize() {
return maxOpSize;
}

public boolean isUnsafeFlush() {
return unsafeFlush;
}

public SizeInBytes getMaxOpSize() {
return maxOpSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,7 @@ static LogSegment loadSegment(RaftStorage storage, File file, LogSegmentStartEnd
final LogSegment segment = newLogSegment(storage, startEnd, conf, raftLogMetrics);
final CorruptionPolicy corruptionPolicy = CorruptionPolicy.get(storage, RaftStorage::getLogCorruptionPolicy);
final boolean isOpen = startEnd.isOpen();
final int entryCount = readSegmentFile(file, startEnd, conf, corruptionPolicy, raftLogMetrics,
entry -> {
final int entryCount = readSegmentFile(file, startEnd, conf, corruptionPolicy, raftLogMetrics, entry -> {
segment.append(keepEntryInCache || isOpen, entry, Op.LOAD_SEGMENT_FILE);
if (logConsumer != null) {
logConsumer.accept(entry);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
*/
package org.apache.ratis.server.raftlog.segmented;

import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogEntryHeader;
Expand All @@ -33,7 +31,6 @@
import org.apache.ratis.util.AutoCloseableReadWriteLock;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@

package org.apache.ratis.tools;

import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLogConf;
import org.apache.ratis.server.raftlog.segmented.LogSegmentPath;
import org.apache.ratis.server.raftlog.segmented.LogSegment;
import org.apache.ratis.util.SizeInBytes;
Expand All @@ -34,17 +36,17 @@ public final class ParseRatisLog {

private final File file;
private final Function<StateMachineLogEntryProto, String> smLogToString;
private final SizeInBytes maxOpSize;
private final RaftLogConf conf;

private long numConfEntries;
private long numMetadataEntries;
private long numStateMachineEntries;
private long numInvalidEntries;

private ParseRatisLog(File f , Function<StateMachineLogEntryProto, String> smLogToString, SizeInBytes maxOpSize) {
private ParseRatisLog(File f , Function<StateMachineLogEntryProto, String> smLogToString, RaftLogConf conf) {
this.file = f;
this.smLogToString = smLogToString;
this.maxOpSize = maxOpSize;
this.conf = conf;
this.numConfEntries = 0;
this.numMetadataEntries = 0;
this.numStateMachineEntries = 0;
Expand All @@ -59,7 +61,7 @@ public void dumpSegmentFile() throws IOException {
}

System.out.println("Processing Raft Log file: " + file.getAbsolutePath() + " size:" + file.length());
final int entryCount = LogSegment.readSegmentFile(file, pi.getStartEnd(), maxOpSize,
final int entryCount = LogSegment.readSegmentFile(file, pi.getStartEnd(), conf,
RaftServerConfigKeys.Log.CorruptionPolicy.EXCEPTION, null, this::processLogEntry);
System.out.println("Num Total Entries: " + entryCount);
System.out.println("Num Conf Entries: " + numConfEntries);
Expand Down Expand Up @@ -106,7 +108,9 @@ public Builder setSMLogToString(Function<StateMachineLogEntryProto, String> smLo
}

public ParseRatisLog build() {
return new ParseRatisLog(file, smLogToString, maxOpSize);
final RaftProperties properties = new RaftProperties();
RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties, maxOpSize);
return new ParseRatisLog(file, smLogToString, RaftLogConf.get(properties));
}
}
}

0 comments on commit 45bdfa7

Please sign in to comment.