diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
index fac372305b..706eca0abb 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogLoader.java
@@ -19,6 +19,7 @@
 import org.apache.fluss.config.ConfigOptions;
 import org.apache.fluss.config.Configuration;
+import org.apache.fluss.exception.InvalidOffsetException;
 import org.apache.fluss.exception.LogSegmentOffsetOverflowException;
 import org.apache.fluss.exception.LogStorageException;
 import org.apache.fluss.metadata.LogFormat;
@@ -31,8 +32,12 @@
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Files;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.Iterator;
+import java.util.List;
+import java.util.stream.Collectors;
 
 /* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
  * Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -76,7 +81,7 @@ public LogLoader(
      *
      * @return the offsets of the Log successfully loaded from disk
      */
-    public LoadedLogOffsets load() throws IOException {
+    public LoadedLogOffsets load() throws Exception {
         // load all the log and index files.
         logSegments.close();
         logSegments.clear();
@@ -117,6 +122,37 @@ public LoadedLogOffsets load() throws IOException {
                 nextOffset, activeSegment.getBaseOffset(), activeSegment.getSizeInBytes()));
     }
 
+    /**
+     * Just recovers the given segment, without adding it to the provided segments.
+     *
+     * @param segment Segment to recover
+     * @return The number of bytes truncated from the segment
+     * @throws LogSegmentOffsetOverflowException if the segment contains messages that cause index
+     *     offset overflow
+     */
+    private int recoverSegment(LogSegment segment) throws Exception {
+        WriterStateManager writerStateManager =
+                new WriterStateManager(
+                        logSegments.getTableBucket(),
+                        logTabletDir,
+                        this.writerStateManager.writerExpirationMs());
+        // TODO: Here we use 0 as the logStartOffset passed into rebuildWriterState. The reason is
+        // that the current implementation of logStartOffset in Fluss is not yet fully refined, and
+        // there may be cases where logStartOffset is not updated. As a result, logStartOffset is
+        // not yet reliable. Once the issue with correctly updating logStartOffset is resolved in
+        // issue https://github.com/apache/fluss/issues/744, we can use logStartOffset here.
+        // Additionally, using 0 versus using logStartOffset does not affect correctness: both
+        // can restore the complete WriterState. The only difference is that using logStartOffset
+        // can potentially skip over more segments.
+        LogTablet.rebuildWriterState(
+                writerStateManager, logSegments, 0, segment.getBaseOffset(), false);
+        int bytesTruncated = segment.recover();
+        // once we have recovered the segment's data, take a snapshot to ensure that we won't
+        // need to reload the same segment again while recovering another segment.
+        writerStateManager.takeSnapshot();
+        return bytesTruncated;
+    }
+
     /**
      * Recover the log segments (if there was an unclean shutdown). Ensures there is at least one
      * active segment, and returns the updated recovery point and next offset after recovery.
@@ -129,16 +165,106 @@ public LoadedLogOffsets load() throws IOException {
      * overflow
      */
     private Tuple2<Long, Long> recoverLog() throws IOException {
-        // TODO truncate log to recover maybe unflush segments.
+        if (!isCleanShutdown) {
+            List<LogSegment> unflushed =
+                    logSegments.values(recoveryPointCheckpoint, Long.MAX_VALUE);
+            int numUnflushed = unflushed.size();
+            Iterator<LogSegment> unflushedIter = unflushed.iterator();
+            boolean truncated = false;
+            int numFlushed = 1;
+
+            while (unflushedIter.hasNext() && !truncated) {
+                LogSegment segment = unflushedIter.next();
+                LOG.info(
+                        "Recovering unflushed segment {}. {}/{} recovered for bucket {}",
+                        segment.getBaseOffset(),
+                        numFlushed,
+                        numUnflushed,
+                        logSegments.getTableBucket());
+
+                int truncatedBytes = -1;
+                try {
+                    truncatedBytes = recoverSegment(segment);
+                } catch (Exception e) {
+                    if (e instanceof InvalidOffsetException) {
+                        long startOffset = segment.getBaseOffset();
+                        LOG.warn(
+                                "Found invalid offset during recovery for bucket {}. Deleting the corrupt segment "
+                                        + "and creating an empty one with starting offset {}",
+                                logSegments.getTableBucket(),
+                                startOffset);
+                        truncatedBytes = segment.truncateTo(startOffset);
+                    } else if (e instanceof IOException) {
+                        // unexpected failures during recovery must not be swallowed silently
+                        throw (IOException) e;
+                    } else if (e instanceof RuntimeException) {
+                        throw (RuntimeException) e;
+                    } else {
+                        throw new IOException(e);
+                    }
+                }
+
+                if (truncatedBytes > 0) {
+                    // we had an invalid message, delete all remaining log
+                    LOG.warn(
+                            "Corruption found in segment {} for bucket {}, truncating to offset {}",
+                            segment.getBaseOffset(),
+                            logSegments.getTableBucket(),
+                            segment.readNextOffset());
+                    removeAndDeleteSegments(unflushedIter);
+                    truncated = true;
+                } else {
+                    numFlushed += 1;
+                }
+            }
+        }
+
         if (logSegments.isEmpty()) {
+            // TODO: use logStartOffset if issue https://github.com/apache/fluss/issues/744 ready
             logSegments.add(LogSegment.open(logTabletDir, 0L, conf, logFormat));
         }
         long logEndOffset = logSegments.lastSegment().get().readNextOffset();
         return Tuple2.of(recoveryPointCheckpoint, logEndOffset);
     }
 
