Skip to content

Commit

Permalink
[cdc] Fix cdc job mistakenly changes immutable options of existing ta…
Browse files Browse the repository at this point in the history
…ble (apache#3095)
  • Loading branch information
zhuangchong authored Mar 27, 2024
1 parent 6f02230 commit 3237e1a
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import static org.apache.paimon.CoreOptions.TagCreationMode.WATERMARK;
Expand Down Expand Up @@ -178,11 +179,23 @@ protected abstract void buildSink(

protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable table) {
// doesn't support altering bucket here
Map<String, String> withoutBucket = new HashMap<>(tableConfig);
withoutBucket.remove(CoreOptions.BUCKET.key());

Map<String, String> dynamicOptions = new HashMap<>(tableConfig);
dynamicOptions.remove(CoreOptions.BUCKET.key());

// remove immutable options and options with equal values
Map<String, String> oldOptions = table.options();
Set<String> immutableOptionKeys = CoreOptions.getImmutableOptionKeys();
dynamicOptions
.entrySet()
.removeIf(
entry ->
immutableOptionKeys.contains(entry.getKey())
|| Objects.equals(
oldOptions.get(entry.getKey()), entry.getValue()));

// alter the table dynamic options
List<SchemaChange> optionChanges =
withoutBucket.entrySet().stream()
dynamicOptions.entrySet().stream()
.map(entry -> SchemaChange.setOption(entry.getKey(), entry.getValue()))
.collect(Collectors.toList());

Expand All @@ -194,7 +207,7 @@ protected FileStoreTable alterTableOptions(Identifier identifier, FileStoreTable
throw new RuntimeException("This is unexpected.", e);
}

return table.copy(withoutBucket);
return table.copy(dynamicOptions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1097,6 +1097,49 @@ public void testOptionsChange() throws Exception {
assertThat(table.options()).containsAllEntriesOf(tableConfig);
}

@Test
public void testOptionsChangeInExistingTable() throws Exception {
Map<String, String> options = new HashMap<>();
options.put("bucket", "1");
options.put("sink.parallelism", "1");
options.put("sequence.field", "_timestamp");

createFileStoreTable(
RowType.of(
new DataType[] {
DataTypes.INT().notNull(), DataTypes.DATE(), DataTypes.TIMESTAMP(0)
},
new String[] {"pk", "_date", "_timestamp"}),
Collections.emptyList(),
Collections.singletonList("pk"),
options);

Map<String, String> mySqlConfig = getBasicMySqlConfig();
mySqlConfig.put("database-name", DATABASE_NAME);
mySqlConfig.put("table-name", "test_exist_options_change");
Map<String, String> tableConfig = new HashMap<>();
// update immutable options
tableConfig.put("sequence.field", "_date");
// update existing options
tableConfig.put("sink.parallelism", "2");
// add new options
tableConfig.put("snapshot.expire.limit", "1000");

MySqlSyncTableAction action =
syncTableActionBuilder(mySqlConfig)
.withPrimaryKeys("pk")
.withTableConfig(tableConfig)
.build();
runActionWithDefaultEnv(action);

FileStoreTable table = getFileStoreTable();

assertThat(table.options().get("bucket")).isEqualTo("1");
assertThat(table.options().get("sequence.field")).isEqualTo("_timestamp");
assertThat(table.options().get("sink.parallelism")).isEqualTo("2");
assertThat(table.options().get("snapshot.expire.limit")).isEqualTo("1000");
}

@Test
@Timeout(60)
public void testMetadataColumns() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,13 @@ CREATE TABLE test_options_change (
PRIMARY KEY (pk)
);

CREATE TABLE test_exist_options_change (
pk INT,
_date DATE,
_timestamp TIMESTAMP,
PRIMARY KEY (pk)
);

-- ################################################################################
-- testSyncShard
-- ################################################################################
Expand Down Expand Up @@ -405,4 +412,4 @@ USE invalid_alter_bucket;

CREATE TABLE t (
k INT PRIMARY KEY
);
);

0 comments on commit 3237e1a

Please sign in to comment.