diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index fd8754b30d5b..f1d4a9e733f8 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -41,6 +41,7 @@
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.parser.ParseException;
+import org.apache.spark.sql.internal.SQLConf;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Assume;
@@ -61,6 +62,29 @@ public void removeTable() {
     sql("DROP TABLE IF EXISTS %s", tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
   }
 
+  @Test
+  public void testFilterCaseSensitivity() {
+    createTable();
+    insertData(10);
+    sql("set %s = false", SQLConf.CASE_SENSITIVE().key());
+    List<Object[]> expectedRecords = currentData();
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table=>'%s', where=>'C1 > 0')",
+            catalogName, tableIdent);
+    assertEquals(
+        "Action should rewrite 10 data files and add 1 data files",
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+    // verify rewritten bytes separately
+    assertThat(output.get(0)).hasSize(3);
+    assertThat(output.get(0)[2])
+        .isInstanceOf(Long.class)
+        .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
+    List<Object[]> actualRecords = currentData();
+    assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
+  }
+
   @Test
   public void testZOrderSortExpression() {
     List<ExtendedParser.RawOrderField> order =
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index eed0b2b67b0a..73aa54ffc8a8 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -59,6 +59,7 @@
 import org.apache.iceberg.relocated.com.google.common.math.IntMath;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.StructLikeMap;
@@ -94,11 +95,13 @@ public class RewriteDataFilesSparkAction
   private boolean useStartingSequenceNumber;
   private RewriteJobOrder rewriteJobOrder;
   private FileRewriter<FileScanTask, DataFile> rewriter = null;
+  private boolean caseSensitive;
 
   RewriteDataFilesSparkAction(SparkSession spark, Table table) {
     super(spark.cloneSession());
 
     // Disable Adaptive Query Execution as this may change the output partitioning of our write
     spark().conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
+    this.caseSensitive = SparkUtil.caseSensitive(spark);
     this.table = table;
   }
@@ -183,6 +186,7 @@ StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId)
         table
             .newScan()
             .useSnapshot(startingSnapshotId)
+            .caseSensitive(caseSensitive)
             .filter(filter)
             .ignoreResiduals()
             .planFiles();
diff --git a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
index e637950ae5d4..7c739fc8f61d 100644
--- a/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
+++ b/spark/v3.4/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestRewriteDataFilesProcedure.java
@@ -70,6 +70,29 @@ public void removeTable() {
     sql("DROP TABLE IF EXISTS %s", tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
   }
 
+  @Test
+  public void testFilterCaseSensitivity() {
+    createTable();
+    insertData(10);
+    sql("set %s = false", SQLConf.CASE_SENSITIVE().key());
+    List<Object[]> expectedRecords = currentData();
+    List<Object[]> output =
+        sql(
+            "CALL %s.system.rewrite_data_files(table=>'%s', where=>'C1 > 0')",
+            catalogName, tableIdent);
+    assertEquals(
+        "Action should rewrite 10 data files and add 1 data files",
+        row(10, 1),
+        Arrays.copyOf(output.get(0), 2));
+    // verify rewritten bytes separately
+    assertThat(output.get(0)).hasSize(4);
+    assertThat(output.get(0)[2])
+        .isInstanceOf(Long.class)
+        .isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
+    List<Object[]> actualRecords = currentData();
+    assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
+  }
+
   @Test
   public void testZOrderSortExpression() {
     List<ExtendedParser.RawOrderField> order =
diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
index 0b2bbb3dfc39..ce0808da50b8 100644
--- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
+++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/actions/RewriteDataFilesSparkAction.java
@@ -62,6 +62,7 @@
 import org.apache.iceberg.relocated.com.google.common.math.IntMath;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
 import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.iceberg.spark.SparkUtil;
 import org.apache.iceberg.types.Types.StructType;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.StructLikeMap;
@@ -100,11 +101,13 @@ public class RewriteDataFilesSparkAction
   private boolean useStartingSequenceNumber;
   private RewriteJobOrder rewriteJobOrder;
   private FileRewriter<FileScanTask, DataFile> rewriter = null;
+  private boolean caseSensitive;
 
   RewriteDataFilesSparkAction(SparkSession spark, Table table) {
     super(spark.cloneSession());
 
     // Disable Adaptive Query Execution as this may change the output partitioning of our write
     spark().conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
+    this.caseSensitive = SparkUtil.caseSensitive(spark);
     this.table = table;
   }
@@ -195,6 +198,7 @@ StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId)
         table
             .newScan()
             .useSnapshot(startingSnapshotId)
+            .caseSensitive(caseSensitive)
             .filter(filter)
             .ignoreResiduals()
             .planFiles();
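The change is the same for both Spark versions: Iceberg table scans bind filter column names case-sensitively by default, so before this patch a call like `rewrite_data_files(table => 't', where => 'C1 > 0')` against a lowercase `c1` column would fail to bind the filter even when the Spark session had `spark.sql.caseSensitive=false`. The action now captures the session's setting in its constructor (before `cloneSession()` detaches it from later conf changes, via the `SparkUtil.caseSensitive` helper it imports) and forwards it to the scan in `planFileGroups`. Below is a minimal sketch of the resulting scan-planning behavior, not code from the patch; the `spark` and `table` parameters and the column name `c1` are illustrative assumptions.

```java
import org.apache.iceberg.Table;
import org.apache.iceberg.TableScan;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.spark.sql.SparkSession;

class CaseSensitivitySketch {
  // Sketch: `table` stands for any loaded Iceberg table with a column `c1`.
  static TableScan planScan(SparkSession spark, Table table) {
    // Mirrors the patched constructor: read spark.sql.caseSensitive once.
    boolean caseSensitive = SparkUtil.caseSensitive(spark);

    // Mirrors the patched planFileGroups(): without the caseSensitive(...)
    // call, binding "C1" against column "c1" fails during scan planning
    // regardless of the Spark session setting.
    return table
        .newScan()
        .caseSensitive(caseSensitive)
        .filter(Expressions.greaterThan("C1", 0))
        .ignoreResiduals();
  }
}
```

The new `testFilterCaseSensitivity` tests exercise exactly this path: they set `spark.sql.caseSensitive=false`, pass an upper-cased column name in the `where` clause, and assert that the compaction rewrites all 10 files and leaves the data unchanged.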