+    /**
+     * This method deletes the given log segments and the associated writer snapshots.
+     *
+     * <p>This method does not need to convert IOException to {@link LogStorageException} because it
+     * is either called before all logs are loaded or the immediate caller will catch and handle
+     * IOException.
+     *
+     * @param segmentsToDelete The log segments to schedule for deletion
+     */
+    private void removeAndDeleteSegments(Iterator<LogSegment> segmentsToDelete) {
+        if (segmentsToDelete.hasNext()) {
+            List<LogSegment> toDelete = new ArrayList<>();
+            segmentsToDelete.forEachRemaining(toDelete::add);
+
+            LOG.info(
+                    "Deleting segments for bucket {} as part of log recovery: {}",
+                    logSegments.getTableBucket(),
+                    toDelete.stream().map(LogSegment::toString).collect(Collectors.joining(",")));
+            toDelete.forEach(segment -> logSegments.remove(segment.getBaseOffset()));
+
+            try {
+                LocalLog.deleteSegmentFiles(
+                        toDelete, LocalLog.SegmentDeletionReason.LOG_TRUNCATION);
+            } catch (IOException e) {
+                LOG.error(
+                        "Failed to delete truncated segments {} for bucket {}",
+                        toDelete,
+                        logSegments.getTableBucket(),
+                        e);
+            }
+
+            try {
+                LogTablet.deleteWriterSnapshots(toDelete, writerStateManager);
+            } catch (IOException e) {
+                LOG.error(
+                        "Failed to delete truncated writer snapshots {} for bucket {}",
+                        toDelete,
+                        logSegments.getTableBucket(),
+                        e);
+            }
+        }
+    }
+
     /** Loads segments from disk into the provided segments. */
