Skip to content

Commit

Permalink
Introduces the new IcebergSink based on the new V2 Flink Sink Abstraction
Browse files Browse the repository at this point in the history

Co-authored-by: Liwei Li <[email protected]>
Co-authored-by: Kyle Bendickson <[email protected]>
Co-authored-by: Peter Vary <[email protected]>
  • Loading branch information
rodmeneses committed Aug 16, 2024
1 parent e955310 commit 00848b7
Show file tree
Hide file tree
Showing 7 changed files with 29 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void write(RowData row) throws IOException {
}

protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter {
public RowDataDeltaWriter(PartitionKey partition) {
RowDataDeltaWriter(PartitionKey partition) {
super(partition, schema, deleteSchema, DeleteGranularity.FILE);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,10 @@
import java.util.Arrays;
import java.util.NavigableMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.io.WriteResult;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;

@Internal
public class CommitSummary {
class CommitSummary {

private final AtomicLong dataFilesCount = new AtomicLong();
private final AtomicLong dataFilesRecordCount = new AtomicLong();
Expand All @@ -35,7 +33,7 @@ public class CommitSummary {
private final AtomicLong deleteFilesRecordCount = new AtomicLong();
private final AtomicLong deleteFilesByteCount = new AtomicLong();

public CommitSummary(NavigableMap<Long, WriteResult> pendingResults) {
CommitSummary(NavigableMap<Long, WriteResult> pendingResults) {
pendingResults
.values()
.forEach(
Expand All @@ -57,27 +55,27 @@ public CommitSummary(NavigableMap<Long, WriteResult> pendingResults) {
});
}

public long dataFilesCount() {
long dataFilesCount() {
return dataFilesCount.get();
}

public long dataFilesRecordCount() {
long dataFilesRecordCount() {
return dataFilesRecordCount.get();
}

public long dataFilesByteCount() {
long dataFilesByteCount() {
return dataFilesByteCount.get();
}

public long deleteFilesCount() {
long deleteFilesCount() {
return deleteFilesCount.get();
}

public long deleteFilesRecordCount() {
long deleteFilesRecordCount() {
return deleteFilesRecordCount.get();
}

public long deleteFilesByteCount() {
long deleteFilesByteCount() {
return deleteFilesByteCount.get();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,23 @@
package org.apache.iceberg.flink.sink;

import java.util.List;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

@Internal
public class DeltaManifests {
class DeltaManifests {

private static final CharSequence[] EMPTY_REF_DATA_FILES = new CharSequence[0];

private final ManifestFile dataManifest;
private final ManifestFile deleteManifest;
private final CharSequence[] referencedDataFiles;

public DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest) {
DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest) {
this(dataManifest, deleteManifest, EMPTY_REF_DATA_FILES);
}

public DeltaManifests(
DeltaManifests(
ManifestFile dataManifest, ManifestFile deleteManifest, CharSequence[] referencedDataFiles) {
Preconditions.checkNotNull(referencedDataFiles, "Referenced data files shouldn't be null.");

Expand All @@ -46,19 +44,19 @@ public DeltaManifests(
this.referencedDataFiles = referencedDataFiles;
}

public ManifestFile dataManifest() {
ManifestFile dataManifest() {
return dataManifest;
}

public ManifestFile deleteManifest() {
ManifestFile deleteManifest() {
return deleteManifest;
}

public CharSequence[] referencedDataFiles() {
CharSequence[] referencedDataFiles() {
return referencedDataFiles;
}

public List<ManifestFile> manifests() {
List<ManifestFile> manifests() {
List<ManifestFile> manifests = Lists.newArrayListWithCapacity(2);
if (dataManifest != null) {
manifests.add(dataManifest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,12 @@
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.flink.annotation.Internal;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

@Internal
public class DeltaManifestsSerializer implements SimpleVersionedSerializer<DeltaManifests> {
class DeltaManifestsSerializer implements SimpleVersionedSerializer<DeltaManifests> {
private static final int VERSION_1 = 1;
private static final int VERSION_2 = 2;
private static final byte[] EMPTY_BINARY = new byte[0];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.flink.annotation.Internal;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.ManifestFile;
Expand All @@ -39,15 +38,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class FlinkManifestUtil {
class FlinkManifestUtil {
private static final int FORMAT_V2 = 2;
private static final Long DUMMY_SNAPSHOT_ID = 0L;
private static final Logger LOG = LoggerFactory.getLogger(FlinkManifestUtil.class);

private FlinkManifestUtil() {}

public static ManifestFile writeDataFiles(
static ManifestFile writeDataFiles(
OutputFile outputFile, PartitionSpec spec, List<DataFile> dataFiles) throws IOException {
ManifestWriter<DataFile> writer =
ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID);
Expand All @@ -59,15 +57,15 @@ public static ManifestFile writeDataFiles(
return writer.toManifestFile();
}

public static List<DataFile> readDataFiles(
static List<DataFile> readDataFiles(
ManifestFile manifestFile, FileIO io, Map<Integer, PartitionSpec> specsById)
throws IOException {
try (CloseableIterable<DataFile> dataFiles = ManifestFiles.read(manifestFile, io, specsById)) {
return Lists.newArrayList(dataFiles);
}
}

public static ManifestOutputFileFactory createOutputFileFactory(
static ManifestOutputFileFactory createOutputFileFactory(
Supplier<Table> tableSupplier,
Map<String, String> tableProps,
String flinkJobId,
Expand All @@ -84,7 +82,7 @@ public static ManifestOutputFileFactory createOutputFileFactory(
* @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same
* partition spec
*/
public static DeltaManifests writeCompletedFiles(
static DeltaManifests writeCompletedFiles(
WriteResult result, Supplier<OutputFile> outputFileSupplier, PartitionSpec spec)
throws IOException {

Expand Down Expand Up @@ -115,7 +113,7 @@ public static DeltaManifests writeCompletedFiles(
return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles());
}

public static WriteResult readCompletedFiles(
static WriteResult readCompletedFiles(
DeltaManifests deltaManifests, FileIO io, Map<Integer, PartitionSpec> specsById)
throws IOException {
WriteResult.Builder builder = WriteResult.builder();
Expand All @@ -136,7 +134,7 @@ public static WriteResult readCompletedFiles(
return builder.addReferencedDataFiles(deltaManifests.referencedDataFiles()).build();
}

public static void deleteCommittedManifests(
static void deleteCommittedManifests(
Table table, List<ManifestFile> manifests, String newFlinkJobId, long checkpointId) {
for (ManifestFile manifest : manifests) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.flink.annotation.Internal;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.iceberg.flink.util.ElapsedTimeGauge;

@Internal
public class IcebergFilesCommitterMetrics {
class IcebergFilesCommitterMetrics {
private final AtomicLong lastCheckpointDurationMs = new AtomicLong();
private final AtomicLong lastCommitDurationMs = new AtomicLong();
private final ElapsedTimeGauge elapsedSecondsSinceLastSuccessfulCommit;
Expand All @@ -37,7 +35,7 @@ public class IcebergFilesCommitterMetrics {
private final Counter committedDeleteFilesRecordCount;
private final Counter committedDeleteFilesByteCount;

public IcebergFilesCommitterMetrics(MetricGroup metrics, String fullTableName) {
IcebergFilesCommitterMetrics(MetricGroup metrics, String fullTableName) {
MetricGroup committerMetrics =
metrics.addGroup("IcebergFilesCommitter").addGroup("table", fullTableName);
committerMetrics.gauge("lastCheckpointDurationMs", lastCheckpointDurationMs::get);
Expand All @@ -58,12 +56,12 @@ void checkpointDuration(long checkpointDurationMs) {
lastCheckpointDurationMs.set(checkpointDurationMs);
}

public void commitDuration(long commitDurationMs) {
void commitDuration(long commitDurationMs) {
lastCommitDurationMs.set(commitDurationMs);
}

/** This is called upon a successful commit. */
public void updateCommitSummary(CommitSummary stats) {
void updateCommitSummary(CommitSummary stats) {
elapsedSecondsSinceLastSuccessfulCommit.refreshLastRecordedTime();
committedDataFilesCount.inc(stats.dataFilesCount());
committedDataFilesRecordCount.inc(stats.dataFilesRecordCount());
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
jmhOutputPath=build/reports/jmh/human-readable-output.txt
jmhJsonOutputPath=build/reports/jmh/results.json
jmhIncludeRegex=.*
systemProp.defaultFlinkVersions=1.20
systemProp.defaultFlinkVersions=1.19
systemProp.knownFlinkVersions=1.18,1.19,1.20
systemProp.defaultHiveVersions=2
systemProp.knownHiveVersions=2,3
Expand Down

0 comments on commit 00848b7

Please sign in to comment.