Skip to content

Commit

Permalink
fixed
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 15, 2024
1 parent 0a7d840 commit 41d5a14
Show file tree
Hide file tree
Showing 10 changed files with 216 additions and 108 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -327,7 +327,7 @@ protected TableSchema getDataTableSchema(Identifier identifier) throws TableNotE
throw new TableNotExistException(identifier);
}
Path tableLocation = getDataTableLocation(identifier);
return new SchemaManager(fileIO, tableLocation,branchName)
return new SchemaManager(fileIO, tableLocation, branchName)
.latest()
.orElseThrow(
() -> new RuntimeException("There is no paimon table in " + tableLocation));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -714,7 +714,7 @@ public boolean tryCommitOnce(
Path newSnapshotPath =
branchName.equals(DEFAULT_MAIN_BRANCH)
? snapshotManager.snapshotPath(newSnapshotId)
: snapshotManager.branchSnapshotPath(branchName, newSnapshotId);
: snapshotManager.snapshotPath(branchName, newSnapshotId);

if (LOG.isDebugEnabled()) {
LOG.debug("Ready to commit table files to snapshot #" + newSnapshotId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,30 +79,30 @@ public class SchemaManager implements Serializable {
private final FileIO fileIO;
private final Path tableRoot;
@Nullable private transient Lock lock;
private final String branchName;
private final String branch;

public SchemaManager(FileIO fileIO, Path tableRoot) {
this(fileIO, tableRoot, DEFAULT_MAIN_BRANCH);
}

public SchemaManager(FileIO fileIO, Path tableRoot, String branchName) {
public SchemaManager(FileIO fileIO, Path tableRoot, String branch) {
this.fileIO = fileIO;
this.tableRoot = tableRoot;
this.branchName = StringUtils.isBlank(branchName) ? DEFAULT_MAIN_BRANCH : branchName;
this.branch = StringUtils.isBlank(branch) ? DEFAULT_MAIN_BRANCH : branch;
}

public SchemaManager withLock(@Nullable Lock lock) {
this.lock = lock;
return this;
}

public String getBranchName() {
return branchName;
public Optional<TableSchema> latest() {
return latest(branch);
}

public Optional<TableSchema> latest() {
public Optional<TableSchema> latest(String branch) {
try {
return listVersionedFiles(fileIO, schemaDirectory(branchName), SCHEMA_PREFIX)
return listVersionedFiles(fileIO, schemaDirectory(branch), SCHEMA_PREFIX)
.reduce(Math::max)
.map(this::schema);
} catch (IOException e) {
Expand All @@ -111,23 +111,38 @@ public Optional<TableSchema> latest() {
}

public List<TableSchema> listAll() {
return listAllIds().stream().map(this::schema).collect(Collectors.toList());
return listAll(branch);
}

public List<TableSchema> listAll(String branch) {
return listAllIds(branch).stream()
.map(id -> schema(id, branch))
.collect(Collectors.toList());
}

/** List all schema IDs. */
public List<Long> listAllIds() {
return listAllIds(branch);
}

/** List all schema IDs. */
public List<Long> listAllIds(String branch) {
try {
return listVersionedFiles(fileIO, schemaDirectory(branchName), SCHEMA_PREFIX)
return listVersionedFiles(fileIO, schemaDirectory(branch), SCHEMA_PREFIX)
.collect(Collectors.toList());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}

/** Create a new schema from {@link Schema}. */
public TableSchema createTable(Schema schema) throws Exception {
return createTable(branch, schema);
}

/** Create a new schema from {@link Schema}. */
public TableSchema createTable(String branch, Schema schema) throws Exception {
while (true) {
latest().ifPresent(
latest(branch)
.ifPresent(
latest -> {
throw new IllegalStateException(
"Schema in filesystem exists, please use updating,"
Expand All @@ -151,25 +166,30 @@ public TableSchema createTable(Schema schema) throws Exception {
options,
schema.comment());

boolean success = commit(branchName, newSchema);
boolean success = commit(branch, newSchema);
if (success) {
return newSchema;
}
}
}

/** Update {@link SchemaChange}s. */
public TableSchema commitChanges(SchemaChange... changes) throws Exception {
return commitChanges(Arrays.asList(changes));
return commitChanges(branch, changes);
}

/** Update {@link SchemaChange}s. */
public TableSchema commitChanges(List<SchemaChange> changes)
public TableSchema commitChanges(String branch, SchemaChange... changes) throws Exception {
return commitChanges(branch, Arrays.asList(changes));
}

/** Update {@link SchemaChange}s. */
public TableSchema commitChanges(String branch, List<SchemaChange> changes)
throws Catalog.TableNotExistException, Catalog.ColumnAlreadyExistException,
Catalog.ColumnNotExistException {
while (true) {
TableSchema schema =
latest().orElseThrow(
latest(branch)
.orElseThrow(
() ->
new Catalog.TableNotExistException(
fromPath(tableRoot.toString(), true)));
Expand Down Expand Up @@ -361,7 +381,7 @@ public TableSchema commitChanges(List<SchemaChange> changes)
newComment);

try {
boolean success = commit(branchName, newSchema);
boolean success = commit(branch, newSchema);
if (success) {
return newSchema;
}
Expand All @@ -372,8 +392,13 @@ public TableSchema commitChanges(List<SchemaChange> changes)
}

public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
return mergeSchema(branch, rowType, allowExplicitCast);
}

public boolean mergeSchema(String branch, RowType rowType, boolean allowExplicitCast) {
TableSchema current =
latest().orElseThrow(
latest(branch)
.orElseThrow(
() ->
new RuntimeException(
"It requires that the current schema to exist when calling 'mergeSchema'"));
Expand All @@ -382,7 +407,7 @@ public boolean mergeSchema(RowType rowType, boolean allowExplicitCast) {
return false;
} else {
try {
return commit(update);
return commit(branch, update);
} catch (Exception e) {
throw new RuntimeException("Failed to commit the schema.", e);
}
Expand Down Expand Up @@ -455,24 +480,29 @@ private void updateColumn(

@VisibleForTesting
boolean commit(TableSchema newSchema) throws Exception {
return commit(DEFAULT_MAIN_BRANCH, newSchema);
return commit(branch, newSchema);
}

@VisibleForTesting
boolean commit(String branchName, TableSchema newSchema) throws Exception {
SchemaValidation.validateTableSchema(newSchema);
Path schemaPath = branchSchemaPath(branchName, newSchema.id());
Path schemaPath = toSchemaPath(branchName, newSchema.id());
Callable<Boolean> callable = () -> fileIO.writeFileUtf8(schemaPath, newSchema.toString());
if (lock == null) {
return callable.call();
}
return lock.runWithLock(callable);
}

/** Read schema for schema id. */
public TableSchema schema(long id) {
return schema(id, branch);
}

/** Read schema for schema id. */
public TableSchema schema(long id, String branch) {
try {
return JsonSerdeUtil.fromJson(fileIO.readFileUtf8(toSchemaPath(id)), TableSchema.class);
return JsonSerdeUtil.fromJson(
fileIO.readFileUtf8(toSchemaPath(branch, id)), TableSchema.class);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
Expand Down Expand Up @@ -501,7 +531,7 @@ public Path toSchemaPath(long id) {
return new Path(tableRoot + "/schema/" + SCHEMA_PREFIX + id);
}

public Path branchSchemaPath(String branchName, long schemaId) {
public Path toSchemaPath(String branchName, long schemaId) {
return branchName.equals(DEFAULT_MAIN_BRANCH)
? toSchemaPath(schemaId)
: new Path(
Expand All @@ -517,7 +547,16 @@ public Path branchSchemaPath(String branchName, long schemaId) {
* @param schemaId the schema id to delete.
*/
public void deleteSchema(long schemaId) {
fileIO.deleteQuietly(branchSchemaPath(branchName, schemaId));
deleteSchema(branch, schemaId);
}

/**
* Delete schema with specific id.
*
* @param schemaId the schema id to delete.
*/
public void deleteSchema(String branch, long schemaId) {
fileIO.deleteQuietly(toSchemaPath(branch, schemaId));
}

public static void checkAlterTableOption(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,10 @@ public void createBranch(String branchName, String tagName) {
tagManager.tagPath(tagName), tagManager.tagPath(branchName, tagName));
fileIO.copyFileUtf8(
snapshotManager.snapshotPath(snapshot.id()),
snapshotManager.branchSnapshotPath(branchName, snapshot.id()));
snapshotManager.snapshotPath(branchName, snapshot.id()));
fileIO.copyFileUtf8(
schemaManager.toSchemaPath(snapshot.schemaId()),
schemaManager.branchSchemaPath(branchName, snapshot.schemaId()));
schemaManager.toSchemaPath(branchName, snapshot.schemaId()));
} catch (IOException e) {
throw new RuntimeException(
String.format(
Expand Down
Loading

0 comments on commit 41d5a14

Please sign in to comment.