
Commit 3278b69
Spark 3.3, 3.4: Make where clause case sensitive in rewrite data files (
ludlows authored Dec 4, 2024
1 parent c7cef9b commit 3278b69
Showing 4 changed files with 55 additions and 0 deletions.
@@ -41,6 +41,7 @@
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.parser.ParseException;
import org.apache.spark.sql.internal.SQLConf;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
@@ -61,6 +62,29 @@ public void removeTable() {
sql("DROP TABLE IF EXISTS %s", tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
}

@Test
public void testFilterCaseSensitivity() {
createTable();
insertData(10);
sql("set %s = false", SQLConf.CASE_SENSITIVE().key());
List<Object[]> expectedRecords = currentData();
List<Object[]> output =
sql(
"CALL %s.system.rewrite_data_files(table=>'%s', where=>'C1 > 0')",
catalogName, tableIdent);
assertEquals(
"Action should rewrite 10 data files and add 1 data files",
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(3);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

@Test
public void testZOrderSortExpression() {
List<ExtendedParser.RawOrderField> order =
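The new test above pins the user-facing behavior: with spark.sql.caseSensitive=false, an uppercase column name in the where clause should still resolve during planning. A minimal sketch of that usage outside the test harness, assuming a table db.tbl with a lowercase column c1 and a catalog named my_catalog (these names are illustrative, not from this commit):

    import org.apache.spark.sql.SparkSession;

    SparkSession spark = SparkSession.builder().getOrCreate();
    spark.conf().set("spark.sql.caseSensitive", "false");
    // Case-insensitive analysis: 'C1' in the where clause binds to column c1.
    spark.sql(
        "CALL my_catalog.system.rewrite_data_files(table => 'db.tbl', where => 'C1 > 0')");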
@@ -59,6 +59,7 @@
import org.apache.iceberg.relocated.com.google.common.math.IntMath;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.StructLikeMap;
@@ -94,11 +95,13 @@ public class RewriteDataFilesSparkAction
private boolean useStartingSequenceNumber;
private RewriteJobOrder rewriteJobOrder;
private FileRewriter<FileScanTask, DataFile> rewriter = null;
private boolean caseSensitive;

RewriteDataFilesSparkAction(SparkSession spark, Table table) {
super(spark.cloneSession());
// Disable Adaptive Query Execution as this may change the output partitioning of our write
spark().conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
this.caseSensitive = SparkUtil.caseSensitive(spark);
this.table = table;
}

@@ -183,6 +186,7 @@ StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId)
table
.newScan()
.useSnapshot(startingSnapshotId)
.caseSensitive(caseSensitive)
.filter(filter)
.ignoreResiduals()
.planFiles();
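The action now snapshots the session's case-sensitivity flag in its constructor and forwards it to the table scan. SparkUtil.caseSensitive(spark) is an existing Iceberg helper; a rough sketch of what it amounts to, assuming it simply reads spark.sql.caseSensitive from the session conf (Spark's default is false) rather than its verbatim source:

    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.internal.SQLConf;

    // Approximation of the helper's behavior, not the actual implementation.
    static boolean caseSensitive(SparkSession spark) {
      return Boolean.parseBoolean(
          spark.conf().get(SQLConf.CASE_SENSITIVE().key(), "false"));
    }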
@@ -70,6 +70,29 @@ public void removeTable() {
sql("DROP TABLE IF EXISTS %s", tableName(QUOTED_SPECIAL_CHARS_TABLE_NAME));
}

@Test
public void testFilterCaseSensitivity() {
createTable();
insertData(10);
sql("set %s = false", SQLConf.CASE_SENSITIVE().key());
List<Object[]> expectedRecords = currentData();
List<Object[]> output =
sql(
"CALL %s.system.rewrite_data_files(table=>'%s', where=>'C1 > 0')",
catalogName, tableIdent);
assertEquals(
"Action should rewrite 10 data files and add 1 data files",
row(10, 1),
Arrays.copyOf(output.get(0), 2));
// verify rewritten bytes separately
assertThat(output.get(0)).hasSize(4);
assertThat(output.get(0)[2])
.isInstanceOf(Long.class)
.isEqualTo(Long.valueOf(snapshotSummary().get(SnapshotSummary.REMOVED_FILE_SIZE_PROP)));
List<Object[]> actualRecords = currentData();
assertEquals("Data after compaction should not change", expectedRecords, actualRecords);
}

@Test
public void testZOrderSortExpression() {
List<ExtendedParser.RawOrderField> order =
@@ -62,6 +62,7 @@
import org.apache.iceberg.relocated.com.google.common.math.IntMath;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors;
import org.apache.iceberg.relocated.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.iceberg.spark.SparkUtil;
import org.apache.iceberg.types.Types.StructType;
import org.apache.iceberg.util.PropertyUtil;
import org.apache.iceberg.util.StructLikeMap;
@@ -100,11 +101,13 @@ public class RewriteDataFilesSparkAction
private boolean useStartingSequenceNumber;
private RewriteJobOrder rewriteJobOrder;
private FileRewriter<FileScanTask, DataFile> rewriter = null;
private boolean caseSensitive;

RewriteDataFilesSparkAction(SparkSession spark, Table table) {
super(spark.cloneSession());
// Disable Adaptive Query Execution as this may change the output partitioning of our write
spark().conf().set(SQLConf.ADAPTIVE_EXECUTION_ENABLED().key(), false);
this.caseSensitive = SparkUtil.caseSensitive(spark);
this.table = table;
}

@@ -195,6 +198,7 @@ StructLikeMap<List<List<FileScanTask>>> planFileGroups(long startingSnapshotId)
table
.newScan()
.useSnapshot(startingSnapshotId)
.caseSensitive(caseSensitive)
.filter(filter)
.ignoreResiduals()
.planFiles();
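Downstream, the flag changes how the core TableScan binds the where-clause expression to the table schema. An illustrative planning call, with a made-up predicate and a lowercase column c1 assumed, showing the effect of caseSensitive(false):

    import org.apache.iceberg.FileScanTask;
    import org.apache.iceberg.expressions.Expressions;
    import org.apache.iceberg.io.CloseableIterable;

    // With caseSensitive(false), the unbound reference "C1" resolves to column c1
    // when the scan binds the filter; with true, binding would fail on the mismatch.
    CloseableIterable<FileScanTask> tasks =
        table
            .newScan()
            .caseSensitive(false)
            .filter(Expressions.greaterThan("C1", 0))
            .ignoreResiduals()
            .planFiles();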
