diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java index b98bfcd7811a..e8a46c5becd7 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/BaseDeltaTaskWriter.java @@ -109,7 +109,7 @@ public void write(RowData row) throws IOException { } protected class RowDataDeltaWriter extends BaseEqualityDeltaWriter { - public RowDataDeltaWriter(PartitionKey partition) { + RowDataDeltaWriter(PartitionKey partition) { super(partition, schema, deleteSchema, DeleteGranularity.FILE); } diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java index 8773d2d46d5c..9a2f57181708 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/CommitSummary.java @@ -21,12 +21,10 @@ import java.util.Arrays; import java.util.NavigableMap; import java.util.concurrent.atomic.AtomicLong; -import org.apache.flink.annotation.Internal; import org.apache.iceberg.io.WriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; -@Internal -public class CommitSummary { +class CommitSummary { private final AtomicLong dataFilesCount = new AtomicLong(); private final AtomicLong dataFilesRecordCount = new AtomicLong(); @@ -35,7 +33,7 @@ public class CommitSummary { private final AtomicLong deleteFilesRecordCount = new AtomicLong(); private final AtomicLong deleteFilesByteCount = new AtomicLong(); - public CommitSummary(NavigableMap pendingResults) { + CommitSummary(NavigableMap pendingResults) { pendingResults .values() .forEach( @@ -57,27 +55,27 @@ public CommitSummary(NavigableMap pendingResults) { }); } - public long dataFilesCount() { + long dataFilesCount() { return dataFilesCount.get(); } - public long dataFilesRecordCount() { + long dataFilesRecordCount() { return dataFilesRecordCount.get(); } - public long dataFilesByteCount() { + long dataFilesByteCount() { return dataFilesByteCount.get(); } - public long deleteFilesCount() { + long deleteFilesCount() { return deleteFilesCount.get(); } - public long deleteFilesRecordCount() { + long deleteFilesRecordCount() { return deleteFilesRecordCount.get(); } - public long deleteFilesByteCount() { + long deleteFilesByteCount() { return deleteFilesByteCount.get(); } diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java index a3d13cc71114..036970c06d5b 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifests.java @@ -19,13 +19,11 @@ package org.apache.iceberg.flink.sink; import java.util.List; -import org.apache.flink.annotation.Internal; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -@Internal -public class DeltaManifests { +class DeltaManifests { private static final CharSequence[] EMPTY_REF_DATA_FILES = new CharSequence[0]; @@ -33,11 +31,11 @@ public class DeltaManifests { private final ManifestFile deleteManifest; private final CharSequence[] referencedDataFiles; - public DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest) { + DeltaManifests(ManifestFile dataManifest, ManifestFile deleteManifest) { this(dataManifest, deleteManifest, EMPTY_REF_DATA_FILES); } - public DeltaManifests( + DeltaManifests( ManifestFile dataManifest, ManifestFile deleteManifest, CharSequence[] referencedDataFiles) { Preconditions.checkNotNull(referencedDataFiles, "Referenced data files shouldn't be null."); @@ -46,19 +44,19 @@ public DeltaManifests( this.referencedDataFiles = referencedDataFiles; } - public ManifestFile dataManifest() { + ManifestFile dataManifest() { return dataManifest; } - public ManifestFile deleteManifest() { + ManifestFile deleteManifest() { return deleteManifest; } - public CharSequence[] referencedDataFiles() { + CharSequence[] referencedDataFiles() { return referencedDataFiles; } - public List manifests() { + List manifests() { List manifests = Lists.newArrayListWithCapacity(2); if (dataManifest != null) { manifests.add(dataManifest); diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java index 6ad41bacf337..7aa18929d925 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/DeltaManifestsSerializer.java @@ -23,14 +23,12 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; -import org.apache.flink.annotation.Internal; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.iceberg.ManifestFile; import org.apache.iceberg.ManifestFiles; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -@Internal -public class DeltaManifestsSerializer implements SimpleVersionedSerializer { +class DeltaManifestsSerializer implements SimpleVersionedSerializer { private static final int VERSION_1 = 1; private static final int VERSION_2 = 2; private static final byte[] EMPTY_BINARY = new byte[0]; diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java index a8424db83c3d..188e16aa5e31 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkManifestUtil.java @@ -22,7 +22,6 @@ import java.util.List; import java.util.Map; import java.util.function.Supplier; -import org.apache.flink.annotation.Internal; import org.apache.iceberg.DataFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.ManifestFile; @@ -39,15 +38,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -@Internal -public class FlinkManifestUtil { +class FlinkManifestUtil { private static final int FORMAT_V2 = 2; private static final Long DUMMY_SNAPSHOT_ID = 0L; private static final Logger LOG = LoggerFactory.getLogger(FlinkManifestUtil.class); private FlinkManifestUtil() {} - public static ManifestFile writeDataFiles( + static ManifestFile writeDataFiles( OutputFile outputFile, PartitionSpec spec, List dataFiles) throws IOException { ManifestWriter writer = ManifestFiles.write(FORMAT_V2, spec, outputFile, DUMMY_SNAPSHOT_ID); @@ -59,7 +57,7 @@ public static ManifestFile writeDataFiles( return writer.toManifestFile(); } - public static List readDataFiles( + static List readDataFiles( ManifestFile manifestFile, FileIO io, Map specsById) throws IOException { try (CloseableIterable dataFiles = ManifestFiles.read(manifestFile, io, specsById)) { @@ -67,7 +65,7 @@ public static List readDataFiles( } } - public static ManifestOutputFileFactory createOutputFileFactory( + static ManifestOutputFileFactory createOutputFileFactory( Supplier tableSupplier, Map tableProps, String flinkJobId, @@ -84,7 +82,7 @@ public static ManifestOutputFileFactory createOutputFileFactory( * @param result all those DataFiles/DeleteFiles in this WriteResult should be written with same * partition spec */ - public static DeltaManifests writeCompletedFiles( + static DeltaManifests writeCompletedFiles( WriteResult result, Supplier outputFileSupplier, PartitionSpec spec) throws IOException { @@ -115,7 +113,7 @@ public static DeltaManifests writeCompletedFiles( return new DeltaManifests(dataManifest, deleteManifest, result.referencedDataFiles()); } - public static WriteResult readCompletedFiles( + static WriteResult readCompletedFiles( DeltaManifests deltaManifests, FileIO io, Map specsById) throws IOException { WriteResult.Builder builder = WriteResult.builder(); @@ -136,7 +134,7 @@ public static WriteResult readCompletedFiles( return builder.addReferencedDataFiles(deltaManifests.referencedDataFiles()).build(); } - public static void deleteCommittedManifests( + static void deleteCommittedManifests( Table table, List manifests, String newFlinkJobId, long checkpointId) { for (ManifestFile manifest : manifests) { try { diff --git a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java index c76608a06e2f..5b28c4acb1c5 100644 --- a/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java +++ b/flink/v1.19/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitterMetrics.java @@ -20,13 +20,11 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; -import org.apache.flink.annotation.Internal; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.MetricGroup; import org.apache.iceberg.flink.util.ElapsedTimeGauge; -@Internal -public class IcebergFilesCommitterMetrics { +class IcebergFilesCommitterMetrics { private final AtomicLong lastCheckpointDurationMs = new AtomicLong(); private final AtomicLong lastCommitDurationMs = new AtomicLong(); private final ElapsedTimeGauge elapsedSecondsSinceLastSuccessfulCommit; @@ -37,7 +35,7 @@ public class IcebergFilesCommitterMetrics { private final Counter committedDeleteFilesRecordCount; private final Counter committedDeleteFilesByteCount; - public IcebergFilesCommitterMetrics(MetricGroup metrics, String fullTableName) { + IcebergFilesCommitterMetrics(MetricGroup metrics, String fullTableName) { MetricGroup committerMetrics = metrics.addGroup("IcebergFilesCommitter").addGroup("table", fullTableName); committerMetrics.gauge("lastCheckpointDurationMs", lastCheckpointDurationMs::get); @@ -58,12 +56,12 @@ void checkpointDuration(long checkpointDurationMs) { lastCheckpointDurationMs.set(checkpointDurationMs); } - public void commitDuration(long commitDurationMs) { + void commitDuration(long commitDurationMs) { lastCommitDurationMs.set(commitDurationMs); } /** This is called upon a successful commit. */ - public void updateCommitSummary(CommitSummary stats) { + void updateCommitSummary(CommitSummary stats) { elapsedSecondsSinceLastSuccessfulCommit.refreshLastRecordedTime(); committedDataFilesCount.inc(stats.dataFilesCount()); committedDataFilesRecordCount.inc(stats.dataFilesRecordCount()); diff --git a/gradle.properties b/gradle.properties index fcbe7d8de012..c953db9e34d9 100644 --- a/gradle.properties +++ b/gradle.properties @@ -16,7 +16,7 @@ jmhOutputPath=build/reports/jmh/human-readable-output.txt jmhJsonOutputPath=build/reports/jmh/results.json jmhIncludeRegex=.* -systemProp.defaultFlinkVersions=1.20 +systemProp.defaultFlinkVersions=1.19 systemProp.knownFlinkVersions=1.18,1.19,1.20 systemProp.defaultHiveVersions=2 systemProp.knownHiveVersions=2,3