Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[iceberg] Introduce feature to migrate table from iceberg to paimon #4639

Merged
merged 30 commits into from
Jan 9, 2025

Conversation

LsomeYeah
Copy link
Contributor

@LsomeYeah LsomeYeah commented Dec 4, 2024

Purpose

Linked issue: close #xxx

Paimon has supported generating Iceberg compatible metadata, so that paimon tables can be consumed directly by Iceberg readers. Now paimon try to support an action or a procedure to support migrating iceberg table to paimon table.

The general implementation idea of this feature includes the following steps:

  1. read snapshot metadata of iceberg latest snapshot
  2. get all data files used by iceberg latest snapshot(existing of deleted kind file will cause exception or be filtered)
  3. migrate these data files to paimon

This pr supports basic ability to migrating iceberg table to paimon, including:

  1. core implementation of migrating, supported iceberg catalog type: hadoop-catalog
  2. UT cases

Procedure or action is not included in this pr.

Tests

org.apache.paimon.iceberg.IcebergMigrateTest

API and Format

Documentation

@JingsongLi
Copy link
Contributor

Please write [WIP] in PR title.

@JingsongLi JingsongLi changed the title [iceberg] Introduce procedure to migrate table from iceberg to paimon [WIP][iceberg] Introduce procedure to migrate table from iceberg to paimon Dec 4, 2024
@LsomeYeah LsomeYeah changed the title [WIP][iceberg] Introduce procedure to migrate table from iceberg to paimon [iceberg] Introduce feature to migrate table from iceberg to paimon Dec 9, 2024
this.paimonCatalog = paimonCatalog;
this.paimonFileIO = paimonCatalog.fileIO();
this.paimonDatabaseName = paimonDatabaseName;
this.paimonTableNameame = paimonTableNameame;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
this.paimonTableNameame = paimonTableNameame;
this.paimonTableName = paimonTableName;

Schema paimonSchema = icebergSchemaToPaimonSchema(icebergMetadata);
Identifier paimonIdentifier = Identifier.create(paimonDatabaseName, paimonTableNameame);

paimonCatalog.createDatabase(paimonDatabaseName, false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why false? User must migrate to a non-existing database?


private List<IcebergManifestFileMeta> checkAndFilterManifestFiles(
List<IcebergManifestFileMeta> icebergManifestFileMetas) {
if (!ignoreDelete) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need such an option? If there are deletion vectors in Iceberg, and user uses this option, then the resulting data will be incorrect. Incorrect data are useless to the users.

@@ -190,6 +199,70 @@ private static Object toTypeObject(DataType dataType, int fieldId, int depth) {
}
}

public DataType getDataType() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This class already has a dataType member. Check if dataType is null, if not just return that object, otherwise calculate data type from type string.

!simpleType.contains(delimiter)
? simpleType
: simpleType.substring(0, simpleType.indexOf(delimiter));
switch (typePrefix) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that you have calculated dataType with these code, store the result in dataType so we can directly use it next time.

Comment on lines 262 to 265
if (meta.content() == IcebergManifestFileMeta.Content.DELETES) {
throw new RuntimeException(
"IcebergMigrator don't support analyzing manifest file with 'DELETE' content.");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Preconditions.checkArgument(meta.content() != IcebergManifestFileMeta.Content.DELETES)

Comment on lines 272 to 275
if (meta.content() != IcebergDataFileMeta.Content.DATA) {
throw new RuntimeException(
"IcebergMigrator don't support analyzing iceberg delete file.");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

Comment on lines 229 to 232
public void deleteOriginTable(boolean delete) throws Exception {}

@Override
public void renameTable(boolean ignoreIfNotExists) throws Exception {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you implement these?

Comment on lines 64 to 69
public static List<DataFileMeta> construct(
List<IcebergDataFileMeta> icebergDataFileMetas,
FileIO fileIO,
Table paimonTable,
Path newDir,
Map<Path, Path> rollback) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is only related to Iceberg, while other methods in this utility class are quite versatile. Move this method to IcebergMigrator.

import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

/** Tests for {@link IcebergMigrator}. */
public class IcebergMigrateTest {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is quite a complex feature. I would add a random test.

import org.slf4j.LoggerFactory;

/** Get iceberg table latest snapshot metadata in hive. */
public class IcebergMigrateHiveMetadata implements IcebergMigrateMetadata {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you have tests for this class?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test for hive metadata is not included in this pr, I'll move it to next pr.

Comment on lines 206 to 218
String simpleType = type.toString();
String delimiter = "(";
if (simpleType.contains("[")) {
delimiter = "[";
}
String typePrefix =
!simpleType.contains(delimiter)
? simpleType
: simpleType.substring(0, simpleType.indexOf(delimiter));
switch (typePrefix) {
case "boolean":
dataType = new BooleanType(!required);
break;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Extract this into a separated method. This can remove all the breaks.


public IcebergManifestEntry fromRow(InternalRow row, IcebergManifestFileMeta meta) {
IcebergManifestEntry.Status status = IcebergManifestEntry.Status.fromId(row.getInt(0));
long snapshotId = !row.isNullAt(1) ? row.getLong(1) : meta.addedSnapshotId();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
long snapshotId = !row.isNullAt(1) ? row.getLong(1) : meta.addedSnapshotId();
long snapshotId = row.isNullAt(1) ? meta.addedSnapshotId() : row.getLong(1);

Comment on lines 94 to 96
LOG.warn(
"exception occurred when deleting origin table, exception message:{}",
e.getMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
LOG.warn(
"exception occurred when deleting origin table, exception message:{}",
e.getMessage());
LOG.warn(
"exception occurred when deleting origin table", e);

In this way the stack trace is also logged.

Comment on lines 105 to 109
} catch (IOException e) {
throw new RuntimeException(
"read iceberg version-hint.text failed. Iceberg metadata path: "
+ icebergMetaPathFactory.metadataDirectory());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not add e as cause? It will be difficult to debug without stack trace.

Comment on lines 246 to 247
LOG.info("Last step: rename.");
LOG.info("Iceberg migrator do not rename table now.");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need for this log. Also why Iceberg migrator ignore this method? Add comments.

}

@JsonIgnore
public DataType getDataTypeFromType() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
public DataType getDataTypeFromType() {
private DataType getDataTypeFromType() {

@@ -190,6 +199,80 @@ private static Object toTypeObject(DataType dataType, int fieldId, int depth) {
}
}

@JsonIgnore
public DataType getDataType() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Replace the old dataType method with this one.

@tsreaper tsreaper merged commit efe2841 into apache:master Jan 9, 2025
12 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants