150 changes: 147 additions & 3 deletions fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
@@ -19,6 +19,7 @@

import org.apache.fluss.config.ConfigOptions;
import org.apache.fluss.config.Configuration;
import org.apache.fluss.exception.InvalidOffsetException;
import org.apache.fluss.exception.LogSegmentOffsetOverflowException;
import org.apache.fluss.exception.LogStorageException;
import org.apache.fluss.metadata.LogFormat;
@@ -31,8 +32,12 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -76,7 +81,7 @@ public LogLoader(
*
* @return the offsets of the Log successfully loaded from disk
*/
public LoadedLogOffsets load() throws IOException {
public LoadedLogOffsets load() throws Exception {
// load all the log and index files.
logSegments.close();
logSegments.clear();
@@ -117,6 +122,37 @@ public LoadedLogOffsets load() throws IOException {
nextOffset, activeSegment.getBaseOffset(), activeSegment.getSizeInBytes()));
}

/**
* Just recovers the given segment, without adding it to the provided segments.
*
* @param segment Segment to recover
* @return The number of bytes truncated from the segment
* @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index
* offset overflow
*/
private int recoverSegment(LogSegment segment) throws Exception {
WriterStateManager writerStateManager =
new WriterStateManager(
logSegments.getTableBucket(),
logTabletDir,
this.writerStateManager.writerExpirationMs());
Contributor:
Why do we need to create a new WriterStateManager? Maybe we could add a clear() method instead?

Contributor (Author):
This logic follows Kafka, and creating a new WriterStateManager is a lightweight operation. I think it's ok.

// TODO, Here, we use 0 as the logStartOffset passed into rebuildWriterState. The reason is
// that the current implementation of logStartOffset in Fluss is not yet fully refined, and
// there may be cases where logStartOffset is not updated. As a result, logStartOffset is
// not yet reliable. Once the issue with correctly updating logStartOffset is resolved in
// issue https://github.com/apache/fluss/issues/744, we can use logStartOffset here.
// Additionally, using 0 versus using logStartOffset does not affect correctness—they both
// can restore the complete WriterState. The only difference is that using logStartOffset
// can potentially skip over more segments.
LogTablet.rebuildWriterState(
writerStateManager, logSegments, 0, segment.getBaseOffset(), false);
int bytesTruncated = segment.recover();
// once we have recovered the segment's data, take a snapshot to ensure that we won't
// need to reload the same segment again while recovering another segment.
writerStateManager.takeSnapshot();
return bytesTruncated;
}
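The exchange above asks why a fresh WriterStateManager is constructed per recovered segment rather than clearing a shared one. The author's point can be sketched in isolation: writer state is derived entirely from the log records, so a throwaway instance rebuilt from the segments ends up in the same state a long-lived, cleared-and-refilled manager would. The following is a simplified stand-in with hypothetical names (`WriterStateSketch`, `Rec`, `rebuild`), not the real Fluss classes:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class WriterStateSketch {
    // Stand-in for a log record: which writer produced it and its sequence number.
    record Rec(long writerId, int seq) {}

    // A throwaway "WriterStateManager": rebuild last-sequence-per-writer purely
    // from the log segments, the way rebuildWriterState replays records.
    static Map<Long, Integer> rebuild(List<List<Rec>> segments) {
        Map<Long, Integer> state = new HashMap<>();
        for (List<Rec> segment : segments) {
            for (Rec r : segment) {
                state.put(r.writerId(), r.seq()); // later records win
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<List<Rec>> segments =
                List.of(
                        List.of(new Rec(1, 0), new Rec(2, 0)), // first segment
                        List.of(new Rec(1, 1))); // second segment

        // State a long-lived manager would have accumulated while appending:
        Map<Long, Integer> live = new HashMap<>();
        for (List<Rec> seg : segments) {
            for (Rec r : seg) {
                live.put(r.writerId(), r.seq());
            }
        }

        // A freshly constructed manager rebuilt from the log reaches the same
        // state, which is why a new instance per recovery is as safe as clear().
        System.out.println(rebuild(segments).equals(live)); // true
    }
}
```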

/**
* Recover the log segments (if there was an unclean shutdown). Ensures there is at least one
* active segment, and returns the updated recovery point and next offset after recovery.
@@ -129,16 +165,106 @@ public LoadedLogOffsets load() throws IOException {
* overflow
*/
private Tuple2<Long, Long> recoverLog() throws IOException {
// TODO truncate log to recover maybe unflush segments.
if (!isCleanShutdown) {
List<LogSegment> unflushed =
logSegments.values(recoveryPointCheckpoint, Long.MAX_VALUE);
int numUnflushed = unflushed.size();
Iterator<LogSegment> unflushedIter = unflushed.iterator();
boolean truncated = false;
int numFlushed = 1;

while (unflushedIter.hasNext() && !truncated) {
LogSegment segment = unflushedIter.next();
LOG.info(
"Recovering unflushed segment {}. {}/{} recovered for bucket {}",
segment.getBaseOffset(),
numFlushed,
numUnflushed,
logSegments.getTableBucket());

int truncatedBytes = -1;
Contributor:
Do we need to be this aggressive here? Deleting all subsequent log segments just because one cannot be repaired feels like a risk of data loss. Also, we don't have test coverage for this logic.

Contributor (Author):
This logic also follows Kafka. From my perspective, data loss is unlikely because the data is stored in multiple replicas. Once the file is truncated to the correct position, the replica can synchronize the latest data from the leader. If truncation is not carried out, the file remains unrecoverable, and if this host later becomes the leader, unforeseen problems might occur.

Contributor (Author):
And I added a test to cover this.
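A minimal, self-contained sketch of what such a test can assert, using a toy segment model (the names `Seg` and `recover` are hypothetical, not the real LogSegment/LogLoader API): the first corrupt unflushed segment is kept but truncated to its base offset, everything after it is dropped, and everything before it survives.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class RecoveryTruncationSketch {
    // Toy segment: a base offset plus a corrupt flag.
    static final class Seg {
        final long base;
        final boolean corrupt;
        Seg(long base, boolean corrupt) { this.base = base; this.corrupt = corrupt; }
    }

    // Mirrors the recovery loop: walk unflushed segments in order; on the first
    // corrupt one, truncate it to its base offset and delete everything after it.
    static List<Long> recover(List<Seg> unflushed) {
        List<Long> kept = new ArrayList<>();
        Iterator<Seg> it = unflushed.iterator();
        while (it.hasNext()) {
            Seg s = it.next();
            kept.add(s.base); // the corrupt segment itself is kept, truncated
            if (s.corrupt) {
                break; // remaining segments would be removed and deleted
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Seg> segs =
                List.of(new Seg(0, false), new Seg(10, true), new Seg(20, false));
        System.out.println(recover(segs)); // [0, 10]
    }
}
```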

try {
truncatedBytes = recoverSegment(segment);
} catch (Exception e) {
if (e instanceof InvalidOffsetException) {
long startOffset = segment.getBaseOffset();
LOG.warn(
"Found invalid offset during recovery for bucket {}. Deleting the corrupt segment "
+ "and creating an empty one with starting offset {}",
logSegments.getTableBucket(),
startOffset);
truncatedBytes = segment.truncateTo(startOffset);
}
}

if (truncatedBytes > 0) {
// we had an invalid message, delete all remaining log
LOG.warn(
"Corruption found in segment {} for bucket {}, truncating to offset {}",
segment.getBaseOffset(),
logSegments.getTableBucket(),
segment.readNextOffset());
removeAndDeleteSegments(unflushedIter);
truncated = true;
} else {
numFlushed += 1;
}
}
}

if (logSegments.isEmpty()) {
// TODO: use logStartOffset if issue https://github.com/apache/fluss/issues/744 ready
logSegments.add(LogSegment.open(logTabletDir, 0L, conf, logFormat));
}
long logEndOffset = logSegments.lastSegment().get().readNextOffset();
return Tuple2.of(recoveryPointCheckpoint, logEndOffset);
}

/**
* This method deletes the given log segments and the associated writer snapshots.
*
* <p>This method does not need to convert IOException to {@link LogStorageException} because it
* is either called before all logs are loaded or the immediate caller will catch and handle
* IOException
*
* @param segmentsToDelete The log segments to schedule for deletion
*/
private void removeAndDeleteSegments(Iterator<LogSegment> segmentsToDelete) {
if (segmentsToDelete.hasNext()) {
List<LogSegment> toDelete = new ArrayList<>();
segmentsToDelete.forEachRemaining(toDelete::add);

LOG.info(
"Deleting segments for bucket {} as part of log recovery: {}",
logSegments.getTableBucket(),
toDelete.stream().map(LogSegment::toString).collect(Collectors.joining(",")));
toDelete.forEach(segment -> logSegments.remove(segment.getBaseOffset()));

try {
LocalLog.deleteSegmentFiles(
toDelete, LocalLog.SegmentDeletionReason.LOG_TRUNCATION);
} catch (IOException e) {
LOG.error(
"Failed to delete truncated segments {} for bucket {}",
toDelete,
logSegments.getTableBucket(),
e);
}

try {
LogTablet.deleteWriterSnapshots(toDelete, writerStateManager);
} catch (IOException e) {
LOG.error(
"Failed to delete truncated writer snapshots {} for bucket {}",
toDelete,
logSegments.getTableBucket(),
e);
}
}
}
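A detail worth noting: removeAndDeleteSegments receives the same Iterator the recovery loop was already consuming, so only the segments positioned after the corrupt one are deleted. The pattern in isolation, with hypothetical names (`drainRemaining` is not a Fluss method):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class IteratorRemainderSketch {
    // Drain whatever is left on a partially consumed iterator, the way
    // removeAndDeleteSegments collects the not-yet-visited segments.
    static <T> List<T> drainRemaining(Iterator<T> it) {
        List<T> rest = new ArrayList<>();
        it.forEachRemaining(rest::add);
        return rest;
    }

    public static void main(String[] args) {
        List<String> segments = List.of("seg-0", "seg-10", "seg-20", "seg-30");
        Iterator<String> it = segments.iterator();
        it.next(); // recovered cleanly
        it.next(); // corruption found here; loop stops advancing
        // Only the segments after the corrupt one are handed over for deletion:
        System.out.println(drainRemaining(it)); // [seg-20, seg-30]
    }
}
```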

/** Loads segments from disk into the provided segments. */
private void loadSegmentFiles() throws IOException {
private void loadSegmentFiles() throws Exception {
File[] sortedFiles = logTabletDir.listFiles();
if (sortedFiles != null) {
Arrays.sort(sortedFiles, Comparator.comparing(File::getName));
@@ -155,8 +281,26 @@ private void loadSegmentFiles() throws IOException {
}
} else if (LocalLog.isLogFile(file)) {
long baseOffset = FlussPaths.offsetFromFile(file);
boolean timeIndexFileNewlyCreated =
!FlussPaths.timeIndexFile(logTabletDir, baseOffset).exists();
LogSegment segment =
LogSegment.open(logTabletDir, baseOffset, conf, true, 0, logFormat);

try {
segment.sanityCheck(timeIndexFileNewlyCreated);
} catch (Exception e) {
if (e instanceof NoSuchFieldException) {
if (isCleanShutdown
|| segment.getBaseOffset() < recoveryPointCheckpoint) {
LOG.error(
        "Could not find offset index file corresponding to log file {} "
                + "for bucket {}, recovering segment and rebuilding index files...",
        segment.getFileLogRecords().file().getAbsoluteFile(),
        logSegments.getTableBucket());
}
recoverSegment(segment);
}
}
logSegments.add(segment);
}
}
@@ -172,6 +172,23 @@ public void resizeIndexes(int size) throws IOException {
timeIndex().resize(size);
}

public void sanityCheck(boolean timeIndexFileNewlyCreated) throws Exception {
if (lazyOffsetIndex.file().exists()) {
// Resize the time index file to 0 if it is newly created.
if (timeIndexFileNewlyCreated) {
timeIndex().resize(0);
}
// Sanity checks for time index and offset index are skipped because
// we will recover the segments above the recovery point in recoverLog()
// in any case so sanity checking them here is redundant.
} else {
throw new NoSuchFieldException(
"Offset index file "
+ lazyOffsetIndex.file().getAbsolutePath()
+ " does not exist.");
}
}

/**
* The maximum timestamp we see so far.
*
@@ -1271,7 +1271,7 @@ private static void loadWritersFromRecords(
loadedWriters.values().forEach(writerStateManager::update);
}

private static void deleteWriterSnapshots(
public static void deleteWriterSnapshots(
List<LogSegment> segments, WriterStateManager writerStateManager) throws IOException {
for (LogSegment segment : segments) {
writerStateManager.removeAndDeleteSnapshot(segment.getBaseOffset());
@@ -99,6 +99,10 @@ public WriterStateManager(TableBucket tableBucket, File logTabletDir, int writer
this.snapshots = loadSnapshots();
}

public int writerExpirationMs() {
return writerExpirationMs;
}

public int writerIdCount() {
return writerIdCount;
}