From 8e721371331873b72f2edd1ff757cc816dbaff4e Mon Sep 17 00:00:00 2001 From: BsoBird Date: Wed, 24 Jul 2024 10:09:15 +0800 Subject: [PATCH 1/4] Support multiple isolation level --- .../java/org/apache/paimon/CoreOptions.java | 14 + .../exception/CommitFailedException.java | 38 ++ .../CommitStateUnknownException.java | 41 ++ .../exception/DirtyCommitException.java | 41 ++ .../java/org/apache/paimon/fs/FileIO.java | 3 + paimon-core/pom.xml | 24 + .../org/apache/paimon/AbstractFileStore.java | 3 +- .../paimon/operation/FileStoreCommitImpl.java | 209 +++++-- .../apache/paimon/utils/BranchManager.java | 2 +- .../apache/paimon/utils/SnapshotManager.java | 34 +- .../java/org/apache/paimon/TestFileStore.java | 4 +- .../paimon/operation/FileStoreCommitTest.java | 513 ++++++++++++++++++ 12 files changed, 878 insertions(+), 48 deletions(-) create mode 100644 paimon-common/src/main/java/org/apache/paimon/exception/CommitFailedException.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/exception/CommitStateUnknownException.java create mode 100644 paimon-common/src/main/java/org/apache/paimon/exception/DirtyCommitException.java diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index b368b483a47f..4f7ba148ff2d 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1303,6 +1303,14 @@ public class CoreOptions implements Serializable { "The maximum number of concurrent deleting files. 
" + "By default is the number of processors available to the Java virtual machine."); + public static final ConfigOption COMMIT_ISOLATION_LEVEL = + key("commit.isolation-level") + .enumType(CommitIsolationLevel.class) + .defaultValue(CommitIsolationLevel.READ_COMMITTED) + .withDescription( + "Controls the isolation level of the commit operation, " + + "with a relaxed isolation level being used by default."); + private final Options options; public CoreOptions(Map options) { @@ -2568,6 +2576,12 @@ public enum RangeStrategy { QUANTITY } + /** Commit isolation level. */ + public enum CommitIsolationLevel { + READ_COMMITTED, + SERIALIZABLE + } + /** Specifies the log consistency mode for table. */ public enum ConsumerMode implements DescribedEnum { EXACTLY_ONCE( diff --git a/paimon-common/src/main/java/org/apache/paimon/exception/CommitFailedException.java b/paimon-common/src/main/java/org/apache/paimon/exception/CommitFailedException.java new file mode 100644 index 000000000000..9ad409fdc5f7 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/exception/CommitFailedException.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.exception; + +/** When this exception is thrown, the commit fails and some data cleanup can be performed. */ +public class CommitFailedException extends RuntimeException { + public CommitFailedException(String msg) { + super(msg); + } + + public CommitFailedException(Throwable e) { + super(e); + } + + public CommitFailedException(String msg, Throwable e) { + super(msg, e); + } + + public CommitFailedException(Throwable e, String msg, Object... args) { + super(String.format(msg, args), e); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/exception/CommitStateUnknownException.java b/paimon-common/src/main/java/org/apache/paimon/exception/CommitStateUnknownException.java new file mode 100644 index 000000000000..8ea4a9146955 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/exception/CommitStateUnknownException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.exception; + +/** + * When this exception is thrown, you should stop whatever you are doing. Don't clean/delete + * anything. This is because we can't be sure that the commit was successful.
+ */ +public class CommitStateUnknownException extends RuntimeException { + public CommitStateUnknownException(String msg) { + super(msg); + } + + public CommitStateUnknownException(Throwable e) { + super(e); + } + + public CommitStateUnknownException(String msg, Throwable e) { + super(msg, e); + } + + public CommitStateUnknownException(Throwable e, String msg, Object... args) { + super(String.format(msg, args), e); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/exception/DirtyCommitException.java b/paimon-common/src/main/java/org/apache/paimon/exception/DirtyCommitException.java new file mode 100644 index 000000000000..3cfc20e68ef8 --- /dev/null +++ b/paimon-common/src/main/java/org/apache/paimon/exception/DirtyCommitException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.exception; + +/** + * This means that in the case of concurrent operations, a client has a dirty commit. In this case, + * we should clean up the commit and stop it. 
+ */ +public class DirtyCommitException extends RuntimeException { + public DirtyCommitException(String msg) { + super(msg); + } + + public DirtyCommitException(Throwable e) { + super(e); + } + + public DirtyCommitException(String msg, Throwable e) { + super(msg, e); + } + + public DirtyCommitException(Throwable e, String msg, Object... args) { + super(String.format(msg, args), e); + } +} diff --git a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java index cd9022e232bf..69fe94dbecf2 100644 --- a/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java +++ b/paimon-common/src/main/java/org/apache/paimon/fs/FileIO.java @@ -238,6 +238,9 @@ default boolean tryToWriteAtomic(Path path, String content) throws IOException { try { writeFile(tmp, content, false); success = rename(tmp, path); + } catch (IOException e) { + // try check once + success = exists(path) && !exists(tmp); } finally { if (!success) { deleteQuietly(tmp); diff --git a/paimon-core/pom.xml b/paimon-core/pom.xml index a2591bc6b5f2..f2ed147d0018 100644 --- a/paimon-core/pom.xml +++ b/paimon-core/pom.xml @@ -204,6 +204,30 @@ under the License. 
test + + org.mockito + mockito-inline + ${mockito.version} + jar + test + + + + org.mockito + mockito-core + ${mockito.version} + jar + test + + + + org.mockito + mockito-junit-jupiter + ${mockito.version} + jar + test + + diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index e33463d577c7..fcd1c1fdf392 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -205,7 +205,8 @@ public FileStoreCommitImpl newCommit(String commitUser) { options.branch(), newStatsFileHandler(), bucketMode(), - options.scanManifestParallelism()); + options.scanManifestParallelism(), + schema.options()); } @Override diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 4d29bc188746..64eaec3a47a2 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -18,10 +18,14 @@ package org.apache.paimon.operation; +import org.apache.paimon.CoreOptions; import org.apache.paimon.Snapshot; import org.apache.paimon.annotation.VisibleForTesting; import org.apache.paimon.data.BinaryRow; import org.apache.paimon.data.InternalRow; +import org.apache.paimon.exception.CommitFailedException; +import org.apache.paimon.exception.CommitStateUnknownException; +import org.apache.paimon.exception.DirtyCommitException; import org.apache.paimon.fs.FileIO; import org.apache.paimon.fs.Path; import org.apache.paimon.io.DataFileMeta; @@ -52,12 +56,14 @@ import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; +import org.apache.paimon.utils.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; 
import javax.annotation.Nullable; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -65,6 +71,7 @@ import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Objects; @@ -73,6 +80,8 @@ import java.util.concurrent.Callable; import java.util.stream.Collectors; +import static org.apache.paimon.CoreOptions.COMMIT_ISOLATION_LEVEL; +import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX; import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETION_VECTORS_INDEX; import static org.apache.paimon.index.HashIndexFile.HASH_INDEX; import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate; @@ -130,6 +139,7 @@ public class FileStoreCommitImpl implements FileStoreCommit { private final StatsFileHandler statsFileHandler; private final BucketMode bucketMode; + private final Map tableOptions; public FileStoreCommitImpl( FileIO fileIO, @@ -152,7 +162,8 @@ public FileStoreCommitImpl( String branchName, StatsFileHandler statsFileHandler, BucketMode bucketMode, - @Nullable Integer manifestReadParallelism) { + @Nullable Integer manifestReadParallelism, + Map tableOptions) { this.fileIO = fileIO; this.schemaManager = schemaManager; this.commitUser = commitUser; @@ -178,6 +189,7 @@ public FileStoreCommitImpl( this.commitMetrics = null; this.statsFileHandler = statsFileHandler; this.bucketMode = bucketMode; + this.tableOptions = tableOptions; } @Override @@ -934,18 +946,36 @@ public boolean tryCommitOnce( e); } - boolean success; + String commitIsolationLevel = + tableOptions.getOrDefault( + COMMIT_ISOLATION_LEVEL.key(), COMMIT_ISOLATION_LEVEL.defaultValue().name()); + boolean useSerializableIsolation = + Objects.equals( + commitIsolationLevel, CoreOptions.CommitIsolationLevel.SERIALIZABLE.name()); + LOG.info("Commit isolation level is [{}]", 
commitIsolationLevel); + boolean useLockService = lock != null; + boolean success = false; + String successMsg = + String.format( + "Successfully commit snapshot #%d (path %s) by user %s " + + "with identifier %s and kind %s.", + newSnapshotId, newSnapshotPath, commitUser, identifier, commitKind.name()); + String errorMsg = + String.format( + "Atomic commit failed for snapshot #%d (path %s) by user %s " + + "with identifier %s and kind %s. " + + "Clean up and try again.", + newSnapshotId, newSnapshotPath, commitUser, identifier, commitKind.name()); try { Callable callable = - () -> { - boolean committed = - fileIO.tryToWriteAtomic(newSnapshotPath, newSnapshot.toJson()); - if (committed) { - snapshotManager.commitLatestHint(newSnapshotId); - } - return committed; - }; - if (lock != null) { + () -> + doCommit( + newSnapshotId, + newSnapshotPath, + newSnapshot, + useSerializableIsolation, + useLockService); + if (useLockService) { success = lock.runWithLock( () -> @@ -956,46 +986,46 @@ public boolean tryCommitOnce( // case !fileIO.exists(newSnapshotPath) && callable.call()); } else { + if (useSerializableIsolation) { + checkBeforeCommit(newSnapshotId); + } success = callable.call(); + if (useSerializableIsolation && success) { + checkAfterCommit(newSnapshotId); + } } - } catch (Throwable e) { + } catch (CommitStateUnknownException e) { // exception when performing the atomic rename, // we cannot clean up because we can't determine the success - throw new RuntimeException( - String.format( - "Exception occurs when committing snapshot #%d (path %s) by user %s " - + "with identifier %s and kind %s. " - + "Cannot clean up because we can't determine the success.", - newSnapshotId, - newSnapshotPath, - commitUser, - identifier, - commitKind.name()), - e); + throw e; + } catch (DirtyCommitException e) { + // We need to clean up all the metadata information generated by this commit. 
+ cleanUpTmpManifests( + previousChangesListName, + newChangesListName, + changelogListName, + newIndexManifest, + oldMetas, + newMetas, + changelogMetas); + throw e; + } catch (Exception e) { + // If the commit succeeds, we ignore all exceptions thrown by the successful commit; + // otherwise, we throw an exception. + if (!success && useSerializableIsolation) { + throw new CommitFailedException(e.getMessage(), e); + } } if (success) { if (LOG.isDebugEnabled()) { - LOG.debug( - String.format( - "Successfully commit snapshot #%d (path %s) by user %s " - + "with identifier %s and kind %s.", - newSnapshotId, - newSnapshotPath, - commitUser, - identifier, - commitKind.name())); + LOG.debug(successMsg); } return true; } // atomic rename fails, clean up and try again - LOG.warn( - String.format( - "Atomic commit failed for snapshot #%d (path %s) by user %s " - + "with identifier %s and kind %s. " - + "Clean up and try again.", - newSnapshotId, newSnapshotPath, commitUser, identifier, commitKind.name())); + LOG.warn(errorMsg); cleanUpTmpManifests( previousChangesListName, newChangesListName, @@ -1004,7 +1034,11 @@ public boolean tryCommitOnce( oldMetas, newMetas, changelogMetas); - return false; + if (useSerializableIsolation) { + throw new CommitFailedException(errorMsg); + } else { + return false; + } } @SafeVarargs @@ -1274,4 +1308,101 @@ static ConflictCheck noConflictCheck() { public static ConflictCheck mustConflictCheck() { return latestSnapshot -> true; } + + boolean doCommit( + long newSnapshotId, + Path newSnapshotPath, + Snapshot newSnapshot, + boolean useSerializableIsolation, + boolean useLock) { + try { + boolean committed = tryAtomicCommitNewSnapshot(newSnapshotPath, newSnapshot); + if (committed) { + if (useSerializableIsolation && !useLock) { + // If useSerializableIsolation is turned on and we do not use the lock service, + // we remove all hints, as they may provide incorrect information.
+ snapshotManager.removeSnapshotEarliestHint(); + snapshotManager.removeSnapshotLatestHint(); + snapshotManager.removeChangeLogEarliestHint(); + snapshotManager.removeChangeLogLatestHint(); + } else { + snapshotManager.commitLatestHint(newSnapshotId); + } + } + return committed; + } catch (Exception e) { + throw new CommitStateUnknownException(e.getMessage(), e); + } + } + + boolean tryAtomicCommitNewSnapshot(Path newSnapshotPath, Snapshot newSnapshot) + throws IOException { + return fileIO.tryToWriteAtomic(newSnapshotPath, newSnapshot.toJson()); + } + + void checkBeforeCommit(long newSnapshotId) throws IOException { + long latestSnapshotIdBeforeCommit = + Optional.ofNullable(snapshotManager.latestSnapshotIdWithOutHint()) + .orElse(Snapshot.FIRST_SNAPSHOT_ID - 1); + boolean currentCommitIsLatest = + Objects.equals(newSnapshotId, latestSnapshotIdBeforeCommit + 1); + if (!currentCommitIsLatest) { + snapshotManager.removeSnapshotLatestHint(); + snapshotManager.removeSnapshotEarliestHint(); + throw new CommitFailedException( + String.format( + "Can't submit snapshotId [%s], because it is expected that snapshotId [%s] should be submitted now.", + newSnapshotId, latestSnapshotIdBeforeCommit + 1)); + } + } + + void checkAfterCommit(long newSnapshotId) throws IOException { + String maxSaveSnapshotsNumStr = tableOptions.get(SNAPSHOT_NUM_RETAINED_MAX.key()); + int maxSaveSnapshotNum = SNAPSHOT_NUM_RETAINED_MAX.defaultValue(); + if (!StringUtils.isBlank(maxSaveSnapshotsNumStr)) { + maxSaveSnapshotNum = Integer.parseInt(maxSaveSnapshotsNumStr); + } + long latestSnapshotIdAfterCommit = + Optional.ofNullable(snapshotManager.latestSnapshotIdWithOutHint()) + .orElse(Snapshot.FIRST_SNAPSHOT_ID); + long waterMark = + latestSnapshotIdAfterCommit > maxSaveSnapshotNum + ? 
latestSnapshotIdAfterCommit - maxSaveSnapshotNum + : -1; + fastFailIfDirtyCommit(newSnapshotId, waterMark, latestSnapshotIdAfterCommit); + List dirtyCommits = new ArrayList<>(); + Iterator iterator = snapshotManager.snapshots(); + while (iterator.hasNext()) { + Snapshot snapshot = iterator.next(); + long currentSnapshotId = snapshot.id(); + if (waterMark > currentSnapshotId) { + dirtyCommits.add(currentSnapshotId); + } + } + for (Long dirtyCommit : dirtyCommits) { + snapshotManager.removeSnapshot(dirtyCommit); + snapshotManager.removeChangeLog(dirtyCommit); + } + } + + private void fastFailIfDirtyCommit( + long newSnapshotId, long waterMark, long latestSnapshotIdAfterCommit) + throws IOException { + String errorMsg = + String.format( + "Reject commit snapshotId [%s] because it's much smaller than the current latest snapshotId[%s].It's high probably a dirty commit.", + newSnapshotId, latestSnapshotIdAfterCommit); + boolean isDirtyCommit = + newSnapshotId < waterMark && snapshotManager.snapshotExists(newSnapshotId); + if (isDirtyCommit) { + try { + snapshotManager.removeSnapshot(newSnapshotId); + snapshotManager.removeSnapshotLatestHint(); + snapshotManager.removeSnapshotEarliestHint(); + throw new DirtyCommitException(errorMsg); + } catch (IOException e) { + throw new DirtyCommitException(errorMsg, e); + } + } + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 6ff8d4c2a2e0..87bc37e3b657 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -284,7 +284,7 @@ public void fastForward(String branchName) { .collect(Collectors.toList()); // Delete latest snapshot hint - snapshotManager.deleteLatestHint(); + snapshotManager.removeSnapshotLatestHint(); fileIO.deleteFilesQuietly(deletePaths); fileIO.copyFiles( diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index ca88259defef..ddd42fcb5c7f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -664,12 +664,6 @@ private Long findByListFiles(BinaryOperator reducer, Path dir, String pref return listVersionedFiles(fileIO, dir, prefix).reduce(reducer).orElse(null); } - public void deleteLatestHint() throws IOException { - Path snapshotDir = snapshotDirectory(); - Path hintFile = new Path(snapshotDir, LATEST); - fileIO.delete(hintFile, false); - } - public void commitLatestHint(long snapshotId) throws IOException { commitHint(snapshotId, LATEST, snapshotDirectory()); } @@ -690,4 +684,32 @@ private void commitHint(long snapshotId, String fileName, Path dir) throws IOExc Path hintFile = new Path(dir, fileName); fileIO.overwriteFileUtf8(hintFile, String.valueOf(snapshotId)); } + + public @Nullable Long latestSnapshotIdWithOutHint() throws IOException { + return findByListFiles(Math::max, snapshotDirectory(), SNAPSHOT_PREFIX); + } + + public boolean removeSnapshot(long snapshotId) throws IOException { + return fileIO.delete(snapshotPath(snapshotId), false); + } + + public boolean removeChangeLog(long snapshotId) throws IOException { + return fileIO.delete(longLivedChangelogPath(snapshotId), false); + } + + public boolean removeSnapshotLatestHint() throws IOException { + return fileIO.delete(new Path(snapshotDirectory(), LATEST), false); + } + + public boolean removeSnapshotEarliestHint() throws IOException { + return fileIO.delete(new Path(snapshotDirectory(), EARLIEST), false); + } + + public boolean removeChangeLogLatestHint() throws IOException { + return fileIO.delete(new Path(changelogDirectory(), LATEST), false); + } + + public boolean removeChangeLogEarliestHint() throws IOException { + return fileIO.delete(new 
Path(changelogDirectory(), EARLIEST), false); + } } diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java index db822d7be57a..3319025af18d 100644 --- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java +++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java @@ -363,7 +363,9 @@ public List commitDataImpl( List snapshots = new ArrayList<>(); for (long id = snapshotIdBeforeCommit + 1; id <= snapshotIdAfterCommit; id++) { - snapshots.add(snapshotManager.snapshot(id)); + if (snapshotManager.snapshotExists(id)) { + snapshots.add(snapshotManager.snapshot(id)); + } } return snapshots; } diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index a60554db2449..c02c7e907721 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -27,6 +27,9 @@ import org.apache.paimon.data.BinaryRow; import org.apache.paimon.deletionvectors.DeletionVector; import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer; +import org.apache.paimon.exception.CommitFailedException; +import org.apache.paimon.exception.CommitStateUnknownException; +import org.apache.paimon.exception.DirtyCommitException; import org.apache.paimon.fs.Path; import org.apache.paimon.fs.local.LocalFileIO; import org.apache.paimon.index.IndexFileHandler; @@ -52,6 +55,7 @@ import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TraceableFileIO; +import org.assertj.core.api.Assertions; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -73,7 +77,12 @@ import java.util.Optional; import java.util.Set; import java.util.UUID; +import java.util.concurrent.CountDownLatch; +import 
java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -83,6 +92,13 @@ import static org.apache.paimon.testutils.assertj.PaimonAssertions.anyCauseMatches; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyBoolean; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.spy; /** Tests for {@link FileStoreCommitImpl}. */ public class FileStoreCommitTest { @@ -898,6 +914,461 @@ public void testDVIndexFiles() throws Exception { assertThat(dvs.get("f2").isDeleted(3)).isTrue(); } + @Test + public void testExceptionBeforeDoCommit() throws Exception { + TestFileStore store = createStoreWithIsolation(false); + TestFileStore spyStore = spy(store); + doAnswer( + x -> { + FileStoreCommitImpl commitImpl = + (FileStoreCommitImpl) x.callRealMethod(); + FileStoreCommitImpl spyCommit = spy(commitImpl); + doThrow(new RuntimeException("Oops!")) + .when(spyCommit) + .checkBeforeCommit(anyLong()); + return spyCommit; + }) + .when(spyStore) + .newCommit(any()); + + assertThatThrownBy( + () -> + spyStore.commitDataImpl( + Collections.emptyList(), + gen::getPartition, + kv -> 0, + false, + null, + null, + Collections.emptyList(), + (commit, committable) -> { + commit.ignoreEmptyCommit(false); + commit.commit(committable, Collections.emptyMap()); + })) + .isInstanceOf(CommitFailedException.class) + .hasMessage("Oops!"); + } + + @Test + public void testExceptionAfterDoCommit() throws Exception { + TestFileStore store 
= createStoreWithIsolation(false); + Snapshot snapshot = + store.commitData( + generateDataList(10), + gen::getPartition, + kv -> 0, + Collections.emptyMap()) + .get(0); + TestFileStore spyStore = spy(store); + doAnswer( + x -> { + FileStoreCommitImpl commitImpl = + (FileStoreCommitImpl) x.callRealMethod(); + FileStoreCommitImpl spyCommit = spy(commitImpl); + doThrow(new RuntimeException("Oops!")) + .when(spyCommit) + .checkAfterCommit(anyLong()); + return spyCommit; + }) + .when(spyStore) + .newCommit(any()); + spyStore.commitDataImpl( + Collections.emptyList(), + gen::getPartition, + kv -> 0, + false, + null, + null, + Collections.emptyList(), + (commit, committable) -> { + commit.ignoreEmptyCommit(false); + commit.commit(committable, Collections.emptyMap()); + }); + assertThat(spyStore.snapshotManager().latestSnapshotId()).isEqualTo(snapshot.id() + 1); + } + + @Test + public void testExceptionWhenCommit() throws Exception { + TestFileStore store = createStoreWithIsolation(false); + TestFileStore spyStore = spy(store); + doAnswer( + x -> { + FileStoreCommitImpl commitImpl = + (FileStoreCommitImpl) x.callRealMethod(); + FileStoreCommitImpl spyCommit = spy(commitImpl); + doThrow(new RuntimeException("Oops!")) + .when(spyCommit) + .tryAtomicCommitNewSnapshot(any(), any()); + return spyCommit; + }) + .when(spyStore) + .newCommit(any()); + + assertThatThrownBy( + () -> + spyStore.commitDataImpl( + Collections.emptyList(), + gen::getPartition, + kv -> 0, + false, + null, + null, + Collections.emptyList(), + (commit, committable) -> { + commit.ignoreEmptyCommit(false); + commit.commit(committable, Collections.emptyMap()); + })) + .isInstanceOf(CommitStateUnknownException.class) + .hasMessage("Oops!"); + } + + @Test + public void testRejectCommitAlreadyExistsVersion() throws Exception { + TestFileStore store = createStoreWithIsolation(false); + TestFileStore store1 = createStore(false); + CountDownLatch countDownLatch = new CountDownLatch(10); + CountDownLatch 
countDownLatch2 = new CountDownLatch(1); + AtomicReference unexpectedException = new AtomicReference<>(); + ExecutorService executorService = Executors.newFixedThreadPool(16); + + executorService.execute( + () -> { + try { + countDownLatch2.await(); + for (int i = 0; i < 10; i++) { + store1.commitDataImpl( + Collections.emptyList(), + gen::getPartition, + kv -> 0, + false, + null, + null, + Collections.emptyList(), + (commit, committable) -> { + commit.ignoreEmptyCommit(false); + commit.commit(committable, Collections.emptyMap()); + }); + countDownLatch.countDown(); + } + } catch (Throwable e) { + unexpectedException.set(e); + } + }); + TestFileStore spyStore = spy(store); + doAnswer( + x -> { + FileStoreCommitImpl commitImpl = + (FileStoreCommitImpl) x.callRealMethod(); + FileStoreCommitImpl spyCommit = spy(commitImpl); + doAnswer( + p -> { + countDownLatch2.countDown(); + countDownLatch.await(); + return p.callRealMethod(); + }) + .when(spyCommit) + .doCommit(anyLong(), any(), any(), anyBoolean(), anyBoolean()); + return spyCommit; + }) + .when(spyStore) + .newCommit(any()); + + executorService.execute( + () -> { + try { + assertThatThrownBy( + () -> + spyStore.commitDataImpl( + Collections.emptyList(), + gen::getPartition, + kv -> 0, + false, + null, + null, + Collections.emptyList(), + (commit, committable) -> { + commit.ignoreEmptyCommit(false); + commit.commit( + committable, + Collections.emptyMap()); + })) + .isInstanceOf(CommitFailedException.class) + .hasMessageContaining("Clean up and try again."); + } catch (Throwable e) { + unexpectedException.set(e); + } + }); + executorService.shutdown(); + if (executorService.awaitTermination(300, TimeUnit.SECONDS)) { + assertThat(unexpectedException.get()).isNull(); + } else { + throw new RuntimeException("can not termination"); + } + } + + @Test + public void testRejectTooOldCommit() throws Exception { + TestFileStore store = createStoreWithIsolation(false); + TestFileStore store1 = 
createStoreWithIsolation(false); + CountDownLatch countDownLatch = new CountDownLatch(10); + CountDownLatch countDownLatch2 = new CountDownLatch(1); + AtomicReference unexpectedException = new AtomicReference<>(); + ExecutorService executorService = Executors.newFixedThreadPool(16); + + executorService.execute( + () -> { + try { + countDownLatch2.await(); + for (int i = 0; i < 10; i++) { + store1.commitDataImpl( + Collections.emptyList(), + gen::getPartition, + kv -> 0, + false, + null, + null, + Collections.emptyList(), + (commit, committable) -> { + commit.ignoreEmptyCommit(false); + commit.commit(committable, Collections.emptyMap()); + }); + countDownLatch.countDown(); + } + } catch (Throwable e) { + unexpectedException.set(e); + } + }); + TestFileStore spyStore = spy(store); + doAnswer( + x -> { + FileStoreCommitImpl commitImpl = + (FileStoreCommitImpl) x.callRealMethod(); + FileStoreCommitImpl spyCommit = spy(commitImpl); + doAnswer( + p -> { + countDownLatch2.countDown(); + countDownLatch.await(); + return p.callRealMethod(); + }) + .when(spyCommit) + .checkBeforeCommit(anyLong()); + return spyCommit; + }) + .when(spyStore) + .newCommit(any()); + + executorService.execute( + () -> { + try { + assertThatThrownBy( + () -> + spyStore.commitDataImpl( + Collections.emptyList(), + gen::getPartition, + kv -> 0, + false, + null, + null, + Collections.emptyList(), + (commit, committable) -> { + commit.ignoreEmptyCommit(false); + commit.commit( + committable, + Collections.emptyMap()); + })) + .isInstanceOf(CommitFailedException.class) + .hasMessage( + "Can't submit snapshotId [1], because it is expected that snapshotId [11] should be submitted now."); + } catch (Throwable e) { + unexpectedException.set(e); + } + }); + executorService.shutdown(); + if (executorService.awaitTermination(30, TimeUnit.SECONDS)) { + assertThat(unexpectedException.get()).isNull(); + } else { + throw new RuntimeException("can not termination"); + } + } + + @Test + public void 
testRejectDirtyCommit() throws Exception { + TestFileStore store = createStoreWithIsolation(false); + TestFileStore store1 = createStoreWithIsolation(false); + CountDownLatch countDownLatch = new CountDownLatch(10); + CountDownLatch countDownLatch2 = new CountDownLatch(1); + AtomicReference unexpectedException = new AtomicReference<>(); + ExecutorService executorService = Executors.newFixedThreadPool(16); + + executorService.execute( + () -> { + try { + countDownLatch2.await(); + for (int i = 0; i < 10; i++) { + store1.commitDataImpl( + Collections.emptyList(), + gen::getPartition, + kv -> 0, + false, + null, + null, + Collections.emptyList(), + (commit, committable) -> { + commit.ignoreEmptyCommit(false); + commit.commit(committable, Collections.emptyMap()); + }); + countDownLatch.countDown(); + } + } catch (Throwable e) { + unexpectedException.set(e); + } + }); + TestFileStore spyStore = spy(store); + doAnswer( + x -> { + FileStoreCommitImpl commitImpl = + (FileStoreCommitImpl) x.callRealMethod(); + FileStoreCommitImpl spyCommit = spy(commitImpl); + doAnswer( + p -> { + countDownLatch2.countDown(); + countDownLatch.await(); + return p.callRealMethod(); + }) + .when(spyCommit) + .doCommit(anyLong(), any(), any(), anyBoolean(), anyBoolean()); + return spyCommit; + }) + .when(spyStore) + .newCommit(any()); + + executorService.execute( + () -> { + try { + assertThatThrownBy( + () -> + spyStore.commitDataImpl( + Collections.emptyList(), + gen::getPartition, + kv -> 0, + false, + null, + null, + Collections.emptyList(), + (commit, committable) -> { + commit.ignoreEmptyCommit(false); + commit.commit( + committable, + Collections.emptyMap()); + })) + .isInstanceOf(DirtyCommitException.class) + .hasMessage( + "Reject commit snapshotId [1] because it's much smaller than the current latest snapshotId[10].It's high probably a dirty commit."); + } catch (Throwable e) { + unexpectedException.set(e); + } + }); + executorService.shutdown(); + if 
(executorService.awaitTermination(30, TimeUnit.SECONDS)) { + assertThat(unexpectedException.get()).isNull(); + } else { + throw new RuntimeException("can not termination"); + } + } + + @Test + public void testCleanOldDirtyCommit() throws Exception { + TestFileStore store = createStoreWithIsolation(false); + TestFileStore store1 = createStoreWithIsolation(false); + CountDownLatch countDownLatch = new CountDownLatch(10); + CountDownLatch countDownLatch2 = new CountDownLatch(1); + AtomicReference unexpectedException = new AtomicReference<>(); + ExecutorService executorService = Executors.newFixedThreadPool(16); + + executorService.execute( + () -> { + try { + countDownLatch2.await(); + for (int i = 0; i < 10; i++) { + store1.commitDataImpl( + Collections.emptyList(), + gen::getPartition, + kv -> 0, + false, + null, + null, + Collections.emptyList(), + (commit, committable) -> { + commit.ignoreEmptyCommit(false); + commit.commit(committable, Collections.emptyMap()); + }); + countDownLatch.countDown(); + } + } catch (Throwable e) { + unexpectedException.set(e); + } + }); + TestFileStore spyStore = spy(store); + doAnswer( + x -> { + FileStoreCommitImpl commitImpl = + (FileStoreCommitImpl) x.callRealMethod(); + FileStoreCommitImpl spyCommit = spy(commitImpl); + doNothing().when(spyCommit).checkAfterCommit(anyLong()); + doAnswer( + p -> { + countDownLatch2.countDown(); + countDownLatch.await(); + return p.callRealMethod(); + }) + .when(spyCommit) + .doCommit(anyLong(), any(), any(), anyBoolean(), anyBoolean()); + return spyCommit; + }) + .when(spyStore) + .newCommit(any()); + + executorService.execute( + () -> { + try { + spyStore.commitDataImpl( + Collections.emptyList(), + gen::getPartition, + kv -> 0, + false, + null, + null, + Collections.emptyList(), + (commit, committable) -> { + commit.ignoreEmptyCommit(false); + commit.commit(committable, Collections.emptyMap()); + }); + } catch (Throwable e) { + unexpectedException.set(e); + } + }); + executorService.shutdown(); + 
if (executorService.awaitTermination(30, TimeUnit.SECONDS)) { + assertThat(unexpectedException.get()).isNull(); + Assertions.assertThat(store1.snapshotManager().snapshotExists(1)).isTrue(); + store1.commitDataImpl( + Collections.emptyList(), + gen::getPartition, + kv -> 0, + false, + null, + null, + Collections.emptyList(), + (commit, committable) -> { + commit.ignoreEmptyCommit(false); + commit.commit(committable, Collections.emptyMap()); + }); + Assertions.assertThat(store1.snapshotManager().snapshotExists(1)).isFalse(); + } else { + throw new RuntimeException("can not termination"); + } + } + private TestFileStore createStore(boolean failing) throws Exception { return createStore(failing, 1); } @@ -973,4 +1444,46 @@ private void logData(Supplier> supplier, String name) { } LOG.debug("========== End of " + name + " =========="); } + + private TestFileStore createStoreWithIsolation(boolean failing) throws Exception { + return createStoreWithIsolation(failing, 1, CoreOptions.ChangelogProducer.NONE); + } + + private TestFileStore createStoreWithIsolation( + boolean failing, int numBucket, CoreOptions.ChangelogProducer changelogProducer) + throws Exception { + Map options = new HashMap<>(); + options.put( + CoreOptions.COMMIT_ISOLATION_LEVEL.key(), + CoreOptions.CommitIsolationLevel.SERIALIZABLE.name()); + options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "2"); + options.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3"); + String root = + failing + ? 
FailingFileIO.getFailingPath(failingName, tempDir.toString()) + : TraceableFileIO.SCHEME + "://" + tempDir.toString(); + Path path = new Path(tempDir.toUri()); + TableSchema tableSchema = + SchemaUtils.forceCommit( + new SchemaManager(new LocalFileIO(), path), + new Schema( + TestKeyValueGenerator.DEFAULT_ROW_TYPE.getFields(), + TestKeyValueGenerator.DEFAULT_PART_TYPE.getFieldNames(), + TestKeyValueGenerator.getPrimaryKeys( + TestKeyValueGenerator.GeneratorMode.MULTI_PARTITIONED), + options, + null)); + return new TestFileStore.Builder( + "avro", + root, + numBucket, + TestKeyValueGenerator.DEFAULT_PART_TYPE, + TestKeyValueGenerator.KEY_TYPE, + TestKeyValueGenerator.DEFAULT_ROW_TYPE, + TestKeyValueGenerator.TestKeyValueFieldsExtractor.EXTRACTOR, + DeduplicateMergeFunction.factory(), + tableSchema) + .changelogProducer(changelogProducer) + .build(); + } } From f157d6c2f517896fc6c138d7b3a7b66ca68b2d25 Mon Sep 17 00:00:00 2001 From: BsoBird Date: Wed, 24 Jul 2024 15:12:20 +0800 Subject: [PATCH 2/4] Support multiple isolation level --- .../org/apache/paimon/AbstractFileStore.java | 11 ++++- .../java/org/apache/paimon/FileStore.java | 3 ++ .../paimon/operation/FileStoreCommitImpl.java | 42 ++++++++++++++++--- .../paimon/privilege/PrivilegedFileStore.java | 7 ++++ .../apache/paimon/utils/BranchManager.java | 14 +++++++ 5 files changed, 70 insertions(+), 7 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java index fcd1c1fdf392..a9a0bf64fd38 100644 --- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java @@ -46,6 +46,7 @@ import org.apache.paimon.table.sink.TagCallback; import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.FileStorePathFactory; import 
org.apache.paimon.utils.SegmentsCache; import org.apache.paimon.utils.SnapshotManager; @@ -206,7 +207,9 @@ public FileStoreCommitImpl newCommit(String commitUser) { newStatsFileHandler(), bucketMode(), options.scanManifestParallelism(), - schema.options()); + schema.options(), + newBranchManager(), + newTagManager()); } @Override @@ -241,6 +244,12 @@ public TagManager newTagManager() { return new TagManager(fileIO, options.path()); } + @Override + public BranchManager newBranchManager() { + return new BranchManager( + fileIO, options.path(), snapshotManager(), newTagManager(), schemaManager); + } + @Override public TagDeletion newTagDeletion() { return new TagDeletion( diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java b/paimon-core/src/main/java/org/apache/paimon/FileStore.java index e943d38cf5e1..06b0ff78a4cb 100644 --- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java @@ -36,6 +36,7 @@ import org.apache.paimon.table.sink.TagCallback; import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -86,6 +87,8 @@ public interface FileStore extends Serializable { TagManager newTagManager(); + BranchManager newBranchManager(); + TagDeletion newTagDeletion(); @Nullable diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 64eaec3a47a2..6d6ef490772f 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -52,11 +52,13 @@ import org.apache.paimon.table.sink.CommitMessage; import 
org.apache.paimon.table.sink.CommitMessageImpl; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Preconditions; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.StringUtils; +import org.apache.paimon.utils.TagManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -140,6 +142,8 @@ public class FileStoreCommitImpl implements FileStoreCommit { private final BucketMode bucketMode; private final Map tableOptions; + private final BranchManager branchManager; + private final TagManager tagManager; public FileStoreCommitImpl( FileIO fileIO, @@ -163,7 +167,9 @@ public FileStoreCommitImpl( StatsFileHandler statsFileHandler, BucketMode bucketMode, @Nullable Integer manifestReadParallelism, - Map tableOptions) { + Map tableOptions, + BranchManager branchManager, + TagManager tagManager) { this.fileIO = fileIO; this.schemaManager = schemaManager; this.commitUser = commitUser; @@ -190,6 +196,8 @@ public FileStoreCommitImpl( this.statsFileHandler = statsFileHandler; this.bucketMode = bucketMode; this.tableOptions = tableOptions; + this.branchManager = branchManager; + this.tagManager = tagManager; } @Override @@ -1369,13 +1377,31 @@ void checkAfterCommit(long newSnapshotId) throws IOException { latestSnapshotIdAfterCommit > maxSaveSnapshotNum ? 
latestSnapshotIdAfterCommit - maxSaveSnapshotNum : -1; - fastFailIfDirtyCommit(newSnapshotId, waterMark, latestSnapshotIdAfterCommit); + Set excludeSnapshotIds = new HashSet<>(); + branchManager + .branchSnapshots() + .forEach( + x -> { + if (x != null) { + excludeSnapshotIds.add(x.id()); + } + }); + tagManager + .taggedSnapshots() + .forEach( + x -> { + if (x != null) { + excludeSnapshotIds.add(x.id()); + } + }); + fastFailIfDirtyCommit( + newSnapshotId, waterMark, latestSnapshotIdAfterCommit, excludeSnapshotIds); List dirtyCommits = new ArrayList<>(); Iterator iterator = snapshotManager.snapshots(); while (iterator.hasNext()) { Snapshot snapshot = iterator.next(); long currentSnapshotId = snapshot.id(); - if (waterMark > currentSnapshotId) { + if (waterMark > currentSnapshotId && !excludeSnapshotIds.contains(currentSnapshotId)) { dirtyCommits.add(currentSnapshotId); } } @@ -1386,14 +1412,18 @@ void checkAfterCommit(long newSnapshotId) throws IOException { } private void fastFailIfDirtyCommit( - long newSnapshotId, long waterMark, long latestSnapshotIdAfterCommit) - throws IOException { + long newSnapshotId, + long waterMark, + long latestSnapshotIdAfterCommit, + Collection excludeSnapshotIds) { String errorMsg = String.format( "Reject commit snapshotId [%s] because it's much smaller than the current latest snapshotId[%s].It's high probably a dirty commit.", newSnapshotId, latestSnapshotIdAfterCommit); boolean isDirtyCommit = - newSnapshotId < waterMark && snapshotManager.snapshotExists(newSnapshotId); + newSnapshotId < waterMark + && snapshotManager.snapshotExists(newSnapshotId) + && !excludeSnapshotIds.contains(newSnapshotId); if (isDirtyCommit) { try { snapshotManager.removeSnapshot(newSnapshotId); diff --git a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java index ca2ad04a232d..bf1b541bd831 100644 --- 
a/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java +++ b/paimon-core/src/main/java/org/apache/paimon/privilege/PrivilegedFileStore.java @@ -39,6 +39,7 @@ import org.apache.paimon.table.sink.TagCallback; import org.apache.paimon.tag.TagAutoManager; import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.BranchManager; import org.apache.paimon.utils.FileStorePathFactory; import org.apache.paimon.utils.SnapshotManager; import org.apache.paimon.utils.TagManager; @@ -157,6 +158,12 @@ public TagManager newTagManager() { return wrapped.newTagManager(); } + @Override + public BranchManager newBranchManager() { + privilegeChecker.assertCanInsert(identifier); + return wrapped.newBranchManager(); + } + @Override public TagDeletion newTagDeletion() { privilegeChecker.assertCanInsert(identifier); diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index 87bc37e3b657..7173d8f2b8a6 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -364,4 +364,18 @@ public List branches() { throw new RuntimeException(e); } } + + /** Returns the created-from snapshot of every branch, skipping snapshots that do not exist.
*/ + public List branchSnapshots() { + ArrayList snapshotList = new ArrayList<>(); + branches() + .forEach( + b -> { + long createdFromSnapshot = b.getCreatedFromSnapshot(); + if (snapshotManager.snapshotExists(createdFromSnapshot)) { + snapshotList.add(snapshotManager.snapshot(createdFromSnapshot)); + } + }); + return snapshotList; + } } From d1b7edbed0b60b302ddd5c77bc3e1fc7ef7127d0 Mon Sep 17 00:00:00 2001 From: BsoBird Date: Wed, 31 Jul 2024 17:09:32 +0800 Subject: [PATCH 3/4] Support multiple isolation level --- .../org/apache/paimon/operation/FileStoreCommitImpl.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 6d6ef490772f..3ca2b3ee9f7c 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -1021,6 +1021,14 @@ public boolean tryCommitOnce( // If the commit succeeds, we ignore all exceptions thrown by the successful commit; // otherwise, we throw an exception. 
if (!success && useSerializableIsolation) { + cleanUpTmpManifests( + previousChangesListName, + newChangesListName, + changelogListName, + newIndexManifest, + oldMetas, + newMetas, + changelogMetas); throw new CommitFailedException(e.getMessage(), e); } } From b6d36254538300420a2d6b8b3f84634d5a5bb90d Mon Sep 17 00:00:00 2001 From: BsoBird Date: Fri, 9 Aug 2024 16:04:45 +0800 Subject: [PATCH 4/4] change clean logic --- .../paimon/operation/FileStoreCommitImpl.java | 52 ++++++++----------- .../paimon/operation/FileStoreCommitTest.java | 6 +-- 2 files changed, 25 insertions(+), 33 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 3ca2b3ee9f7c..a2a6263df3a5 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -974,15 +974,14 @@ public boolean tryCommitOnce( + "with identifier %s and kind %s. " + "Clean up and try again.", newSnapshotId, newSnapshotPath, commitUser, identifier, commitKind.name()); + boolean noHint = useSerializableIsolation && !useLockService; try { + if (noHint) { + // Remove all hints, as they may provide incorrect information.
+ cleanAllHint(); + } Callable callable = - () -> - doCommit( - newSnapshotId, - newSnapshotPath, - newSnapshot, - useSerializableIsolation, - useLockService); + () -> doCommit(newSnapshotId, newSnapshotPath, newSnapshot, noHint); if (useLockService) { success = lock.runWithLock( @@ -1325,30 +1324,27 @@ public static ConflictCheck mustConflictCheck() { return latestSnapshot -> true; } + void cleanAllHint() throws IOException { + snapshotManager.removeSnapshotEarliestHint(); + snapshotManager.removeSnapshotLatestHint(); + snapshotManager.removeChangeLogEarliestHint(); + snapshotManager.removeChangeLogLatestHint(); + } + boolean doCommit( - long newSnapshotId, - Path newSnapshotPath, - Snapshot newSnapshot, - boolean useSerializableIsolation, - boolean useLock) { + long newSnapshotId, Path newSnapshotPath, Snapshot newSnapshot, boolean noHint) { + boolean committed = false; try { - boolean committed = tryAtomicCommitNewSnapshot(newSnapshotPath, newSnapshot); - if (committed) { - if (useSerializableIsolation && !useLock) { - // If the useSerializableIsolation is turned on,And we not use lock service, - // we will remove all HINTs,as it may provide incorrect information. 
- snapshotManager.removeSnapshotEarliestHint(); - snapshotManager.removeSnapshotLatestHint(); - snapshotManager.removeChangeLogEarliestHint(); - snapshotManager.removeChangeLogLatestHint(); - } else { - snapshotManager.commitLatestHint(newSnapshotId); - } + committed = tryAtomicCommitNewSnapshot(newSnapshotPath, newSnapshot); + if (committed && !noHint) { + this.snapshotManager.commitLatestHint(newSnapshotId); } - return committed; } catch (Exception e) { - throw new CommitStateUnknownException(e.getMessage(), e); + if (!committed) { + throw new CommitStateUnknownException(e.getMessage(), e); + } } + return committed; } boolean tryAtomicCommitNewSnapshot(Path newSnapshotPath, Snapshot newSnapshot) @@ -1363,8 +1359,6 @@ void checkBeforeCommit(long newSnapshotId) throws IOException { boolean currentCommitIsLatest = Objects.equals(newSnapshotId, latestSnapshotIdBeforeCommit + 1); if (!currentCommitIsLatest) { - snapshotManager.removeSnapshotLatestHint(); - snapshotManager.removeSnapshotEarliestHint(); throw new CommitFailedException( String.format( "Can't submit snapshotId [%s], because it is expected that snapshotId [%s] should be submitted now.", @@ -1435,8 +1429,6 @@ private void fastFailIfDirtyCommit( if (isDirtyCommit) { try { snapshotManager.removeSnapshot(newSnapshotId); - snapshotManager.removeSnapshotLatestHint(); - snapshotManager.removeSnapshotEarliestHint(); throw new DirtyCommitException(errorMsg); } catch (IOException e) { throw new DirtyCommitException(errorMsg, e); diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java index c02c7e907721..5315ec3f1283 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/FileStoreCommitTest.java @@ -1067,7 +1067,7 @@ public void testRejectCommitAlreadyExistsVersion() throws Exception { return 
p.callRealMethod(); }) .when(spyCommit) - .doCommit(anyLong(), any(), any(), anyBoolean(), anyBoolean()); + .doCommit(anyLong(), any(), any(), anyBoolean()); return spyCommit; }) .when(spyStore) @@ -1236,7 +1236,7 @@ public void testRejectDirtyCommit() throws Exception { return p.callRealMethod(); }) .when(spyCommit) - .doCommit(anyLong(), any(), any(), anyBoolean(), anyBoolean()); + .doCommit(anyLong(), any(), any(), anyBoolean()); return spyCommit; }) .when(spyStore) @@ -1322,7 +1322,7 @@ public void testCleanOldDirtyCommit() throws Exception { return p.callRealMethod(); }) .when(spyCommit) - .doCommit(anyLong(), any(), any(), anyBoolean(), anyBoolean()); + .doCommit(anyLong(), any(), any(), anyBoolean()); return spyCommit; }) .when(spyStore)