Skip to content

Commit

Permalink
[iceberg] Introduce feature to migrate table from iceberg to paimon (#…
Browse files Browse the repository at this point in the history
…4639)

This closes #4639.
  • Loading branch information
LsomeYeah authored Jan 9, 2025
1 parent 24c703a commit efe2841
Show file tree
Hide file tree
Showing 14 changed files with 1,404 additions and 8 deletions.
7 changes: 7 additions & 0 deletions paimon-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,13 @@ under the License.
<scope>test</scope>
</dependency>

<!-- Test-scoped dependency on iceberg-parquet; its version comes from the iceberg.version property. -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-parquet</artifactId>
<version>${iceberg.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ public Path toMetadataPath(long snapshotId) {
return new Path(metadataDirectory(), String.format("v%d.metadata.json", snapshotId));
}

/**
 * Resolves a metadata file name against the metadata directory.
 *
 * @param metadataName file name of the metadata file, used as-is
 * @return path of {@code metadataName} inside the metadata directory
 */
public Path toMetadataPath(String metadataName) {
    Path metadataDir = metadataDirectory();
    return new Path(metadataDir, metadataName);
}

public Stream<Path> getAllMetadataPathBefore(FileIO fileIO, long snapshotId)
throws IOException {
return FileUtils.listVersionedFileStatus(fileIO, metadataDirectory, "v")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,32 @@ public IcebergManifestEntry fromRow(InternalRow row) {
row.getLong(3),
fileSerializer.fromRow(row.getRow(4, fileSerializer.numFields())));
}

/**
 * Deserializes a manifest entry row, filling in fields that are null in the row from the
 * manifest file that contains the entry.
 *
 * <p>A null snapshot id (field 1) is replaced by the manifest's added snapshot id. The two
 * sequence numbers (fields 2 and 3) are resolved through {@code getOrInherit}.
 *
 * @param row serialized manifest entry
 * @param meta metadata of the manifest file the row was read from
 * @return the deserialized manifest entry
 */
public IcebergManifestEntry fromRow(InternalRow row, IcebergManifestFileMeta meta) {
    IcebergManifestEntry.Status entryStatus = IcebergManifestEntry.Status.fromId(row.getInt(0));

    long entrySnapshotId;
    if (row.isNullAt(1)) {
        entrySnapshotId = meta.addedSnapshotId();
    } else {
        entrySnapshotId = row.getLong(1);
    }

    long dataSequenceNumber = getOrInherit(row, meta, 2, entryStatus);
    long fileSequenceNumber = getOrInherit(row, meta, 3, entryStatus);

    return new IcebergManifestEntry(
            entryStatus,
            entrySnapshotId,
            dataSequenceNumber,
            fileSequenceNumber,
            fileSerializer.fromRow(row.getRow(4, fileSerializer.numFields())));
}

/**
 * Reads the long at {@code pos}, or inherits the manifest file's sequence number when the
 * field is null and inheritance applies.
 *
 * <p>Inheritance applies when the manifest's sequence number is 0 or the entry status is
 * {@code ADDED}.
 *
 * @param row serialized manifest entry
 * @param meta metadata of the manifest file the row was read from
 * @param pos position of the sequence number field in {@code row}
 * @param status status of the entry
 * @return the stored or inherited sequence number
 */
private long getOrInherit(
        InternalRow row,
        IcebergManifestFileMeta meta,
        int pos,
        IcebergManifestEntry.Status status) {
    long manifestSequenceNumber = meta.sequenceNumber();
    if (!row.isNullAt(pos)) {
        return row.getLong(pos);
    }

    boolean inheritable =
            manifestSequenceNumber == 0 || status == IcebergManifestEntry.Status.ADDED;
    if (inheritable) {
        return manifestSequenceNumber;
    }
    // NOTE(review): the field is null here and nothing can be inherited, yet it is still
    // read with getLong -- confirm whether this case should be rejected explicitly.
    return row.getLong(pos);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.iceberg.manifest;

import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriterFactory;
Expand All @@ -38,9 +39,14 @@
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.Filter;
import org.apache.paimon.utils.ObjectsFile;
import org.apache.paimon.utils.PathFactory;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
Expand Down Expand Up @@ -109,6 +115,50 @@ public static IcebergManifestFile create(FileStoreTable table, IcebergPathFactor
table.coreOptions().manifestTargetSize());
}

/**
 * Reads all entries of the given manifest file without supplying a file size.
 *
 * @param meta metadata of the manifest file to read
 * @return the entries stored in the manifest file
 */
public List<IcebergManifestEntry> read(IcebergManifestFileMeta meta) {
    Long unknownFileSize = null;
    return read(meta, unknownFileSize);
}

/**
 * Reads all entries of the given manifest file.
 *
 * <p>Only the file name of {@code meta.manifestPath()} is used; it is resolved against this
 * object's path factory.
 *
 * @param meta metadata of the manifest file to read
 * @param fileSize size handed to the format reader, may be {@code null}
 * @return the entries stored in the manifest file
 * @throws RuntimeException if opening the manifest file fails with an {@link IOException}
 */
public List<IcebergManifestEntry> read(IcebergManifestFileMeta meta, @Nullable Long fileSize) {
    String fileName = new Path(meta.manifestPath()).getName();

    CloseableIterator<InternalRow> rows;
    try {
        Path manifestFile = pathFactory.toPath(fileName);
        rows = createIterator(manifestFile, fileSize);
    } catch (IOException e) {
        throw new RuntimeException("Failed to read " + fileName, e);
    }

    IcebergManifestEntrySerializer entrySerializer = (IcebergManifestEntrySerializer) serializer;
    return readFromIterator(meta, rows, entrySerializer, Filter.alwaysTrue());
}

/**
 * Opens {@code file} through {@code FileUtils.createFormatReader} using this object's file IO
 * and reader factory, and exposes the reader as a closeable row iterator.
 *
 * @param file path of the file to read
 * @param fileSize passed through to the format reader, may be {@code null}; presumably the
 *     already-known length of the file -- TODO confirm
 * @return closeable iterator over the rows of {@code file}
 * @throws IOException if creating the format reader fails
 */
private CloseableIterator<InternalRow> createIterator(Path file, @Nullable Long fileSize)
throws IOException {
return FileUtils.createFormatReader(fileIO, readerFactory, file, fileSize)
.toCloseableIterator();
}

/**
 * Drains the iterator, converting every row accepted by the filter into a manifest entry.
 *
 * <p>The iterator is closed before this method returns, whether or not reading succeeds.
 *
 * @param meta metadata of the manifest file the rows belong to, passed to the serializer
 * @param inputIterator rows of the manifest file; closed by this method
 * @param serializer converts rows to manifest entries
 * @param readFilter rows for which this returns {@code false} are skipped
 * @return the converted entries, in iteration order
 * @throws RuntimeException wrapping any exception raised while reading or closing
 */
private static List<IcebergManifestEntry> readFromIterator(
        IcebergManifestFileMeta meta,
        CloseableIterator<InternalRow> inputIterator,
        IcebergManifestEntrySerializer serializer,
        Filter<InternalRow> readFilter) {
    List<IcebergManifestEntry> entries = new ArrayList<>();
    try (CloseableIterator<InternalRow> rows = inputIterator) {
        while (rows.hasNext()) {
            InternalRow current = rows.next();
            if (!readFilter.test(current)) {
                continue;
            }
            entries.add(serializer.fromRow(current, meta));
        }
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    return entries;
}

public List<IcebergManifestFileMeta> rollingWrite(
Iterator<IcebergManifestEntry> entries, long sequenceNumber) {
RollingFileWriter<IcebergManifestEntry, IcebergManifestFileMeta> writer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,22 @@

import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BinaryType;
import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DateType;
import org.apache.paimon.types.DecimalType;
import org.apache.paimon.types.DoubleType;
import org.apache.paimon.types.FloatType;
import org.apache.paimon.types.IntType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;
import org.apache.paimon.types.VarBinaryType;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.Preconditions;

import org.apache.paimon.shade.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
Expand Down Expand Up @@ -64,7 +73,7 @@ public class IcebergDataField {
@JsonProperty(FIELD_TYPE)
private final Object type;

@JsonIgnore private final DataType dataType;
@JsonIgnore private DataType dataType;

@JsonProperty(FIELD_DOC)
private final String doc;
Expand Down Expand Up @@ -126,6 +135,10 @@ public String doc() {

/**
 * Returns the Paimon data type of this field, deriving it from the Iceberg type on first use
 * and caching the result.
 *
 * @return the non-null Paimon data type
 */
@JsonIgnore
public DataType dataType() {
    if (dataType == null) {
        dataType = getDataTypeFromType();
    }
    return Preconditions.checkNotNull(dataType);
}

Expand Down Expand Up @@ -190,6 +203,70 @@ private static Object toTypeObject(DataType dataType, int fieldId, int depth) {
}
}

/**
 * Derives the Paimon {@link DataType} from this field's Iceberg primitive type string.
 *
 * <p>The type name is the text before the first {@code '['} (for {@code fixed[L]}) or
 * {@code '('} (for {@code decimal(P, S)}). Every resulting type is nullable exactly when the
 * field is not {@code required}.
 *
 * <p>Numeric arguments are trimmed before parsing, so both {@code decimal(9,2)} and
 * {@code decimal(9, 2)} are accepted, as is whitespace inside {@code fixed[ 16 ]}.
 *
 * @return the Paimon type matching the Iceberg type
 * @throws UnsupportedOperationException if the type name is not one of the handled primitives
 * @throws NumberFormatException if a {@code fixed} or {@code decimal} argument is not a number
 */
private DataType getDataTypeFromType() {
    String simpleType = type.toString();
    String delimiter = simpleType.contains("[") ? "[" : "(";
    int delimiterIndex = simpleType.indexOf(delimiter);
    String typePrefix =
            delimiterIndex < 0 ? simpleType : simpleType.substring(0, delimiterIndex);
    switch (typePrefix) {
        case "boolean":
            return new BooleanType(!required);
        case "int":
            return new IntType(!required);
        case "long":
            return new BigIntType(!required);
        case "float":
            return new FloatType(!required);
        case "double":
            return new DoubleType(!required);
        case "date":
            return new DateType(!required);
        case "string":
            return new VarCharType(!required, VarCharType.MAX_LENGTH);
        case "binary":
            return new VarBinaryType(!required, VarBinaryType.MAX_LENGTH);
        case "fixed":
            int fixedLength =
                    Integer.parseInt(
                            simpleType
                                    .substring(
                                            simpleType.indexOf("[") + 1,
                                            simpleType.indexOf("]"))
                                    .trim());
            return new BinaryType(!required, fixedLength);
        case "uuid":
            // https://iceberg.apache.org/spec/?h=vector#primitive-types
            // uuid should use 16-byte fixed
            return new BinaryType(!required, 16);
        case "decimal":
            // Split on the comma itself and trim each side; the space after the comma is
            // optional, so a fixed "+ 2" offset would drop the first digit of the scale.
            int commaIndex = simpleType.indexOf(",");
            int precision =
                    Integer.parseInt(
                            simpleType.substring(simpleType.indexOf("(") + 1, commaIndex).trim());
            int scale =
                    Integer.parseInt(
                            simpleType.substring(commaIndex + 1, simpleType.indexOf(")")).trim());
            return new DecimalType(!required, precision, scale);
        case "timestamp":
            return new TimestampType(!required, 6);
        case "timestamptz":
            return new LocalZonedTimestampType(!required, 6);
        case "timestamp_ns": // iceberg v3 format
            return new TimestampType(!required, 9);
        case "timestamptz_ns": // iceberg v3 format
            return new LocalZonedTimestampType(!required, 9);
        default:
            throw new UnsupportedOperationException("Unsupported data type: " + type);
    }
}

/**
 * Converts this Iceberg field into a Paimon {@link DataField} with the same id, name and doc.
 *
 * @return the equivalent Paimon field, typed with {@link #dataType()}
 */
public DataField toDatafield() {
    DataType resolvedType = dataType();
    return new DataField(id, name, resolvedType, doc);
}

@Override
public int hashCode() {
return Objects.hash(id, name, required, type, doc);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public class IcebergMetadata {
private final List<IcebergSnapshot> snapshots;

@JsonProperty(FIELD_CURRENT_SNAPSHOT_ID)
private final int currentSnapshotId;
private final long currentSnapshotId;

@JsonProperty(FIELD_PROPERTIES)
@Nullable
Expand All @@ -122,7 +122,7 @@ public IcebergMetadata(
List<IcebergPartitionSpec> partitionSpecs,
int lastPartitionId,
List<IcebergSnapshot> snapshots,
int currentSnapshotId) {
long currentSnapshotId) {
this(
CURRENT_FORMAT_VERSION,
tableUuid,
Expand Down Expand Up @@ -158,7 +158,7 @@ public IcebergMetadata(
@JsonProperty(FIELD_SORT_ORDERS) List<IcebergSortOrder> sortOrders,
@JsonProperty(FIELD_DEFAULT_SORT_ORDER_ID) int defaultSortOrderId,
@JsonProperty(FIELD_SNAPSHOTS) List<IcebergSnapshot> snapshots,
@JsonProperty(FIELD_CURRENT_SNAPSHOT_ID) int currentSnapshotId,
@JsonProperty(FIELD_CURRENT_SNAPSHOT_ID) long currentSnapshotId,
@JsonProperty(FIELD_PROPERTIES) @Nullable Map<String, String> properties) {
this.formatVersion = formatVersion;
this.tableUuid = tableUuid;
Expand Down Expand Up @@ -249,7 +249,7 @@ public List<IcebergSnapshot> snapshots() {
}

@JsonGetter(FIELD_CURRENT_SNAPSHOT_ID)
public int currentSnapshotId() {
public long currentSnapshotId() {
return currentSnapshotId;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.iceberg.migrate;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.iceberg.IcebergPathFactory;
import org.apache.paimon.iceberg.metadata.IcebergMetadata;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;

/**
 * Get iceberg table latest snapshot metadata in hadoop.
 *
 * <p>The metadata directory is resolved as
 * {@code <iceberg_warehouse>/<database>/<table>/metadata}, and the latest metadata version is
 * read from the {@code version-hint.text} file in that directory.
 *
 * <p>{@link #icebergMetadata()} initializes the paths used by
 * {@link #icebergLatestMetadataLocation()} and {@link #deleteOriginTable()}, so it must be
 * called before either of them.
 */
public class IcebergMigrateHadoopMetadata implements IcebergMigrateMetadata {
    private static final Logger LOG = LoggerFactory.getLogger(IcebergMigrateHadoopMetadata.class);

    private static final String VERSION_HINT_FILENAME = "version-hint.text";
    private static final String ICEBERG_WAREHOUSE = "iceberg_warehouse";

    private final FileIO fileIO;
    private final Identifier icebergIdentifier;
    private final Options icebergOptions;

    // Both are assigned by icebergMetadata() and stay null until it has been called.
    private Path icebergLatestMetaVersionPath;
    private IcebergPathFactory icebergMetaPathFactory;

    /**
     * @param icebergIdentifier database and table name of the iceberg table
     * @param fileIO file IO used to read the metadata and delete the table directory
     * @param icebergOptions options; {@code iceberg_warehouse} must be set
     */
    public IcebergMigrateHadoopMetadata(
            Identifier icebergIdentifier, FileIO fileIO, Options icebergOptions) {
        this.fileIO = fileIO;
        this.icebergIdentifier = icebergIdentifier;
        this.icebergOptions = icebergOptions;
    }

    /**
     * Loads the metadata file named by the version hint of the iceberg table.
     *
     * @return the metadata parsed from {@code v<version>.metadata.json}
     * @throws IllegalArgumentException if {@code iceberg_warehouse} is not set
     * @throws RuntimeException if {@code version-hint.text} cannot be read
     */
    @Override
    public IcebergMetadata icebergMetadata() {
        String warehouse = icebergOptions.get(ICEBERG_WAREHOUSE);
        Preconditions.checkArgument(
                warehouse != null,
                "'iceberg_warehouse' is null. "
                        + "In hadoop-catalog, you should explicitly set this argument for finding iceberg metadata.");
        this.icebergMetaPathFactory =
                new IcebergPathFactory(
                        new Path(
                                warehouse,
                                new Path(
                                        String.format(
                                                "%s/%s/metadata",
                                                icebergIdentifier.getDatabaseName(),
                                                icebergIdentifier.getTableName()))));
        long icebergLatestMetaVersion = getIcebergLatestMetaVersion();

        this.icebergLatestMetaVersionPath =
                icebergMetaPathFactory.toMetadataPath(icebergLatestMetaVersion);
        LOG.info(
                "iceberg latest snapshot metadata file location: {}", icebergLatestMetaVersionPath);

        return IcebergMetadata.fromPath(fileIO, icebergLatestMetaVersionPath);
    }

    /**
     * Returns the location resolved by the last {@link #icebergMetadata()} call.
     *
     * @return path of the latest metadata file as a string
     */
    @Override
    public String icebergLatestMetadataLocation() {
        return icebergLatestMetaVersionPath.toString();
    }

    /**
     * Deletes the iceberg table directory, i.e. the parent of the metadata directory.
     *
     * <p>An {@link IOException} from the directory check is logged and not rethrown.
     */
    @Override
    public void deleteOriginTable() {
        Path tablePath = icebergMetaPathFactory.metadataDirectory().getParent();
        LOG.info("Iceberg table path to be deleted:{}", tablePath);
        try {
            if (fileIO.isDir(tablePath)) {
                fileIO.deleteDirectoryQuietly(tablePath);
            }
        } catch (IOException e) {
            LOG.warn("exception occurred when deleting origin table.", e);
        }
    }

    /**
     * Reads the latest metadata version from {@code version-hint.text}.
     *
     * @return the version number stored in the hint file
     * @throws RuntimeException if the hint file cannot be read
     * @throws NumberFormatException if the hint file does not contain a number
     */
    private long getIcebergLatestMetaVersion() {
        Path versionHintPath =
                new Path(icebergMetaPathFactory.metadataDirectory(), VERSION_HINT_FILENAME);
        try {
            // Trim first: a trailing newline or stray whitespace in the hint file would
            // otherwise make the parse fail. Parse as long to match the return type.
            return Long.parseLong(fileIO.readFileUtf8(versionHintPath).trim());
        } catch (IOException e) {
            throw new RuntimeException(
                    "read iceberg version-hint.text failed. Iceberg metadata path: "
                            + icebergMetaPathFactory.metadataDirectory(),
                    e);
        }
    }
}
Loading

0 comments on commit efe2841

Please sign in to comment.