-    private void loadSegmentFiles() throws IOException {
+    private void loadSegmentFiles() throws Exception {
         File[] sortedFiles = logTabletDir.listFiles();
         if (sortedFiles != null) {
             Arrays.sort(sortedFiles, Comparator.comparing(File::getName));
@@ -155,8 +281,26 @@ private void loadSegmentFiles() throws IOException {
                     }
                 } else if (LocalLog.isLogFile(file)) {
                     long baseOffset = FlussPaths.offsetFromFile(file);
+                    boolean timeIndexFileNewlyCreated =
+                            !FlussPaths.timeIndexFile(logTabletDir, baseOffset).exists();
                     LogSegment segment =
                             LogSegment.open(logTabletDir, baseOffset, conf, true, 0, logFormat);
+
+                    try {
+                        segment.sanityCheck(timeIndexFileNewlyCreated);
+                    } catch (Exception e) {
+                        if (e instanceof NoSuchFieldException) {
+                            if (isCleanShutdown
+                                    || segment.getBaseOffset() < recoveryPointCheckpoint) {
+                                LOG.error(
+                                        "Could not find offset index file corresponding to log file {} "
+                                                + "for bucket {}, recovering segment and rebuilding index files...",
+                                        segment.getFileLogRecords().file().getAbsoluteFile(),
+                                        logSegments.getTableBucket());
+                            }
+                            recoverSegment(segment);
+                        } else {
+                            // unexpected failures from the sanity check must not be swallowed
+                            throw e;
+                        }
+                    }
                     logSegments.add(segment);
                 }
             }
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
index 0a30d5f5f4..59e4fa074b 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogSegment.java
@@ -172,6 +172,23 @@ public void resizeIndexes(int size) throws IOException {
         timeIndex().resize(size);
     }
 
+    public void sanityCheck(boolean timeIndexFileNewlyCreated) throws Exception {
+        if (lazyOffsetIndex.file().exists()) {
+            // Resize the time index file to 0 if it is newly created.
+            if (timeIndexFileNewlyCreated) {
+                timeIndex().resize(0);
+            }
+            // Sanity checks for the time index and offset index are skipped because
+            // we will recover the segments above the recovery point in recoverLog()
+            // in any case, so sanity checking them here is redundant.
+        } else {
+            throw new NoSuchFieldException(
+                    "Offset index file "
+                            + lazyOffsetIndex.file().getAbsolutePath()
+                            + " does not exist.");
+        }
+    }
+
     /**
      * The maximum timestamp we see so far.
      *
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
index b4c8f1c9a0..dbc16a09cf 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/LogTablet.java
@@ -1271,7 +1271,7 @@ private static void loadWritersFromRecords(
         loadedWriters.values().forEach(writerStateManager::update);
     }
 
-    private static void deleteWriterSnapshots(
+    public static void deleteWriterSnapshots(
             List<LogSegment> segments, WriterStateManager writerStateManager) throws IOException {
         for (LogSegment segment : segments) {
             writerStateManager.removeAndDeleteSnapshot(segment.getBaseOffset());
diff --git a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
index 4fa0baced1..30afb849a4 100644
--- a/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
+++ b/fluss-server/src/main/java/org/apache/fluss/server/log/WriterStateManager.java
@@ -99,6 +99,10 @@ public WriterStateManager(TableBucket tableBucket, File logTabletDir, int writer
         this.snapshots = loadSnapshots();
     }
 
+    public int writerExpirationMs() {
+        return writerExpirationMs;
+    }
+
     public int writerIdCount() {
         return writerIdCount;
     }
diff --git a/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java b/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java
new file mode 100644
index 0000000000..9805cfccbc
--- /dev/null
+++ b/fluss-server/src/test/java/org/apache/fluss/server/log/LogLoaderTest.java
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.server.log;
+
+import org.apache.fluss.compression.ArrowCompressionInfo;
+import org.apache.fluss.config.ConfigOptions;
+import org.apache.fluss.config.MemorySize;
+import org.apache.fluss.metadata.LogFormat;
+import org.apache.fluss.metadata.PhysicalTablePath;
+import org.apache.fluss.record.ChangeType;
+import org.apache.fluss.record.LogTestBase;
+import org.apache.fluss.record.MemoryLogRecords;
+import org.apache.fluss.server.exception.CorruptIndexException;
+import org.apache.fluss.server.metrics.group.TestingMetricGroups;
+import org.apache.fluss.utils.clock.Clock;
+import org.apache.fluss.utils.clock.ManualClock;
+import org.apache.fluss.utils.clock.SystemClock;
+import org.apache.fluss.utils.concurrent.FlussScheduler;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.fluss.record.TestData.DATA1_ROW_TYPE;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_ID;
+import static org.apache.fluss.record.TestData.DATA1_TABLE_PATH;
+import static org.apache.fluss.record.TestData.DEFAULT_SCHEMA_ID;
+import static org.apache.fluss.testutils.DataTestUtils.createBasicMemoryLogRecords;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Test for {@link LogLoader}. */
+final class LogLoaderTest extends LogTestBase {
+
+    private @TempDir File tempDir;
+    private FlussScheduler scheduler;
+    private File logDir;
+    private Clock clock;
+
+    @BeforeEach
+    public void setup() throws Exception {
+        conf.set(ConfigOptions.LOG_SEGMENT_FILE_SIZE, MemorySize.parse("10kb"));
+        conf.set(ConfigOptions.LOG_INDEX_INTERVAL_SIZE, MemorySize.parse("1b"));
+
+        logDir =
+                LogTestUtils.makeRandomLogTabletDir(
+                        tempDir,
+                        DATA1_TABLE_PATH.getDatabaseName(),
+                        DATA1_TABLE_ID,
+                        DATA1_TABLE_PATH.getTableName());
+
+        scheduler = new FlussScheduler(1);
+        scheduler.startup();
+
+        clock = new ManualClock();
+    }
+
+    // TODO: add more tests like Kafka LogLoaderTest
+
+    @Test
+    void testCorruptIndexRebuild() throws Exception {
+        // publish the records and close the log
+        int numRecords = 200;
+        LogTablet logTablet = createLogTablet(true);
+        appendRecords(logTablet, numRecords);
+        // collect all the index files
+        List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
+        logTablet.close();
+
+        // corrupt all the index files
+        for (File indexFile : indexFiles) {
+            try (FileChannel fileChannel =
+                    FileChannel.open(indexFile.toPath(), StandardOpenOption.APPEND)) {
+                for (int i = 0; i < 12; i++) {
+                    fileChannel.write(ByteBuffer.wrap(new byte[] {0}));
+                }
+            }
+        }
+
+        // test reopen the log without recovery, sanity check of index files should throw exception
+        logTablet = createLogTablet(true);
+        for (LogSegment segment : logTablet.logSegments()) {
+            if (segment.getBaseOffset() != logTablet.activeLogSegment().getBaseOffset()) {
+                assertThatThrownBy(segment.offsetIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessage(
+                                String.format(
+                                        "Index file %s is corrupt, found %d bytes which is neither positive nor a multiple of %d",
+                                        segment.offsetIndex().file().getAbsolutePath(),
+                                        segment.offsetIndex().length(),
+                                        segment.offsetIndex().entrySize()));
+                assertThatThrownBy(segment.timeIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessageContaining(
+                                String.format(
+                                        "Corrupt time index found, time index file (%s) has non-zero size but the last timestamp is 0 which is less than the first timestamp",
+                                        segment.timeIndex().file().getAbsolutePath()));
+            } else {
+                // the offset index file of the active segment will be resized, which causes no
+                // corruption exception when doing the sanity check
+                segment.offsetIndex().sanityCheck();
+                assertThatThrownBy(segment.timeIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessageContaining(
+                                String.format(
+                                        "Corrupt time index found, time index file (%s) has non-zero size but the last timestamp is 0 which is less than the first timestamp",
+                                        segment.timeIndex().file().getAbsolutePath()));
+            }
+        }
+        logTablet.close();
+
+        // test reopen the log with recovery, the sanity check of index files should find no
+        // corruption
+        logTablet = createLogTablet(false);
+        for (LogSegment segment : logTablet.logSegments()) {
+            segment.offsetIndex().sanityCheck();
+            segment.timeIndex().sanityCheck();
+        }
+        assertThat(numRecords).isEqualTo(logTablet.localLogEndOffset());
+        for (int i = 0; i < numRecords; i++) {
+            assertThat(i)
+                    .isEqualTo(logTablet.lookupOffsetForTimestamp(clock.milliseconds() + i * 10));
+        }
+        logTablet.close();
+    }
+
+    @Test
+    void testCorruptIndexRebuildWithRecoveryPoint() throws Exception {
+        // publish the records and close the log
+        int numRecords = 200;
+        LogTablet logTablet = createLogTablet(true);
+        appendRecords(logTablet, numRecords);
+        // collect all the index files
+        long recoveryPoint = logTablet.localLogEndOffset() / 2;
+        List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
+        logTablet.close();
+
+        // corrupt all the index files
+        for (File indexFile : indexFiles) {
+            try (FileChannel fileChannel =
+                    FileChannel.open(indexFile.toPath(), StandardOpenOption.APPEND)) {
+                for (int i = 0; i < 12; i++) {
+                    fileChannel.write(ByteBuffer.wrap(new byte[] {0}));
+                }
+            }
+        }
+
+        // test reopen the log with recovery point
+        logTablet = createLogTablet(false, recoveryPoint);
+        List<LogSegment> logSegments = logTablet.logSegments(recoveryPoint, Long.MAX_VALUE);
+        assertThat(logSegments.size() < logTablet.logSegments().size()).isTrue();
+        Set<Long> recoveredSegments =
+                logSegments.stream().map(LogSegment::getBaseOffset).collect(Collectors.toSet());
+        for (LogSegment segment : logTablet.logSegments()) {
+            if (recoveredSegments.contains(segment.getBaseOffset())) {
+                segment.offsetIndex().sanityCheck();
+                segment.timeIndex().sanityCheck();
+            } else {
+                // the segments before the recovery point will not be recovered, so the sanity
+                // check should still throw a corruption exception
+                assertThatThrownBy(segment.offsetIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessage(
+                                String.format(
+                                        "Index file %s is corrupt, found %d bytes which is neither positive nor a multiple of %d",
+                                        segment.offsetIndex().file().getAbsolutePath(),
+                                        segment.offsetIndex().length(),
+                                        segment.offsetIndex().entrySize()));
+                assertThatThrownBy(segment.timeIndex()::sanityCheck)
+                        .isInstanceOf(CorruptIndexException.class)
+                        .hasMessageContaining(
+                                String.format(
+                                        "Corrupt time index found, time index file (%s) has non-zero size but the last timestamp is 0 which is less than the first timestamp",
+                                        segment.timeIndex().file().getAbsolutePath()));
+            }
+        }
+    }
+
+    @Test
+    void testIndexRebuild() throws Exception {
+        // publish the records and close the log
+        int numRecords = 200;
+        LogTablet logTablet = createLogTablet(true);
+        appendRecords(logTablet, numRecords);
+        // collect all index files
+        List<File> indexFiles = collectIndexFiles(logTablet.logSegments());
+        logTablet.close();
+
+        // delete all the index files
+        indexFiles.forEach(File::delete);
+
+        // reopen the log
+        logTablet = createLogTablet(false);
+        assertThat(logTablet.localLogEndOffset()).isEqualTo(numRecords);
+        // the index files should be rebuilt
+        assertThat(logTablet.logSegments().get(0).offsetIndex().entries() > 0).isTrue();
+        assertThat(logTablet.logSegments().get(0).timeIndex().entries() > 0).isTrue();
+        for (int i = 0; i < numRecords; i++) {
+            assertThat(i)
+                    .isEqualTo(logTablet.lookupOffsetForTimestamp(clock.milliseconds() + i * 10));
+        }
+        logTablet.close();
+    }
+
+    @Test
+    void testInvalidOffsetRebuild() throws Exception {
+        // publish the records and close the log
+        int numRecords = 200;
+        LogTablet logTablet = createLogTablet(true);
+        appendRecords(logTablet, numRecords);
+
+        List<LogSegment> logSegments = logTablet.logSegments();
+        int corruptSegmentIndex = logSegments.size() / 2;
+        assertThat(corruptSegmentIndex < logSegments.size()).isTrue();
+        LogSegment corruptSegment = logSegments.get(corruptSegmentIndex);
+
+        // append a batch whose offset overlaps the segment's base offset, i.e. an invalid offset
+        List<Object[]> objects = Collections.singletonList(new Object[] {1, "a"});
+        List<ChangeType> changeTypes =
+                objects.stream().map(row -> ChangeType.APPEND_ONLY).collect(Collectors.toList());
+        MemoryLogRecords memoryLogRecords =
+                createBasicMemoryLogRecords(
+                        DATA1_ROW_TYPE,
+                        DEFAULT_SCHEMA_ID,
+                        corruptSegment.getBaseOffset(),
+                        clock.milliseconds(),
+                        magic,
+                        System.currentTimeMillis(),
+                        0,
+                        changeTypes,
+                        objects,
+                        LogFormat.ARROW,
+                        ArrowCompressionInfo.DEFAULT_COMPRESSION);
+        corruptSegment.getFileLogRecords().append(memoryLogRecords);
+        logTablet.close();
+
+        logTablet = createLogTablet(false);
+        // the corrupt segment should be truncated to its base offset
+        assertThat(logTablet.localLogEndOffset()).isEqualTo(corruptSegment.getBaseOffset());
+        // segments after the corrupt segment should be removed
+        assertThat(logTablet.logSegments().size()).isEqualTo(corruptSegmentIndex + 1);
+    }
+
+    private LogTablet createLogTablet(boolean isCleanShutdown) throws Exception {
+        return createLogTablet(isCleanShutdown, 0);
+    }
+
+    private LogTablet createLogTablet(boolean isCleanShutdown, long recoveryPoint)
+            throws Exception {
+        return LogTablet.create(
+                PhysicalTablePath.of(DATA1_TABLE_PATH),
+                logDir,
+                conf,
+                TestingMetricGroups.TABLET_SERVER_METRICS,
+                recoveryPoint,
+                scheduler,
+                LogFormat.ARROW,
+                1,
+                false,
+                SystemClock.getInstance(),
+                isCleanShutdown);
+    }
+
+    private void appendRecords(LogTablet logTablet, int numRecords) throws Exception {
+        int baseOffset = 0;
+        int batchSequence = 0;
+        for (int i = 0; i < numRecords; i++) {
+            List<Object[]> objects = Collections.singletonList(new Object[] {1, "a"});
+            List<ChangeType> changeTypes =
+                    objects.stream()
+                            .map(row -> ChangeType.APPEND_ONLY)
+                            .collect(Collectors.toList());
+            MemoryLogRecords memoryLogRecords =
+                    createBasicMemoryLogRecords(
+                            DATA1_ROW_TYPE,
+                            DEFAULT_SCHEMA_ID,
+                            baseOffset,
+                            clock.milliseconds() + i * 10L,
+                            magic,
+                            System.currentTimeMillis(),
+                            batchSequence,
+                            changeTypes,
+                            objects,
+                            LogFormat.ARROW,
+                            ArrowCompressionInfo.DEFAULT_COMPRESSION);
+            logTablet.appendAsFollower(memoryLogRecords);
+            baseOffset++;
+            batchSequence++;
+        }
+    }
+
+    private List<File> collectIndexFiles(List<LogSegment> logSegments) throws IOException {
+        List<File> indexFiles = new ArrayList<>();
+        for (LogSegment segment : logSegments) {
+            indexFiles.add(segment.offsetIndex().file());
+            indexFiles.add(segment.timeIndex().file());
+        }
+        return indexFiles;
+    }
+}