diff --git a/api/src/main/java/org/apache/iceberg/PartitionStatistics.java b/api/src/main/java/org/apache/iceberg/PartitionStatistics.java new file mode 100644 index 000000000000..6d3f3d28c7bc --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/PartitionStatistics.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +public interface PartitionStatistics extends StructLike { + + /** Returns the partition of these partition statistics */ + StructLike partition(); + + /** Returns the spec ID of the partition of these partition statistics */ + int specId(); + + /** Returns the number of data records in the partition */ + long dataRecordCount(); + + /** Returns the number of data files in the partition */ + int dataFileCount(); + + /** Returns the total size of data files in bytes in the partition */ + long totalDataFileSizeInBytes(); + + /** Returns the number of positional delete records in the partition */ + long positionDeleteRecordCount(); + + /** Returns the number of positional delete files in the partition */ + int positionDeleteFileCount(); + + /** Returns the number of equality delete records in the partition */ + long equalityDeleteRecordCount(); + + /** Returns the number of equality delete files in the partition */ + int equalityDeleteFileCount(); + + /** Returns the total number of records in the partition */ + Long totalRecords(); + + /** Returns the timestamp in milliseconds when the partition was last updated */ + Long lastUpdatedAt(); + + /** Returns the ID of the snapshot that last updated this partition */ + Long lastUpdatedSnapshotId(); + + /** Returns the number of delete vectors in the partition */ + int dvCount(); +} diff --git a/api/src/main/java/org/apache/iceberg/PartitionStatisticsScan.java b/api/src/main/java/org/apache/iceberg/PartitionStatisticsScan.java new file mode 100644 index 000000000000..16ddf9732334 --- /dev/null +++ b/api/src/main/java/org/apache/iceberg/PartitionStatisticsScan.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+
+public interface PartitionStatisticsScan {
+
+  /**
+   * Create a new scan from this scan's configuration that will use the given snapshot by ID.
+   *
+   * @param snapshotId a snapshot ID
+   * @return a new scan based on this with the given snapshot ID
+   * @throws IllegalArgumentException if the snapshot cannot be found
+   */
+  PartitionStatisticsScan useSnapshot(long snapshotId);
+
+  /**
+   * Create a new scan from the results of this, where partitions are filtered by the {@link
+   * Expression}.
+   *
+   * @param filter a filter expression
+   * @return a new scan based on this with results filtered by the expression
+   */
+  PartitionStatisticsScan filter(Expression filter);
+
+  /**
+   * Create a new scan from this with the schema as its projection.
+   *
+   * @param schema a projection schema
+   * @return a new scan based on this with the given projection
+   */
+  PartitionStatisticsScan project(Schema schema);
+
+  /**
+   * Scans a partition statistics file belonging to a particular snapshot.
+   *
+   * @return an Iterable of partition statistics
+   */
+  CloseableIterable<PartitionStatistics> scan();
+}
diff --git a/api/src/main/java/org/apache/iceberg/Table.java b/api/src/main/java/org/apache/iceberg/Table.java
index 97ea9ba76526..3c0689e89288 100644
--- a/api/src/main/java/org/apache/iceberg/Table.java
+++ b/api/src/main/java/org/apache/iceberg/Table.java
@@ -83,6 +83,18 @@ default IncrementalChangelogScan newIncrementalChangelogScan() {
     throw new UnsupportedOperationException("Incremental changelog scan is not supported");
   }
 
+  /**
+   * Create a new {@link PartitionStatisticsScan} for this table.
+   *
+   * <p>
Once a partition statistics scan is created, it can be refined to project columns and filter
+   * data.
+   *
+   * @return a partition statistics scan for this table
+   */
+  default PartitionStatisticsScan newPartitionStatisticsScan() {
+    throw new UnsupportedOperationException("Partition statistics scan is not supported");
+  }
+
   /**
    * Return the {@link Schema schema} for this table.
    *
diff --git a/core/src/main/java/org/apache/iceberg/BasePartitionStatisticsScan.java b/core/src/main/java/org/apache/iceberg/BasePartitionStatisticsScan.java
new file mode 100644
index 000000000000..041e0ecd6089
--- /dev/null
+++ b/core/src/main/java/org/apache/iceberg/BasePartitionStatisticsScan.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg;
+
+import java.util.List;
+import java.util.Optional;
+import org.apache.iceberg.expressions.Expression;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Types;
+
+public class BasePartitionStatisticsScan implements PartitionStatisticsScan {
+
+  private final Table table;
+  private Long snapshotId;
+
+  public BasePartitionStatisticsScan(Table table) {
+    this.table = table;
+  }
+
+  @Override
+  public PartitionStatisticsScan useSnapshot(long newSnapshotId) {
+    Preconditions.checkArgument(
+        table.snapshot(newSnapshotId) != null, "Cannot find snapshot with ID %s", newSnapshotId);
+
+    this.snapshotId = newSnapshotId;
+    return this;
+  }
+
+  @Override
+  public PartitionStatisticsScan filter(Expression newFilter) {
+    throw new UnsupportedOperationException("Filtering is not supported");
+  }
+
+  @Override
+  public PartitionStatisticsScan project(Schema newSchema) {
+    throw new UnsupportedOperationException("Projection is not supported");
+  }
+
+  @Override
+  public CloseableIterable<PartitionStatistics> scan() {
+    if (snapshotId == null) {
+      if (table.currentSnapshot() == null) {
+        return CloseableIterable.of(List.of());
+      }
+
+      snapshotId = table.currentSnapshot().snapshotId();
+    }
+
+    Optional<PartitionStatisticsFile> statsFile =
+        table.partitionStatisticsFiles().stream()
+            .filter(f -> f.snapshotId() == snapshotId)
+            .findFirst();
+
+    if (statsFile.isEmpty()) {
+      return CloseableIterable.of(List.of());
+    }
+
+    Types.StructType partitionType = Partitioning.partitionType(table);
+    Schema schema = PartitionStatsHandler.schema(partitionType, TableUtil.formatVersion(table));
+
+    FileFormat fileFormat = FileFormat.fromFileName(statsFile.get().path());
+    Preconditions.checkArgument(
+        fileFormat != null, "Unable to determine format of file: %s", statsFile.get().path());
+
+    CloseableIterable<StructLike> records =
+        InternalData.read(fileFormat, table.io().newInputFile(statsFile.get().path()))
+            .project(schema)
+            .build();
+
+
return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStats); + } +} diff --git a/core/src/main/java/org/apache/iceberg/BaseTable.java b/core/src/main/java/org/apache/iceberg/BaseTable.java index 23299a962ce5..c489c3bfb517 100644 --- a/core/src/main/java/org/apache/iceberg/BaseTable.java +++ b/core/src/main/java/org/apache/iceberg/BaseTable.java @@ -90,6 +90,11 @@ public IncrementalChangelogScan newIncrementalChangelogScan() { return new BaseIncrementalChangelogScan(this); } + @Override + public PartitionStatisticsScan newPartitionStatisticsScan() { + return new BasePartitionStatisticsScan(this); + } + @Override public Schema schema() { return ops.current().schema(); diff --git a/core/src/main/java/org/apache/iceberg/PartitionStats.java b/core/src/main/java/org/apache/iceberg/PartitionStats.java index 9051c8535c7e..89013ce5be85 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionStats.java +++ b/core/src/main/java/org/apache/iceberg/PartitionStats.java @@ -20,7 +20,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -public class PartitionStats implements StructLike { +public class PartitionStats implements PartitionStatistics { private static final int STATS_COUNT = 13; @@ -43,54 +43,67 @@ public PartitionStats(StructLike partition, int specId) { this.specId = specId; } + @Override public StructLike partition() { return partition; } + @Override public int specId() { return specId; } + @Override public long dataRecordCount() { return dataRecordCount; } + @Override public int dataFileCount() { return dataFileCount; } + @Override public long totalDataFileSizeInBytes() { return totalDataFileSizeInBytes; } + @Override public long positionDeleteRecordCount() { return positionDeleteRecordCount; } + @Override public int positionDeleteFileCount() { return positionDeleteFileCount; } + @Override public long equalityDeleteRecordCount() { return equalityDeleteRecordCount; } + @Override public int equalityDeleteFileCount() { return equalityDeleteFileCount; } + @Override public Long totalRecords() { return totalRecordCount; } + @Override public Long lastUpdatedAt() { return lastUpdatedAt; } + @Override public Long lastUpdatedSnapshotId() { return lastUpdatedSnapshotId; } + @Override public int dvCount() { return dvCount; } @@ -187,31 +200,40 @@ void deletedEntryForIncrementalCompute(ContentFile file, Snapshot snapshot) { * * @param entry the entry from which statistics will be sourced. 
*/ - void appendStats(PartitionStats entry) { + void appendStats(PartitionStatistics entry) { Preconditions.checkArgument(specId == entry.specId(), "Spec IDs must match"); - this.dataRecordCount += entry.dataRecordCount; - this.dataFileCount += entry.dataFileCount; - this.totalDataFileSizeInBytes += entry.totalDataFileSizeInBytes; - this.positionDeleteRecordCount += entry.positionDeleteRecordCount; - this.positionDeleteFileCount += entry.positionDeleteFileCount; - this.equalityDeleteRecordCount += entry.equalityDeleteRecordCount; - this.equalityDeleteFileCount += entry.equalityDeleteFileCount; - this.dvCount += entry.dvCount; + this.dataRecordCount += entry.dataRecordCount(); + this.dataFileCount += entry.dataFileCount(); + this.totalDataFileSizeInBytes += entry.totalDataFileSizeInBytes(); + this.positionDeleteRecordCount += entry.positionDeleteRecordCount(); + this.positionDeleteFileCount += entry.positionDeleteFileCount(); + this.equalityDeleteRecordCount += entry.equalityDeleteRecordCount(); + this.equalityDeleteFileCount += entry.equalityDeleteFileCount(); + this.dvCount += entry.dvCount(); - if (entry.totalRecordCount != null) { + if (entry.totalRecords() != null) { if (totalRecordCount == null) { - this.totalRecordCount = entry.totalRecordCount; + this.totalRecordCount = entry.totalRecords(); } else { - this.totalRecordCount += entry.totalRecordCount; + this.totalRecordCount += entry.totalRecords(); } } - if (entry.lastUpdatedAt != null) { - updateSnapshotInfo(entry.lastUpdatedSnapshotId, entry.lastUpdatedAt); + if (entry.lastUpdatedAt() != null) { + updateSnapshotInfo(entry.lastUpdatedSnapshotId(), entry.lastUpdatedAt()); } } + /** + * @deprecated will be removed in 1.12.0. Use {@link + * PartitionStats#appendStats(PartitionStatistics) instead} + */ + @Deprecated + void appendStats(PartitionStats entry) { + appendStats((PartitionStatistics) entry); + } + private void updateSnapshotInfo(long snapshotId, long updatedAt) { if (lastUpdatedAt == null || lastUpdatedAt < updatedAt) { this.lastUpdatedAt = updatedAt; diff --git a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java index 4e7c1b104ee8..c97e9030fb1d 100644 --- a/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java +++ b/core/src/main/java/org/apache/iceberg/PartitionStatsHandler.java @@ -208,9 +208,7 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long Snapshot snapshot = table.snapshot(snapshotId); Preconditions.checkArgument(snapshot != null, "Snapshot not found: %s", snapshotId); - StructType partitionType = Partitioning.partitionType(table); - - Collection stats; + Collection stats; PartitionStatisticsFile statisticsFile = latestStatsFile(table, snapshot.snapshotId()); if (statisticsFile == null) { LOG.info( @@ -225,7 +223,7 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long } try { - stats = computeAndMergeStatsIncremental(table, snapshot, partitionType, statisticsFile); + stats = computeAndMergeStatsIncremental(table, snapshot, statisticsFile.snapshotId()); } catch (InvalidStatsFileException exception) { LOG.warn( "Using full compute as previous statistics file is corrupted for incremental compute."); @@ -240,7 +238,9 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long return null; } - List sortedStats = sortStatsByPartition(stats, partitionType); + StructType partitionType = Partitioning.partitionType(table); + + List sortedStats = 
sortStatsByPartition(stats, partitionType); return writePartitionStatsFile( table, snapshot.snapshotId(), @@ -250,7 +250,7 @@ public static PartitionStatisticsFile computeAndWriteStatsFile(Table table, long @VisibleForTesting static PartitionStatisticsFile writePartitionStatsFile( - Table table, long snapshotId, Schema dataSchema, Iterable records) + Table table, long snapshotId, Schema dataSchema, Iterable records) throws IOException { FileFormat fileFormat = FileFormat.fromString( @@ -275,7 +275,9 @@ static PartitionStatisticsFile writePartitionStatsFile( * * @param schema The {@link Schema} of the partition statistics file. * @param inputFile An {@link InputFile} pointing to the partition stats file. + * @deprecated will be removed in 1.12.0, use {@link PartitionStatisticsScan} instead */ + @Deprecated public static CloseableIterable readPartitionStatsFile( Schema schema, InputFile inputFile) { Preconditions.checkArgument(schema != null, "Invalid schema: null"); @@ -287,7 +289,11 @@ public static CloseableIterable readPartitionStatsFile( CloseableIterable records = InternalData.read(fileFormat, inputFile).project(schema).build(); - return CloseableIterable.transform(records, PartitionStatsHandler::recordToPartitionStats); + return CloseableIterable.transform( + records, + rec -> { + return (PartitionStats) recordToPartitionStats(rec); + }); } private static OutputFile newPartitionStatsFile( @@ -307,7 +313,7 @@ private static OutputFile newPartitionStatsFile( Locale.ROOT, "partition-stats-%d-%s", snapshotId, UUID.randomUUID())))); } - private static PartitionStats recordToPartitionStats(StructLike record) { + static PartitionStatistics recordToPartitionStats(StructLike record) { int pos = 0; PartitionStats stats = new PartitionStats( @@ -320,17 +326,12 @@ private static PartitionStats recordToPartitionStats(StructLike record) { return stats; } - private static Collection computeAndMergeStatsIncremental( - Table table, - Snapshot snapshot, - StructType partitionType, - PartitionStatisticsFile previousStatsFile) { - PartitionMap statsMap = PartitionMap.create(table.specs()); + private static Collection computeAndMergeStatsIncremental( + Table table, Snapshot snapshot, long lastSnapshotWithStats) { + PartitionMap statsMap = PartitionMap.create(table.specs()); // read previous stats, note that partition field will be read as GenericRecord - try (CloseableIterable oldStats = - readPartitionStatsFile( - schema(partitionType, TableUtil.formatVersion(table)), - table.io().newInputFile(previousStatsFile.path()))) { + try (CloseableIterable oldStats = + table.newPartitionStatisticsScan().useSnapshot(lastSnapshotWithStats).scan()) { oldStats.forEach( partitionStats -> statsMap.put(partitionStats.specId(), partitionStats.partition(), partitionStats)); @@ -339,8 +340,8 @@ private static Collection computeAndMergeStatsIncremental( } // incrementally compute the new stats, partition field will be written as PartitionData - PartitionMap incrementalStatsMap = - computeStatsDiff(table, table.snapshot(previousStatsFile.snapshotId()), snapshot); + PartitionMap incrementalStatsMap = + computeStatsDiff(table, table.snapshot(lastSnapshotWithStats), snapshot); // convert PartitionData into GenericRecord and merge stats incrementalStatsMap.forEach( @@ -349,7 +350,7 @@ private static Collection computeAndMergeStatsIncremental( Pair.of(key.first(), partitionDataToRecord((PartitionData) key.second())), value, (existingEntry, newEntry) -> { - existingEntry.appendStats(newEntry); + ((PartitionStats) 
existingEntry).appendStats(newEntry); return existingEntry; })); @@ -387,7 +388,7 @@ static PartitionStatisticsFile latestStatsFile(Table table, long snapshotId) { return null; } - private static PartitionMap computeStatsDiff( + private static PartitionMap computeStatsDiff( Table table, Snapshot fromSnapshot, Snapshot toSnapshot) { Iterable snapshots = SnapshotUtil.ancestorsBetween( @@ -406,10 +407,10 @@ private static PartitionMap computeStatsDiff( return computeStats(table, manifests, true /* incremental */); } - private static PartitionMap computeStats( + private static PartitionMap computeStats( Table table, List manifests, boolean incremental) { StructType partitionType = Partitioning.partitionType(table); - Queue> statsByManifest = Queues.newConcurrentLinkedQueue(); + Queue> statsByManifest = Queues.newConcurrentLinkedQueue(); Tasks.foreach(manifests) .stopOnFailure() .throwFailureWhenFinished() @@ -419,19 +420,19 @@ private static PartitionMap computeStats( statsByManifest.add( collectStatsForManifest(table, manifest, partitionType, incremental))); - PartitionMap statsMap = PartitionMap.create(table.specs()); - for (PartitionMap stats : statsByManifest) { + PartitionMap statsMap = PartitionMap.create(table.specs()); + for (PartitionMap stats : statsByManifest) { mergePartitionMap(stats, statsMap); } return statsMap; } - private static PartitionMap collectStatsForManifest( + private static PartitionMap collectStatsForManifest( Table table, ManifestFile manifest, StructType partitionType, boolean incremental) { List projection = BaseScan.scanColumns(manifest.content()); try (ManifestReader reader = ManifestFiles.open(manifest, table.io()).select(projection)) { - PartitionMap statsMap = PartitionMap.create(table.specs()); + PartitionMap statsMap = PartitionMap.create(table.specs()); int specId = manifest.partitionSpecId(); PartitionSpec spec = table.specs().get(specId); PartitionData keyTemplate = new PartitionData(partitionType); @@ -443,10 +444,11 @@ private static PartitionMap collectStatsForManifest( StructLike key = keyTemplate.copyFor(coercedPartition); Snapshot snapshot = table.snapshot(entry.snapshotId()); PartitionStats stats = - statsMap.computeIfAbsent( - specId, - ((PartitionData) file.partition()).copy(), - () -> new PartitionStats(key, specId)); + (PartitionStats) + statsMap.computeIfAbsent( + specId, + ((PartitionData) file.partition()).copy(), + () -> new PartitionStats(key, specId)); if (entry.isLive()) { // Live can have both added and existing entries. Consider only added entries for // incremental compute as existing entries was already included in previous compute. 
@@ -469,23 +471,23 @@ private static PartitionMap collectStatsForManifest( } private static void mergePartitionMap( - PartitionMap fromMap, PartitionMap toMap) { + PartitionMap fromMap, PartitionMap toMap) { fromMap.forEach( (key, value) -> toMap.merge( key, value, (existingEntry, newEntry) -> { - existingEntry.appendStats(newEntry); + ((PartitionStats) existingEntry).appendStats(newEntry); return existingEntry; })); } - private static List sortStatsByPartition( - Collection stats, StructType partitionType) { - List entries = Lists.newArrayList(stats); + private static List sortStatsByPartition( + Collection stats, StructType partitionType) { + List entries = Lists.newArrayList(stats); entries.sort( - Comparator.comparing(PartitionStats::partition, Comparators.forType(partitionType))); + Comparator.comparing(PartitionStatistics::partition, Comparators.forType(partitionType))); return entries; } diff --git a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java index 71fdc9507d58..11c878cf31da 100644 --- a/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java +++ b/core/src/test/java/org/apache/iceberg/PartitionStatsHandlerTestBase.java @@ -227,14 +227,29 @@ public void testAllDatatypePartitionWriting() throws Exception { partitionStats.set(DATA_RECORD_COUNT_POSITION, RANDOM.nextLong()); partitionStats.set(DATA_FILE_COUNT_POSITION, RANDOM.nextInt()); partitionStats.set(TOTAL_DATA_FILE_SIZE_IN_BYTES_POSITION, 1024L * RANDOM.nextInt(20)); - List expected = Collections.singletonList(partitionStats); - PartitionStatisticsFile statisticsFile = - PartitionStatsHandler.writePartitionStatsFile(testTable, 42L, dataSchema, expected); + List expected = Collections.singletonList(partitionStats); - List written; - try (CloseableIterable recordIterator = - PartitionStatsHandler.readPartitionStatsFile( - dataSchema, testTable.io().newInputFile(statisticsFile.path()))) { + DataFile dataFile = + DataFiles.builder(spec) + .withPath("some_path") + .withPartition(partitionData) + .withFileSizeInBytes(15) + .withFormat(FileFormat.PARQUET) + .withRecordCount(1) + .build(); + testTable.newAppend().appendFile(dataFile).commit(); + long snapshotId = testTable.currentSnapshot().snapshotId(); + + testTable + .updatePartitionStatistics() + .setPartitionStatistics( + PartitionStatsHandler.writePartitionStatsFile( + testTable, snapshotId, dataSchema, expected)) + .commit(); + + List written; + try (CloseableIterable recordIterator = + testTable.newPartitionStatisticsScan().useSnapshot(snapshotId).scan()) { written = Lists.newArrayList(recordIterator); } @@ -260,7 +275,7 @@ public void testOptionalFieldsWriting() throws Exception { Types.StructType partitionSchema = Partitioning.partitionType(testTable); Schema dataSchema = PartitionStatsHandler.schema(partitionSchema, formatVersion); - ImmutableList.Builder partitionListBuilder = ImmutableList.builder(); + ImmutableList.Builder partitionListBuilder = ImmutableList.builder(); for (int i = 0; i < 5; i++) { PartitionData partitionData = new PartitionData(dataSchema.findField(PARTITION_FIELD_ID).type().asStructType()); @@ -282,29 +297,42 @@ public void testOptionalFieldsWriting() throws Exception { partitionListBuilder.add(stats); } - List expected = partitionListBuilder.build(); + List expected = partitionListBuilder.build(); assertThat(expected.get(0)) .extracting( - PartitionStats::positionDeleteRecordCount, - PartitionStats::positionDeleteFileCount, - 
PartitionStats::equalityDeleteRecordCount, - PartitionStats::equalityDeleteFileCount, - PartitionStats::totalRecords, - PartitionStats::lastUpdatedAt, - PartitionStats::lastUpdatedSnapshotId, - PartitionStats::dvCount) + PartitionStatistics::positionDeleteRecordCount, + PartitionStatistics::positionDeleteFileCount, + PartitionStatistics::equalityDeleteRecordCount, + PartitionStatistics::equalityDeleteFileCount, + PartitionStatistics::totalRecords, + PartitionStatistics::lastUpdatedAt, + PartitionStatistics::lastUpdatedSnapshotId, + PartitionStatistics::dvCount) .isEqualTo( Arrays.asList( 0L, 0, 0L, 0, null, null, null, 0)); // null counters must be initialized to zero. - PartitionStatisticsFile statisticsFile = - PartitionStatsHandler.writePartitionStatsFile(testTable, 42L, dataSchema, expected); - - List written; - try (CloseableIterable recordIterator = - PartitionStatsHandler.readPartitionStatsFile( - dataSchema, testTable.io().newInputFile(statisticsFile.path()))) { + DataFile dataFile = + DataFiles.builder(spec) + .withPath("some_path") + .withFileSizeInBytes(15) + .withFormat(FileFormat.PARQUET) + .withRecordCount(1) + .build(); + testTable.newAppend().appendFile(dataFile).commit(); + long snapshotId = testTable.currentSnapshot().snapshotId(); + + testTable + .updatePartitionStatistics() + .setPartitionStatistics( + PartitionStatsHandler.writePartitionStatsFile( + testTable, snapshotId, dataSchema, expected)) + .commit(); + + List written; + try (CloseableIterable recordIterator = + testTable.newPartitionStatisticsScan().useSnapshot(snapshotId).scan()) { written = Lists.newArrayList(recordIterator); } @@ -512,27 +540,24 @@ public void testCopyOnWriteDelete() throws Exception { testTable.newAppend().appendFile(dataFile1).appendFile(dataFile2).commit(); - PartitionStatisticsFile statisticsFile = - PartitionStatsHandler.computeAndWriteStatsFile(testTable); - testTable.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit(); + testTable + .updatePartitionStatistics() + .setPartitionStatistics(PartitionStatsHandler.computeAndWriteStatsFile(testTable)) + .commit(); - assertThat( - PartitionStatsHandler.readPartitionStatsFile( - PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 2), - testTable.io().newInputFile(statisticsFile.path()))) + assertThat(testTable.newPartitionStatisticsScan().scan()) .allMatch(s -> (s.dataRecordCount() != 0 && s.dataFileCount() != 0)); testTable.newDelete().deleteFile(dataFile1).commit(); testTable.newDelete().deleteFile(dataFile2).commit(); - PartitionStatisticsFile statisticsFileNew = - PartitionStatsHandler.computeAndWriteStatsFile(testTable); + testTable + .updatePartitionStatistics() + .setPartitionStatistics(PartitionStatsHandler.computeAndWriteStatsFile(testTable)) + .commit(); // stats must be decremented to zero as all the files removed from table. 
- assertThat( - PartitionStatsHandler.readPartitionStatsFile( - PartitionStatsHandler.schema(Partitioning.partitionType(testTable), 2), - testTable.io().newInputFile(statisticsFileNew.path()))) + assertThat(testTable.newPartitionStatisticsScan().scan()) .allMatch(s -> (s.dataRecordCount() == 0 && s.dataFileCount() == 0)); } @@ -617,16 +642,30 @@ public void testReadingStatsWithInvalidSchema() throws Exception { Table testTable = TestTables.create(tempDir("old_schema"), "old_schema", SCHEMA, spec, 2, fileFormatProperty); Types.StructType partitionType = Partitioning.partitionType(testTable); - Schema newSchema = PartitionStatsHandler.schema(partitionType, 2); Schema oldSchema = invalidOldSchema(partitionType); - PartitionStatisticsFile invalidStatisticsFile = - PartitionStatsHandler.writePartitionStatsFile( - testTable, 42L, oldSchema, Collections.singletonList(randomStats(partitionType))); + DataFile dataFile = + DataFiles.builder(spec) + .withPath("some_path") + .withFileSizeInBytes(15) + .withFormat(FileFormat.PARQUET) + .withRecordCount(1) + .build(); + testTable.newAppend().appendFile(dataFile).commit(); + long snapshotId = testTable.currentSnapshot().snapshotId(); + + testTable + .updatePartitionStatistics() + .setPartitionStatistics( + PartitionStatsHandler.writePartitionStatsFile( + testTable, + snapshotId, + oldSchema, + Collections.singletonList(randomStats(partitionType)))) + .commit(); - try (CloseableIterable recordIterator = - PartitionStatsHandler.readPartitionStatsFile( - newSchema, testTable.io().newInputFile(invalidStatisticsFile.path()))) { + try (CloseableIterable recordIterator = + testTable.newPartitionStatisticsScan().useSnapshot(snapshotId).scan()) { if (format() == FileFormat.PARQUET) { assertThatThrownBy(() -> Lists.newArrayList(recordIterator)) @@ -660,15 +699,15 @@ public void testFullComputeFallbackWithInvalidStats() throws Exception { testTable.updatePartitionStatistics().setPartitionStatistics(invalidStatisticsFile).commit(); testTable.newAppend().appendFile(dataFile).commit(); - PartitionStatisticsFile statisticsFile = - PartitionStatsHandler.computeAndWriteStatsFile(testTable); + testTable + .updatePartitionStatistics() + .setPartitionStatistics(PartitionStatsHandler.computeAndWriteStatsFile(testTable)) + .commit(); // read the partition entries from the stats file - List partitionStats; - try (CloseableIterable recordIterator = - PartitionStatsHandler.readPartitionStatsFile( - PartitionStatsHandler.schema(partitionType, 2), - testTable.io().newInputFile(statisticsFile.path()))) { + List partitionStats; + try (CloseableIterable recordIterator = + testTable.newPartitionStatisticsScan().scan()) { partitionStats = Lists.newArrayList(recordIterator); } @@ -687,27 +726,29 @@ public void testV2toV3SchemaEvolution() throws Exception { DataFile dataFile = FileGenerationUtil.generateDataFile(testTable, TestHelpers.Row.of("foo", "A")); testTable.newAppend().appendFile(dataFile).commit(); - PartitionStatisticsFile statisticsFile = - PartitionStatsHandler.computeAndWriteStatsFile( - testTable, testTable.currentSnapshot().snapshotId()); + + testTable + .updatePartitionStatistics() + .setPartitionStatistics( + PartitionStatsHandler.computeAndWriteStatsFile( + testTable, testTable.currentSnapshot().snapshotId())) + .commit(); Types.StructType partitionSchema = Partitioning.partitionType(testTable); // read with v2 schema - Schema v2Schema = PartitionStatsHandler.schema(partitionSchema, 2); - List partitionStatsV2; - try (CloseableIterable recordIterator = - 
PartitionStatsHandler.readPartitionStatsFile( - v2Schema, testTable.io().newInputFile(statisticsFile.path()))) { + List partitionStatsV2; + try (CloseableIterable recordIterator = + testTable.newPartitionStatisticsScan().scan()) { partitionStatsV2 = Lists.newArrayList(recordIterator); } + testTable.updateProperties().set(TableProperties.FORMAT_VERSION, "3").commit(); + // read with v3 schema - Schema v3Schema = PartitionStatsHandler.schema(partitionSchema, 3); - List partitionStatsV3; - try (CloseableIterable recordIterator = - PartitionStatsHandler.readPartitionStatsFile( - v3Schema, testTable.io().newInputFile(statisticsFile.path()))) { + List partitionStatsV3; + try (CloseableIterable recordIterator = + testTable.newPartitionStatisticsScan().scan()) { partitionStatsV3 = Lists.newArrayList(recordIterator); } @@ -735,28 +776,27 @@ private static void computeAndValidatePartitionStats( assertThat(result.snapshotId()).isEqualTo(currentSnapshot.snapshotId()); // read the partition entries from the stats file - List partitionStats; - try (CloseableIterable recordIterator = - PartitionStatsHandler.readPartitionStatsFile( - recordSchema, testTable.io().newInputFile(result.path()))) { + List partitionStats; + try (CloseableIterable recordIterator = + testTable.newPartitionStatisticsScan().scan()) { partitionStats = Lists.newArrayList(recordIterator); } assertThat(partitionStats) .extracting( - PartitionStats::partition, - PartitionStats::specId, - PartitionStats::dataRecordCount, - PartitionStats::dataFileCount, - PartitionStats::totalDataFileSizeInBytes, - PartitionStats::positionDeleteRecordCount, - PartitionStats::positionDeleteFileCount, - PartitionStats::equalityDeleteRecordCount, - PartitionStats::equalityDeleteFileCount, - PartitionStats::totalRecords, - PartitionStats::lastUpdatedAt, - PartitionStats::lastUpdatedSnapshotId, - PartitionStats::dvCount) + PartitionStatistics::partition, + PartitionStatistics::specId, + PartitionStatistics::dataRecordCount, + PartitionStatistics::dataFileCount, + PartitionStatistics::totalDataFileSizeInBytes, + PartitionStatistics::positionDeleteRecordCount, + PartitionStatistics::positionDeleteFileCount, + PartitionStatistics::equalityDeleteRecordCount, + PartitionStatistics::equalityDeleteFileCount, + PartitionStatistics::totalRecords, + PartitionStatistics::lastUpdatedAt, + PartitionStatistics::lastUpdatedSnapshotId, + PartitionStatistics::dvCount) .containsExactlyInAnyOrder(expectedValues); } @@ -794,7 +834,9 @@ private PartitionStats randomStats(Types.StructType partitionType) { @SuppressWarnings("checkstyle:CyclomaticComplexity") private static boolean isEqual( - Comparator partitionComparator, PartitionStats stats1, PartitionStats stats2) { + Comparator partitionComparator, + PartitionStatistics stats1, + PartitionStatistics stats2) { if (stats1 == stats2) { return true; } else if (stats1 == null || stats2 == null) { diff --git a/data/src/jmh/java/org/apache/iceberg/PartitionStatsHandlerBenchmark.java b/data/src/jmh/java/org/apache/iceberg/PartitionStatsHandlerBenchmark.java index 938dc2863784..1995e8d02d28 100644 --- a/data/src/jmh/java/org/apache/iceberg/PartitionStatsHandlerBenchmark.java +++ b/data/src/jmh/java/org/apache/iceberg/PartitionStatsHandlerBenchmark.java @@ -99,13 +99,14 @@ public void tearDownBenchmark() { @Benchmark @Threads(1) public void benchmarkPartitionStats() throws IOException { - PartitionStatisticsFile statisticsFile = PartitionStatsHandler.computeAndWriteStatsFile(table); - - List stats; - try (CloseableIterable 
recordIterator = - PartitionStatsHandler.readPartitionStatsFile( - PartitionStatsHandler.schema(Partitioning.partitionType(table), 2), - Files.localInput(statisticsFile.path()))) { + table + .updatePartitionStatistics() + .setPartitionStatistics(PartitionStatsHandler.computeAndWriteStatsFile(table)) + .commit(); + + List stats; + try (CloseableIterable recordIterator = + table.newPartitionStatisticsScan().scan()) { stats = Lists.newArrayList(recordIterator); } diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java index 3aabd635bb69..b55a472e005f 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java @@ -32,13 +32,9 @@ import java.util.stream.LongStream; import org.apache.iceberg.CatalogProperties; import org.apache.iceberg.EnvironmentContext; -import org.apache.iceberg.Files; import org.apache.iceberg.ParameterizedTestExtension; -import org.apache.iceberg.PartitionStatisticsFile; -import org.apache.iceberg.PartitionStats; +import org.apache.iceberg.PartitionStatistics; import org.apache.iceberg.PartitionStatsHandler; -import org.apache.iceberg.Partitioning; -import org.apache.iceberg.Schema; import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.Table; import org.apache.iceberg.TableProperties; @@ -154,33 +150,36 @@ public void testPartitionStatsIncrementalCompute() throws IOException { insertData(10); Table table = validationCatalog.loadTable(tableIdent); - PartitionStatisticsFile statisticsFile = PartitionStatsHandler.computeAndWriteStatsFile(table); - table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit(); - - Schema dataSchema = PartitionStatsHandler.schema(Partitioning.partitionType(table), 2); - List statsBeforeCompaction; - try (CloseableIterable recordIterator = - PartitionStatsHandler.readPartitionStatsFile( - dataSchema, Files.localInput(statisticsFile.path()))) { + table + .updatePartitionStatistics() + .setPartitionStatistics(PartitionStatsHandler.computeAndWriteStatsFile(table)) + .commit(); + + List statsBeforeCompaction; + try (CloseableIterable recordIterator = + table.newPartitionStatisticsScan().scan()) { statsBeforeCompaction = Lists.newArrayList(recordIterator); } sql("CALL %s.system.rewrite_data_files(table => '%s')", catalogName, tableIdent); table.refresh(); - statisticsFile = - PartitionStatsHandler.computeAndWriteStatsFile(table, table.currentSnapshot().snapshotId()); - table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit(); - List statsAfterCompaction; - try (CloseableIterable recordIterator = - PartitionStatsHandler.readPartitionStatsFile( - dataSchema, Files.localInput(statisticsFile.path()))) { + table + .updatePartitionStatistics() + .setPartitionStatistics( + PartitionStatsHandler.computeAndWriteStatsFile( + table, table.currentSnapshot().snapshotId())) + .commit(); + + List statsAfterCompaction; + try (CloseableIterable recordIterator = + table.newPartitionStatisticsScan().scan()) { statsAfterCompaction = Lists.newArrayList(recordIterator); } for (int index = 0; index < statsBeforeCompaction.size(); index++) { - PartitionStats statsAfter = statsAfterCompaction.get(index); - PartitionStats statsBefore = 
statsBeforeCompaction.get(index); + PartitionStatistics statsAfter = statsAfterCompaction.get(index); + PartitionStatistics statsBefore = statsBeforeCompaction.get(index); assertThat(statsAfter.partition()).isEqualTo(statsBefore.partition()); // data count should match after compaction diff --git a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java index b8dca4b2cd18..fa3ff37186cc 100644 --- a/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java +++ b/spark/v4.0/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteManifestsProcedure.java @@ -26,13 +26,9 @@ import java.sql.Date; import java.sql.Timestamp; import java.util.List; -import org.apache.iceberg.Files; import org.apache.iceberg.ParameterizedTestExtension; -import org.apache.iceberg.PartitionStatisticsFile; -import org.apache.iceberg.PartitionStats; +import org.apache.iceberg.PartitionStatistics; import org.apache.iceberg.PartitionStatsHandler; -import org.apache.iceberg.Partitioning; -import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.io.CloseableIterable; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; @@ -408,14 +404,14 @@ public void testPartitionStatsIncrementalCompute() throws IOException { sql("INSERT INTO TABLE %s VALUES (2, 'b')", tableName); Table table = validationCatalog.loadTable(tableIdent); - PartitionStatisticsFile statisticsFile = PartitionStatsHandler.computeAndWriteStatsFile(table); - table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit(); - - Schema dataSchema = PartitionStatsHandler.schema(Partitioning.partitionType(table), 2); - List statsBeforeRewrite; - try (CloseableIterable recordIterator = - PartitionStatsHandler.readPartitionStatsFile( - dataSchema, Files.localInput(statisticsFile.path()))) { + table + .updatePartitionStatistics() + .setPartitionStatistics(PartitionStatsHandler.computeAndWriteStatsFile(table)) + .commit(); + + List statsBeforeRewrite; + try (CloseableIterable recordIterator = + table.newPartitionStatisticsScan().scan()) { statsBeforeRewrite = Lists.newArrayList(recordIterator); } @@ -424,19 +420,22 @@ public void testPartitionStatsIncrementalCompute() throws IOException { catalogName, tableIdent); table.refresh(); - statisticsFile = - PartitionStatsHandler.computeAndWriteStatsFile(table, table.currentSnapshot().snapshotId()); - table.updatePartitionStatistics().setPartitionStatistics(statisticsFile).commit(); - List statsAfterRewrite; - try (CloseableIterable recordIterator = - PartitionStatsHandler.readPartitionStatsFile( - dataSchema, Files.localInput(statisticsFile.path()))) { + table + .updatePartitionStatistics() + .setPartitionStatistics( + PartitionStatsHandler.computeAndWriteStatsFile( + table, table.currentSnapshot().snapshotId())) + .commit(); + + List statsAfterRewrite; + try (CloseableIterable recordIterator = + table.newPartitionStatisticsScan().scan()) { statsAfterRewrite = Lists.newArrayList(recordIterator); } for (int index = 0; index < statsBeforeRewrite.size(); index++) { - PartitionStats statsAfter = statsAfterRewrite.get(index); - PartitionStats statsBefore = statsBeforeRewrite.get(index); + PartitionStatistics statsAfter = statsAfterRewrite.get(index); + PartitionStatistics statsBefore = 
statsBeforeRewrite.get(index); assertThat(statsAfter.partition()).isEqualTo(statsBefore.partition()); // data count should match diff --git a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputePartitionStatsAction.java b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputePartitionStatsAction.java index 303411eb7ddd..97e625214976 100644 --- a/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputePartitionStatsAction.java +++ b/spark/v4.0/spark/src/test/java/org/apache/iceberg/spark/actions/TestComputePartitionStatsAction.java @@ -23,9 +23,8 @@ import java.io.IOException; import java.util.List; -import org.apache.iceberg.Files; +import org.apache.iceberg.PartitionStatistics; import org.apache.iceberg.PartitionStatisticsFile; -import org.apache.iceberg.PartitionStats; import org.apache.iceberg.PartitionStatsHandler; import org.apache.iceberg.Partitioning; import org.apache.iceberg.Schema; @@ -122,7 +121,8 @@ public void partitionStatsComputeOnLatestSnapshot() throws IOException { Types.StructType partitionType = Partitioning.partitionType(table); Schema dataSchema = PartitionStatsHandler.schema(partitionType, 2); validatePartitionStats( - statisticsFile, + table, + table.currentSnapshot().snapshotId(), dataSchema, Tuple.tuple( partitionRecord(partitionType, "foo", "A"), @@ -212,7 +212,8 @@ public void partitionStatsComputeOnSnapshot() throws IOException { Schema dataSchema = PartitionStatsHandler.schema(partitionType, 2); // should contain stats for only partitions of snapshot1 (no entry for partition bar, A) validatePartitionStats( - statisticsFile, + table, + snapshot1.snapshotId(), dataSchema, Tuple.tuple( partitionRecord(partitionType, "foo", "A"), @@ -273,30 +274,29 @@ private void createPartitionedTableV1() { } private void validatePartitionStats( - PartitionStatisticsFile result, Schema recordSchema, Tuple... expectedValues) + Table table, long snapshotId, Schema recordSchema, Tuple... expectedValues) throws IOException { // read the partition entries from the stats file - List partitionStats; - try (CloseableIterable recordIterator = - PartitionStatsHandler.readPartitionStatsFile( - recordSchema, Files.localInput(result.path()))) { + List partitionStats; + try (CloseableIterable recordIterator = + table.newPartitionStatisticsScan().useSnapshot(snapshotId).scan()) { partitionStats = Lists.newArrayList(recordIterator); } assertThat(partitionStats) .extracting( - PartitionStats::partition, - PartitionStats::specId, - PartitionStats::dataRecordCount, - PartitionStats::dataFileCount, - PartitionStats::totalDataFileSizeInBytes, - PartitionStats::positionDeleteRecordCount, - PartitionStats::positionDeleteFileCount, - PartitionStats::equalityDeleteRecordCount, - PartitionStats::equalityDeleteFileCount, - PartitionStats::totalRecords, - PartitionStats::lastUpdatedAt, - PartitionStats::lastUpdatedSnapshotId) + PartitionStatistics::partition, + PartitionStatistics::specId, + PartitionStatistics::dataRecordCount, + PartitionStatistics::dataFileCount, + PartitionStatistics::totalDataFileSizeInBytes, + PartitionStatistics::positionDeleteRecordCount, + PartitionStatistics::positionDeleteFileCount, + PartitionStatistics::equalityDeleteRecordCount, + PartitionStatistics::equalityDeleteFileCount, + PartitionStatistics::totalRecords, + PartitionStatistics::lastUpdatedAt, + PartitionStatistics::lastUpdatedSnapshotId) .containsExactlyInAnyOrder(expectedValues); }
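
For context, a minimal usage sketch of the new scan API, mirroring how the tests above exercise it. It assumes an existing Table instance named `table` with at least one snapshot, and omits exception handling; statistics are computed and committed first, since the scan only reads an already-registered partition statistics file.

// Compute partition statistics for the current snapshot and register them on the table.
PartitionStatisticsFile statsFile = PartitionStatsHandler.computeAndWriteStatsFile(table);
table.updatePartitionStatistics().setPartitionStatistics(statsFile).commit();

// Read the statistics back through the new scan API introduced by this change.
try (CloseableIterable<PartitionStatistics> stats =
    table.newPartitionStatisticsScan().useSnapshot(statsFile.snapshotId()).scan()) {
  for (PartitionStatistics s : stats) {
    System.out.printf(
        "partition=%s dataFiles=%d dataRecords=%d%n",
        s.partition(), s.dataFileCount(), s.dataRecordCount());
  }
}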