Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark 3.5: Add ignore-invalid-options to RewriteDataFilesSparkAction and RewritePositionDeleteFilesSparkAction #11737

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,15 @@ public interface RewriteDataFiles

boolean REMOVE_DANGLING_DELETES_DEFAULT = false;

/**
* If set to true, option keys that are not supported by the action or the rewriter are tolerated
* instead of failing the rewrite with an {@link IllegalArgumentException}.
*
* <p>NOTE(review): only the unsupported-key check appears to be bypassed; values of supported
* options are presumably still validated by the rewriter -- confirm against the implementations.
*
* <p>Defaults to false.
*/
String IGNORE_INVALID_OPTIONS = "ignore-invalid-options";

/** Default for {@link #IGNORE_INVALID_OPTIONS}: unsupported option keys are rejected. */
boolean IGNORE_INVALID_OPTIONS_DEFAULT = false;

/**
* Forces the rewrite job order based on the value.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,15 @@ public interface RewritePositionDeleteFiles

String REWRITE_JOB_ORDER_DEFAULT = RewriteJobOrder.NONE.orderName();

/**
* If set to true, option keys that are not supported by the action or the rewriter are tolerated
* instead of failing the rewrite with an {@link IllegalArgumentException}.
*
* <p>NOTE(review): only the unsupported-key check appears to be bypassed; values of supported
* options are presumably still validated by the rewriter -- confirm against the implementations.
*
* <p>Defaults to false.
*/
String IGNORE_INVALID_OPTIONS = "ignore-invalid-options";

/** Default for {@link #IGNORE_INVALID_OPTIONS}: unsupported option keys are rejected. */
boolean IGNORE_INVALID_OPTIONS_DEFAULT = false;

/**
* A filter for finding deletes to rewrite.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,8 @@ public class RewriteDataFilesSparkAction
USE_STARTING_SEQUENCE_NUMBER,
REWRITE_JOB_ORDER,
OUTPUT_SPEC_ID,
REMOVE_DANGLING_DELETES);
REMOVE_DANGLING_DELETES,
IGNORE_INVALID_OPTIONS);

private static final RewriteDataFilesSparkAction.Result EMPTY_RESULT =
ImmutableRewriteDataFiles.Result.builder().rewriteResults(ImmutableList.of()).build();
Expand Down Expand Up @@ -435,17 +436,22 @@ private Iterable<FileGroupRewriteResult> toRewriteResults(List<RewriteFileGroup>
}

void validateAndInitOptions() {
Set<String> validOptions = Sets.newHashSet(rewriter.validOptions());
validOptions.addAll(VALID_OPTIONS);

Set<String> invalidKeys = Sets.newHashSet(options().keySet());
invalidKeys.removeAll(validOptions);

Preconditions.checkArgument(
invalidKeys.isEmpty(),
"Cannot use options %s, they are not supported by the action or the rewriter %s",
invalidKeys,
rewriter.description());
boolean ignoreInvalidOptions =
PropertyUtil.propertyAsBoolean(
options(), IGNORE_INVALID_OPTIONS, IGNORE_INVALID_OPTIONS_DEFAULT);
if (!ignoreInvalidOptions) {
Set<String> validOptions = Sets.newHashSet(rewriter.validOptions());
validOptions.addAll(VALID_OPTIONS);

Set<String> invalidKeys = Sets.newHashSet(options().keySet());
invalidKeys.removeAll(validOptions);

Preconditions.checkArgument(
invalidKeys.isEmpty(),
"Cannot use options %s, they are not supported by the action or the rewriter %s",
invalidKeys,
rewriter.description());
}

rewriter.init(options());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ public class RewritePositionDeleteFilesSparkAction
MAX_CONCURRENT_FILE_GROUP_REWRITES,
PARTIAL_PROGRESS_ENABLED,
PARTIAL_PROGRESS_MAX_COMMITS,
REWRITE_JOB_ORDER);
REWRITE_JOB_ORDER,
IGNORE_INVALID_OPTIONS);
private static final Result EMPTY_RESULT =
ImmutableRewritePositionDeleteFiles.Result.builder().build();

Expand Down Expand Up @@ -358,17 +359,22 @@ private RewritePositionDeletesGroup newRewriteGroup(
}

private void validateAndInitOptions() {
Set<String> validOptions = Sets.newHashSet(rewriter.validOptions());
validOptions.addAll(VALID_OPTIONS);

Set<String> invalidKeys = Sets.newHashSet(options().keySet());
invalidKeys.removeAll(validOptions);

Preconditions.checkArgument(
invalidKeys.isEmpty(),
"Cannot use options %s, they are not supported by the action or the rewriter %s",
invalidKeys,
rewriter.description());
boolean ignoreInvalidOptions =
PropertyUtil.propertyAsBoolean(
options(), IGNORE_INVALID_OPTIONS, IGNORE_INVALID_OPTIONS_DEFAULT);
if (!ignoreInvalidOptions) {
Set<String> validOptions = Sets.newHashSet(rewriter.validOptions());
validOptions.addAll(VALID_OPTIONS);

Set<String> invalidKeys = Sets.newHashSet(options().keySet());
invalidKeys.removeAll(validOptions);

Preconditions.checkArgument(
invalidKeys.isEmpty(),
"Cannot use options %s, they are not supported by the action or the rewriter %s",
invalidKeys,
rewriter.description());
}

rewriter.init(options());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1214,6 +1214,15 @@ public void testInvalidOptions() {
.hasMessageContaining("requires enabling Iceberg Spark session extensions");
}

@TestTemplate
public void testIgnoreInvalidOptions() {
Table table = createTable(20);
basicRewrite(table)
.option("ignore-invalid-options", "true")
.option("foobarity", "-5")
.execute();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also test the default behavior of ignore-invalid-options, similar to what you did in TestRewritePositionDeleteFilesAction.testIgnoreInvalidOptions?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's covered by testInvalidOptions() above.

}

@TestTemplate
public void testSortMultipleGroups() {
Table table = createTable(20);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -728,6 +728,27 @@ public void testRewriteManyColumns() throws Exception {
assertEquals("Position deletes must match", expectedDeletes, actualDeletes);
}

@TestTemplate
public void testIgnoreInvalidOptions() {
  Table table = createTableUnpartitioned(2, SCALE);
  SparkActions actions = SparkActions.get(spark);

  // Default behaviour: an option key unknown to both the action and the rewriter is rejected.
  assertThatThrownBy(
          () -> actions.rewritePositionDeletes(table).option("foobarity", "-5").execute())
      .isInstanceOf(IllegalArgumentException.class)
      .hasMessage(
          "Cannot use options [foobarity], they are not supported by the action or the rewriter BIN-PACK");

  // With ignore-invalid-options enabled, the same unknown key must not fail the rewrite.
  actions
      .rewritePositionDeletes(table)
      .option("ignore-invalid-options", "true")
      .option("foobarity", "-5")
      .execute();
}

private Table createTablePartitioned(int partitions, int files, int numRecords) {
PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("c1").build();
Table table =
Expand Down