From 41d5a142efb790d0bbe6c0ba977aa386467cfcd5 Mon Sep 17 00:00:00 2001 From: sunxiaojian Date: Sat, 16 Mar 2024 02:04:40 +0800 Subject: [PATCH] fixed --- .../org/apache/paimon/jdbc/JdbcCatalog.java | 2 +- .../paimon/operation/FileStoreCommitImpl.java | 2 +- .../apache/paimon/schema/SchemaManager.java | 91 +++++--- .../apache/paimon/utils/BranchManager.java | 4 +- .../apache/paimon/utils/SnapshotManager.java | 198 ++++++++++++------ .../org/apache/paimon/utils/TagManager.java | 12 +- .../paimon/table/FileStoreTableTestBase.java | 5 +- ...TableUpdatedDataFieldsProcessFunction.java | 4 +- .../paimon/flink/sink/FlinkTableSink.java | 3 +- .../sink/UnawareBucketCompactionSink.java | 3 +- 10 files changed, 216 insertions(+), 108 deletions(-) diff --git a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java index 3e3ee1bc9a8e..2db2b59448ce 100644 --- a/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java +++ b/paimon-core/src/main/java/org/apache/paimon/jdbc/JdbcCatalog.java @@ -327,7 +327,7 @@ protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotE throw new TableNotExistException(identifier); } Path tableLocation = getDataTableLocation(identifier); - return new SchemaManager(fileIO, tableLocation,branchName) + return new SchemaManager(fileIO, tableLocation, branchName) .latest() .orElseThrow( () -> new RuntimeException("There is no paimon table in " + tableLocation)); diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java index 724c4e1a901b..93dd0cde8561 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java @@ -714,7 +714,7 @@ public boolean tryCommitOnce( Path newSnapshotPath = branchName.equals(DEFAULT_MAIN_BRANCH) ? snapshotManager.snapshotPath(newSnapshotId) - : snapshotManager.branchSnapshotPath(branchName, newSnapshotId); + : snapshotManager.snapshotPath(branchName, newSnapshotId); if (LOG.isDebugEnabled()) { LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId); diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java index 4d8fdf15da11..17fb9b625dba 100644 --- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java @@ -79,16 +79,16 @@ public class SchemaManager implements Serializable { private final FileIO fileIO; private final Path tableRoot; @Nullable private transient Lock lock; - private final String branchName; + private final String branch; public SchemaManager(FileIO fileIO, Path tableRoot) { this(fileIO, tableRoot, DEFAULT_MAIN_BRANCH); } - public SchemaManager(FileIO fileIO, Path tableRoot, String branchName) { + public SchemaManager(FileIO fileIO, Path tableRoot, String branch) { this.fileIO = fileIO; this.tableRoot = tableRoot; - this.branchName = StringUtils.isBlank(branchName) ? DEFAULT_MAIN_BRANCH : branchName; + this.branch = StringUtils.isBlank(branch) ? DEFAULT_MAIN_BRANCH : branch; } public SchemaManager withLock(@Nullable Lock lock) { @@ -96,13 +96,13 @@ public SchemaManager withLock(@Nullable Lock lock) { return this; } - public String getBranchName() { - return branchName; + public Optional latest() { + return latest(branch); } - public Optional latest() { + public Optional latest(String branch) { try { - return listVersionedFiles(fileIO, schemaDirectory(branchName), SCHEMA_PREFIX) + return listVersionedFiles(fileIO, schemaDirectory(branch), SCHEMA_PREFIX) .reduce(Math::max) .map(this::schema); } catch (IOException e) { @@ -111,23 +111,38 @@ public Optional latest() { } public List listAll() { - return listAllIds().stream().map(this::schema).collect(Collectors.toList()); + return listAll(branch); + } + + public List listAll(String branch) { + return listAllIds(branch).stream() + .map(id -> schema(id, branch)) + .collect(Collectors.toList()); } - /** List all schema IDs. */ public List listAllIds() { + return listAllIds(branch); + } + + /** List all schema IDs. */ + public List listAllIds(String branch) { try { - return listVersionedFiles(fileIO, schemaDirectory(branchName), SCHEMA_PREFIX) + return listVersionedFiles(fileIO, schemaDirectory(branch), SCHEMA_PREFIX) .collect(Collectors.toList()); } catch (IOException e) { throw new UncheckedIOException(e); } } - /** Create a new schema from {@link Schema}. */ public TableSchema createTable(Schema schema) throws Exception { + return createTable(branch, schema); + } + + /** Create a new schema from {@link Schema}. */ + public TableSchema createTable(String branch, Schema schema) throws Exception { while (true) { - latest().ifPresent( + latest(branch) + .ifPresent( latest -> { throw new IllegalStateException( "Schema in filesystem exists, please use updating," @@ -151,25 +166,30 @@ public TableSchema createTable(Schema schema) throws Exception { options, schema.comment()); - boolean success = commit(branchName, newSchema); + boolean success = commit(branch, newSchema); if (success) { return newSchema; } } } - /** Update {@link SchemaChange}s. */ public TableSchema commitChanges(SchemaChange... changes) throws Exception { - return commitChanges(Arrays.asList(changes)); + return commitChanges(branch, changes); } /** Update {@link SchemaChange}s. */ - public TableSchema commitChanges(List changes) + public TableSchema commitChanges(String branch, SchemaChange... changes) throws Exception { + return commitChanges(branch, Arrays.asList(changes)); + } + + /** Update {@link SchemaChange}s. */ + public TableSchema commitChanges(String branch, List changes) throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException, Catalog.ColumnNotExistException { while (true) { TableSchema schema = - latest().orElseThrow( + latest(branch) + .orElseThrow( () -> new Catalog.TableNotExistException( fromPath(tableRoot.toString(), true))); @@ -361,7 +381,7 @@ public TableSchema commitChanges(List changes) newComment); try { - boolean success = commit(branchName, newSchema); + boolean success = commit(branch, newSchema); if (success) { return newSchema; } @@ -372,8 +392,13 @@ public TableSchema commitChanges(List changes) } public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) { + return mergeSchema(branch, rowType, allowExplicitCast); + } + + public boolean mergeSchema(String branch, RowType rowType, boolean allowExplicitCast) { TableSchema current = - latest().orElseThrow( + latest(branch) + .orElseThrow( () -> new RuntimeException( "It requires that the current schema to exist when calling 'mergeSchema'")); @@ -382,7 +407,7 @@ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) { return false; } else { try { - return commit(update); + return commit(branch, update); } catch (Exception e) { throw new RuntimeException("Failed to commit the schema.", e); } @@ -455,13 +480,13 @@ private void updateColumn( @VisibleForTesting boolean commit(TableSchema newSchema) throws Exception { - return commit(DEFAULT_MAIN_BRANCH, newSchema); + return commit(branch, newSchema); } @VisibleForTesting boolean commit(String branchName, TableSchema newSchema) throws Exception { SchemaValidation.validateTableSchema(newSchema); - Path schemaPath = branchSchemaPath(branchName, newSchema.id()); + Path schemaPath = toSchemaPath(branchName, newSchema.id()); Callable callable = () -> fileIO.writeFileUtf8(schemaPath, newSchema.toString()); if (lock == null) { return callable.call(); @@ -469,10 +494,15 @@ boolean commit(String branchName, TableSchema newSchema) throws Exception { return lock.runWithLock(callable); } - /** Read schema for schema id. */ public TableSchema schema(long id) { + return schema(id, branch); + } + + /** Read schema for schema id. */ + public TableSchema schema(long id, String branch) { try { - return JsonSerdeUtil.fromJson(fileIO.readFileUtf8(toSchemaPath(id)), TableSchema.class); + return JsonSerdeUtil.fromJson( + fileIO.readFileUtf8(toSchemaPath(branch, id)), TableSchema.class); } catch (IOException e) { throw new UncheckedIOException(e); } @@ -501,7 +531,7 @@ public Path toSchemaPath(long id) { return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id); } - public Path branchSchemaPath(String branchName, long schemaId) { + public Path toSchemaPath(String branchName, long schemaId) { return branchName.equals(DEFAULT_MAIN_BRANCH) ? toSchemaPath(schemaId) : new Path( @@ -517,7 +547,16 @@ public Path branchSchemaPath(String branchName, long schemaId) { * @param schemaId the schema id to delete. */ public void deleteSchema(long schemaId) { - fileIO.deleteQuietly(branchSchemaPath(branchName, schemaId)); + deleteSchema(branch, schemaId); + } + + /** + * Delete schema with specific id. + * + * @param schemaId the schema id to delete. + */ + public void deleteSchema(String branch, long schemaId) { + fileIO.deleteQuietly(toSchemaPath(branch, schemaId)); } public static void checkAlterTableOption(String key) { diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java index c28907e4e349..e7209d11c96e 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/BranchManager.java @@ -102,10 +102,10 @@ public void createBranch(String branchName, String tagName) { tagManager.tagPath(tagName), tagManager.tagPath(branchName, tagName)); fileIO.copyFileUtf8( snapshotManager.snapshotPath(snapshot.id()), - snapshotManager.branchSnapshotPath(branchName, snapshot.id())); + snapshotManager.snapshotPath(branchName, snapshot.id())); fileIO.copyFileUtf8( schemaManager.toSchemaPath(snapshot.schemaId()), - schemaManager.branchSchemaPath(branchName, snapshot.schemaId())); + schemaManager.toSchemaPath(branchName, snapshot.schemaId())); } catch (IOException e) { throw new RuntimeException( String.format( diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java index 5816ca902b7a..4476a7fd0490 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/SnapshotManager.java @@ -60,8 +60,7 @@ public class SnapshotManager implements Serializable { private final FileIO fileIO; private final Path tablePath; - - private final String branchName; + private final String branch; public SnapshotManager(FileIO fileIO, Path tablePath) { this(fileIO, tablePath, DEFAULT_MAIN_BRANCH); @@ -70,7 +69,7 @@ public SnapshotManager(FileIO fileIO, Path tablePath) { public SnapshotManager(FileIO fileIO, Path tablePath, String branchName) { this.fileIO = fileIO; this.tablePath = tablePath; - this.branchName = StringUtils.isBlank(branchName) ? DEFAULT_MAIN_BRANCH : branchName; + this.branch = StringUtils.isBlank(branchName) ? DEFAULT_MAIN_BRANCH : branchName; } public FileIO fileIO() { @@ -81,8 +80,8 @@ public Path tablePath() { return tablePath; } - public String getBranchName() { - return branchName; + public String getBranch() { + return branch; } public Path snapshotDirectory() { @@ -93,34 +92,37 @@ public Path snapshotPath(long snapshotId) { return new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId); } - public Path snapshotDirectory(String branchName) { - return new Path(getBranchPath(tablePath, branchName) + "/snapshot"); - } - - public Path branchSnapshotPath(String branchName, long snapshotId) { - return new Path( - getBranchPath(tablePath, branchName) + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId); - } - - public Path snapshotPathByBranch(String branchName, long snapshotId) { + public Path snapshotPath(String branchName, long snapshotId) { return branchName.equals(DEFAULT_MAIN_BRANCH) ? snapshotPath(snapshotId) - : branchSnapshotPath(branchName, snapshotId); + : new Path( + getBranchPath(tablePath, branchName) + + "/snapshot/" + + SNAPSHOT_PREFIX + + snapshotId); } - public Path snapshotDirByBranch(String branchName) { + public Path snapshotDirectory(String branchName) { return branchName.equals(DEFAULT_MAIN_BRANCH) ? snapshotDirectory() - : snapshotDirectory(branchName); + : new Path(getBranchPath(tablePath, branchName) + "/snapshot"); } public Snapshot snapshot(long snapshotId) { - Path snapshotPath = snapshotPathByBranch(branchName, snapshotId); + return snapshot(snapshotId, branch); + } + + public Snapshot snapshot(long snapshotId, String branch) { + Path snapshotPath = snapshotPath(branch, snapshotId); return Snapshot.fromPath(fileIO, snapshotPath); } public boolean snapshotExists(long snapshotId) { - Path path = snapshotPathByBranch(branchName, snapshotId); + return snapshotExists(snapshotId, branch); + } + + public boolean snapshotExists(long snapshotId, String branch) { + Path path = snapshotPath(branch, snapshotId); try { return fileIO.exists(path); } catch (IOException e) { @@ -131,41 +133,61 @@ public boolean snapshotExists(long snapshotId) { } public @Nullable Snapshot latestSnapshot() { - Long snapshotId = latestSnapshotId(); + return latestSnapshot(branch); + } + + public @Nullable Snapshot latestSnapshot(String branch) { + Long snapshotId = latestSnapshotId(branch); return snapshotId == null ? null : snapshot(snapshotId); } public @Nullable Long latestSnapshotId() { + return latestSnapshotId(branch); + } + + public @Nullable Long latestSnapshotId(String branch) { try { - return findLatest(); + return findLatest(branch); } catch (IOException e) { throw new RuntimeException("Failed to find latest snapshot id", e); } } public @Nullable Snapshot earliestSnapshot() { - Long snapshotId = earliestSnapshotId(); - return snapshotId == null ? null : snapshot(snapshotId); + return earliestSnapshot(branch); + } + + public @Nullable Snapshot earliestSnapshot(String branch) { + Long snapshotId = earliestSnapshotId(branch); + return snapshotId == null ? null : snapshot(snapshotId, branch); } public @Nullable Long earliestSnapshotId() { + return earliestSnapshotId(branch); + } + + public @Nullable Long earliestSnapshotId(String branchName) { try { - return findEarliest(); + return findEarliest(branchName); } catch (IOException e) { throw new RuntimeException("Failed to find earliest snapshot id", e); } } public @Nullable Long pickOrLatest(Predicate predicate) { - Long latestId = latestSnapshotId(); - Long earliestId = earliestSnapshotId(); + return pickOrLatest(branch, predicate); + } + + public @Nullable Long pickOrLatest(String branch, Predicate predicate) { + Long latestId = latestSnapshotId(branch); + Long earliestId = earliestSnapshotId(branch); if (latestId == null || earliestId == null) { return null; } for (long snapshotId = latestId; snapshotId >= earliestId; snapshotId--) { - if (snapshotExists(snapshotId)) { - Snapshot snapshot = snapshot(snapshotId); + if (snapshotExists(snapshotId, branch)) { + Snapshot snapshot = snapshot(snapshotId, branch); if (predicate.test(snapshot)) { return snapshot.id(); } @@ -175,25 +197,28 @@ public boolean snapshotExists(long snapshotId) { return latestId; } + public @Nullable Long earlierThanTimeMills(long timestampMills) { + return earlierThanTimeMills(branch, timestampMills); + } /** * Returns the latest snapshot earlier than the timestamp mills. A non-existent snapshot may be * returned if all snapshots are equal to or later than the timestamp mills. */ - public @Nullable Long earlierThanTimeMills(long timestampMills) { - Long earliest = earliestSnapshotId(); - Long latest = latestSnapshotId(); + public @Nullable Long earlierThanTimeMills(String branch, long timestampMills) { + Long earliest = earliestSnapshotId(branch); + Long latest = latestSnapshotId(branch); if (earliest == null || latest == null) { return null; } - if (snapshot(earliest).timeMillis() >= timestampMills) { + if (snapshot(earliest, branch).timeMillis() >= timestampMills) { return earliest - 1; } while (earliest < latest) { long mid = (earliest + latest + 1) / 2; - if (snapshot(mid).timeMillis() < timestampMills) { + if (snapshot(mid, branch).timeMillis() < timestampMills) { earliest = mid; } else { latest = mid - 1; @@ -202,24 +227,28 @@ public boolean snapshotExists(long snapshotId) { return earliest; } + public @Nullable Snapshot earlierOrEqualTimeMills(long timestampMills) { + return earlierOrEqualTimeMills(branch, timestampMills); + } + /** * Returns a {@link Snapshot} whoes commit time is earlier than or equal to given timestamp * mills. If there is no such a snapshot, returns null. */ - public @Nullable Snapshot earlierOrEqualTimeMills(long timestampMills) { - Long earliest = earliestSnapshotId(); - Long latest = latestSnapshotId(); + public @Nullable Snapshot earlierOrEqualTimeMills(String branch, long timestampMills) { + Long earliest = earliestSnapshotId(branch); + Long latest = latestSnapshotId(branch); if (earliest == null || latest == null) { return null; } - if (snapshot(earliest).timeMillis() > timestampMills) { + if (snapshot(earliest, branch).timeMillis() > timestampMills) { return null; } Snapshot finalSnapshot = null; while (earliest <= latest) { long mid = earliest + (latest - earliest) / 2; // Avoid overflow - Snapshot snapshot = snapshot(mid); + Snapshot snapshot = snapshot(mid, branch); long commitTime = snapshot.timeMillis(); if (commitTime > timestampMills) { latest = mid - 1; // Search in the left half @@ -235,23 +264,35 @@ public boolean snapshotExists(long snapshotId) { } public long snapshotCount() throws IOException { - return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX).count(); + return snapshotCount(branch); + } + + public long snapshotCount(String branch) throws IOException { + return listVersionedFiles(fileIO, snapshotDirectory(branch), SNAPSHOT_PREFIX).count(); } public Iterator snapshots() throws IOException { - return listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX) + return snapshots(branch); + } + + public Iterator snapshots(String branch) throws IOException { + return listVersionedFiles(fileIO, snapshotDirectory(branch), SNAPSHOT_PREFIX) .map(this::snapshot) .sorted(Comparator.comparingLong(Snapshot::id)) .iterator(); } + public List safelyGetAllSnapshots() throws IOException { + return safelyGetAllSnapshots(branch); + } + /** * If {@link FileNotFoundException} is thrown when reading the snapshot file, this snapshot may * be deleted by other processes, so just skip this snapshot. */ - public List safelyGetAllSnapshots() throws IOException { + public List safelyGetAllSnapshots(String branch) throws IOException { List paths = - listVersionedFiles(fileIO, snapshotDirectory(), SNAPSHOT_PREFIX) + listVersionedFiles(fileIO, snapshotDirectory(branch), SNAPSHOT_PREFIX) .map(this::snapshotPath) .collect(Collectors.toList()); @@ -263,13 +304,17 @@ public List safelyGetAllSnapshots() throws IOException { return snapshots; } + public List tryGetNonSnapshotFiles(Predicate fileStatusFilter) { + return tryGetNonSnapshotFiles(branch, fileStatusFilter); + } /** * Try to get non snapshot files. If any error occurred, just ignore it and return an empty * result. */ - public List tryGetNonSnapshotFiles(Predicate fileStatusFilter) { + public List tryGetNonSnapshotFiles( + String branch, Predicate fileStatusFilter) { try { - FileStatus[] statuses = fileIO.listStatus(snapshotDirectory()); + FileStatus[] statuses = fileIO.listStatus(snapshotDirectory(branch)); if (statuses == null) { return Collections.emptyList(); } @@ -294,18 +339,22 @@ private Predicate nonSnapshotFileFilter() { } public Optional latestSnapshotOfUser(String user) { - Long latestId = latestSnapshotId(); + return latestSnapshotOfUser(branch, user); + } + + public Optional latestSnapshotOfUser(String branch, String user) { + Long latestId = latestSnapshotId(branch); if (latestId == null) { return Optional.empty(); } long earliestId = Preconditions.checkNotNull( - earliestSnapshotId(), + earliestSnapshotId(branch), "Latest snapshot id is not null, but earliest snapshot id is null. " + "This is unexpected."); for (long id = latestId; id >= earliestId; id--) { - Snapshot snapshot = snapshot(id); + Snapshot snapshot = snapshot(id, branch); if (user.equals(snapshot.commitUser())) { return Optional.of(snapshot); } @@ -313,19 +362,24 @@ public Optional latestSnapshotOfUser(String user) { return Optional.empty(); } - /** Find the snapshot of the specified identifiers written by the specified user. */ public List findSnapshotsForIdentifiers( @Nonnull String user, List identifiers) { + return findSnapshotsForIdentifiers(user, identifiers, branch); + } + + /** Find the snapshot of the specified identifiers written by the specified user. */ + public List findSnapshotsForIdentifiers( + @Nonnull String user, List identifiers, String branch) { if (identifiers.isEmpty()) { return Collections.emptyList(); } - Long latestId = latestSnapshotId(); + Long latestId = latestSnapshotId(branch); if (latestId == null) { return Collections.emptyList(); } long earliestId = Preconditions.checkNotNull( - earliestSnapshotId(), + earliestSnapshotId(branch), "Latest snapshot id is not null, but earliest snapshot id is null. " + "This is unexpected."); @@ -333,7 +387,7 @@ public List findSnapshotsForIdentifiers( List matchedSnapshots = new ArrayList<>(); Set remainingIdentifiers = new HashSet<>(identifiers); for (long id = latestId; id >= earliestId && !remainingIdentifiers.isEmpty(); id--) { - Snapshot snapshot = snapshot(id); + Snapshot snapshot = snapshot(id, branch); if (user.equals(snapshot.commitUser())) { if (remainingIdentifiers.remove(snapshot.commitIdentifier())) { matchedSnapshots.add(snapshot); @@ -346,18 +400,22 @@ public List findSnapshotsForIdentifiers( return matchedSnapshots; } + @Nullable + public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { + return traversalSnapshotsFromLatestSafely(checker, branch); + } /** * Traversal snapshots from latest to earliest safely, this is applied on the writer side * because the committer may delete obsolete snapshots, which may cause the writer to encounter * unreadable snapshots. */ @Nullable - public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { - Long latestId = latestSnapshotId(); + public Snapshot traversalSnapshotsFromLatestSafely(Filter checker, String branch) { + Long latestId = latestSnapshotId(branch); if (latestId == null) { return null; } - Long earliestId = earliestSnapshotId(); + Long earliestId = earliestSnapshotId(branch); if (earliestId == null) { return null; } @@ -365,9 +423,9 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { for (long id = latestId; id >= earliestId; id--) { Snapshot snapshot; try { - snapshot = snapshot(id); + snapshot = snapshot(id, branch); } catch (Exception e) { - Long newEarliestId = earliestSnapshotId(); + Long newEarliestId = earliestSnapshotId(branch); if (newEarliestId == null) { return null; } @@ -388,13 +446,13 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { return null; } - private @Nullable Long findLatest() throws IOException { - Path snapshotDir = snapshotDirByBranch(branchName); + private @Nullable Long findLatest(String branch) throws IOException { + Path snapshotDir = snapshotDirectory(branch); if (!fileIO.exists(snapshotDir)) { return null; } - Long snapshotId = readHint(LATEST, branchName); + Long snapshotId = readHint(LATEST, branch); if (snapshotId != null) { long nextSnapshot = snapshotId + 1; // it is the latest only there is no next one @@ -407,12 +465,16 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { } private @Nullable Long findEarliest() throws IOException { - Path snapshotDir = snapshotDirByBranch(branchName); + return findEarliest(branch); + } + + private @Nullable Long findEarliest(String branch) throws IOException { + Path snapshotDir = snapshotDirectory(branch); if (!fileIO.exists(snapshotDir)) { return null; } - Long snapshotId = readHint(EARLIEST, branchName); + Long snapshotId = readHint(EARLIEST, branch); // null and it is the earliest only it exists if (snapshotId != null && snapshotExists(snapshotId)) { return snapshotId; @@ -422,11 +484,11 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter checker) { } public Long readHint(String fileName) { - return readHint(fileName, DEFAULT_MAIN_BRANCH); + return readHint(fileName, branch); } public Long readHint(String fileName, String branchName) { - Path snapshotDir = snapshotDirByBranch(branchName); + Path snapshotDir = snapshotDirectory(branchName); Path path = new Path(snapshotDir, fileName); int retryNumber = 0; while (retryNumber++ < READ_HINT_RETRY_NUM) { @@ -445,14 +507,14 @@ public Long readHint(String fileName, String branchName) { } private Long findByListFiles(BinaryOperator reducer) throws IOException { - Path snapshotDir = snapshotDirByBranch(branchName); + Path snapshotDir = snapshotDirectory(branch); return listVersionedFiles(fileIO, snapshotDir, SNAPSHOT_PREFIX) .reduce(reducer) .orElse(null); } public void commitLatestHint(long snapshotId) throws IOException { - commitLatestHint(snapshotId, DEFAULT_MAIN_BRANCH); + commitLatestHint(snapshotId, branch); } public void commitLatestHint(long snapshotId, String branchName) throws IOException { @@ -460,7 +522,7 @@ public void commitLatestHint(long snapshotId, String branchName) throws IOExcept } public void commitEarliestHint(long snapshotId) throws IOException { - commitEarliestHint(snapshotId, DEFAULT_MAIN_BRANCH); + commitEarliestHint(snapshotId, branch); } public void commitEarliestHint(long snapshotId, String branchName) throws IOException { @@ -469,7 +531,7 @@ public void commitEarliestHint(long snapshotId, String branchName) throws IOExce private void commitHint(long snapshotId, String fileName, String branchName) throws IOException { - Path snapshotDir = snapshotDirByBranch(branchName); + Path snapshotDir = snapshotDirectory(branchName); Path hintFile = new Path(snapshotDir, fileName); fileIO.overwriteFileUtf8(hintFile, String.valueOf(snapshotId)); } diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java index 0f697d746d83..7484d2dd3ce3 100644 --- a/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java +++ b/paimon-core/src/main/java/org/apache/paimon/utils/TagManager.java @@ -54,10 +54,16 @@ public class TagManager { private final FileIO fileIO; private final Path tablePath; + private final String branch; public TagManager(FileIO fileIO, Path tablePath) { + this(fileIO, tablePath, DEFAULT_MAIN_BRANCH); + } + + public TagManager(FileIO fileIO, Path tablePath, String branch) { this.fileIO = fileIO; this.tablePath = tablePath; + this.branch = branch; } /** Return the root Directory of tags. */ @@ -85,7 +91,7 @@ public Path tagPath(String branchName, String tagName) { } public void createTag(Snapshot snapshot, String tagName, List callbacks) { - createTag(snapshot, tagName, callbacks, DEFAULT_MAIN_BRANCH); + createTag(snapshot, tagName, callbacks, branch); } /** Create a tag from given snapshot and save it in the storage. */ @@ -139,7 +145,7 @@ public void deleteAllTagsOfOneSnapshot( public void deleteTag( String tagName, TagDeletion tagDeletion, SnapshotManager snapshotManager) { - deleteTag(tagName, tagDeletion, snapshotManager, DEFAULT_MAIN_BRANCH); + deleteTag(tagName, tagDeletion, snapshotManager, branch); } public void deleteTag( @@ -188,7 +194,7 @@ private void doClean( skippedSnapshots.add(taggedSnapshots.get(index - 1)); } // the nearest right neighbor - Snapshot right = snapshotManager.earliestSnapshot(); + Snapshot right = snapshotManager.earliestSnapshot(branch); if (index + 1 < taggedSnapshots.size()) { Snapshot rightTag = taggedSnapshots.get(index + 1); right = right.id() < rightTag.id() ? right : rightTag; diff --git a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java index 965baf99b8c4..03d6bc5ead59 100644 --- a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java +++ b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java @@ -956,15 +956,14 @@ public void testCreateBranch() throws Exception { SnapshotManager snapshotManager = new SnapshotManager(new TraceableFileIO(), tablePath); Snapshot branchSnapshot = Snapshot.fromPath( - new TraceableFileIO(), - snapshotManager.branchSnapshotPath("test-branch", 2)); + new TraceableFileIO(), snapshotManager.snapshotPath("test-branch", 2)); assertThat(branchSnapshot.equals(snapshot2)).isTrue(); // verify schema in test-branch is equal to schema 0 SchemaManager schemaManager = new SchemaManager(new TraceableFileIO(), tablePath); TableSchema branchSchema = SchemaManager.fromPath( - new TraceableFileIO(), schemaManager.branchSchemaPath("test-branch", 0)); + new TraceableFileIO(), schemaManager.toSchemaPath("test-branch", 0)); TableSchema schema0 = schemaManager.schema(0); assertThat(branchSchema.equals(schema0)).isTrue(); } diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java index 9c31fdf5d052..8d822dfe89dd 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/MultiTableUpdatedDataFieldsProcessFunction.java @@ -74,7 +74,9 @@ public void processElement( return null; } return new SchemaManager( - table.fileIO(), table.location(), CoreOptions.branch(table.options())); + table.fileIO(), + table.location(), + CoreOptions.branch(table.options())); }); if (Objects.isNull(schemaManager)) { diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java index 30eac425d3e0..1628daac588e 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/FlinkTableSink.java @@ -51,8 +51,7 @@ public void executeTruncation() { ((FileStoreTable) table) .store() .newCommit( - UUID.randomUUID().toString(), - CoreOptions.branch(table.options())); + UUID.randomUUID().toString(), CoreOptions.branch(table.options())); long identifier = BatchWriteBuilder.COMMIT_IDENTIFIER; commit.purgeTable(identifier); } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java index 56da8c56b584..8be3b937c0b8 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/UnawareBucketCompactionSink.java @@ -52,7 +52,8 @@ protected OneInputStreamOperator createWr protected Committer.Factory createCommitterFactory( boolean streamingCheckpointEnabled) { return (s, metricGroup) -> - new StoreCommitter(table.newCommit(s, CoreOptions.branch(table.options())), metricGroup); + new StoreCommitter( + table.newCommit(s, CoreOptions.branch(table.options())), metricGroup); } @Override