Skip to content

Commit

Permalink
add cdc write to branch test
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 14, 2024
1 parent a5a79f6 commit 1030872
Show file tree
Hide file tree
Showing 6 changed files with 334 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,8 @@ public TableSchema commitChanges(String branchName, List<SchemaChange> changes)
Catalog.ColumnNotExistException {
while (true) {
TableSchema schema =
latest().orElseThrow(
latest(branchName)
.orElseThrow(
() ->
new Catalog.TableNotExistException(
fromPath(tableRoot.toString(), true)));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
Expand Down Expand Up @@ -112,6 +113,16 @@ protected void assertTableNotExists(String... tableNames) throws Exception {
/**
 * Waits until the given table reaches the expected schema, primary keys, and data.
 *
 * <p>Convenience overload that targets the default main branch; delegates to the
 * branch-aware variant with {@link BranchManager#DEFAULT_MAIN_BRANCH}.
 */
protected void waitForResult(
        List<String> expected, FileStoreTable table, RowType rowType, List<String> primaryKeys)
        throws Exception {
    String branch = BranchManager.DEFAULT_MAIN_BRANCH;
    waitForResult(expected, table, rowType, primaryKeys, branch);
}

protected void waitForResult(
List<String> expected,
FileStoreTable table,
RowType rowType,
List<String> primaryKeys,
String branch)
throws Exception {
assertThat(table.schema().primaryKeys()).isEqualTo(primaryKeys);

// wait for table schema to become our expected schema
Expand All @@ -130,15 +141,15 @@ protected void waitForResult(
break;
}
}
table = table.copyWithLatestSchema();
table = table.copyWithLatestSchema(branch);
Thread.sleep(1000);
}

// wait for data to become expected
List<String> sortedExpected = new ArrayList<>(expected);
Collections.sort(sortedExpected);
while (true) {
ReadBuilder readBuilder = table.newReadBuilder();
ReadBuilder readBuilder = table.newReadBuilder().fromBranch(branch);
TableScan.Plan plan = readBuilder.newScan().plan();
List<String> result =
getResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ public void testSchemaEvolution() throws Exception {
runSingleTableSchemaEvolution("schemaevolution", DEBEZIUM);
}

// Verifies that CDC single-table schema evolution is applied when syncing into a
// non-default branch ("testBranch") instead of the main branch.
@Test
@Timeout(120)
public void testSchemaEvolutionToBranch() throws Exception {
    runSingleTableSchemaEvolutionToBranch("schemaevolution", DEBEZIUM, "testBranch");
}

@Test
@Timeout(60)
public void testNotSupportFormat() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.paimon.flink.action.cdc.kafka;

import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.cdc.MessageQueueSchemaUtils;
import org.apache.paimon.flink.action.cdc.TypeMapping;
import org.apache.paimon.schema.Schema;
Expand All @@ -33,6 +34,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

Expand Down Expand Up @@ -81,6 +83,115 @@ protected void runSingleTableSchemaEvolution(String sourceDir, String format) th
testSchemaEvolutionImpl(topic, sourceDir, format);
}

/**
 * Runs the single-table CDC schema-evolution sync against a non-default branch.
 *
 * <p>Seeds the Kafka topic with the first data file, builds a {@code KafkaSyncTableAction}
 * whose table config and catalog config both point at {@code branch}, starts it, and then
 * delegates the schema/data assertions to {@code testSchemaEvolutionImplWithBranch}.
 *
 * @param sourceDir directory under {@code kafka/<format>/table/} holding the fixture files
 * @param format CDC message format (e.g. debezium)
 * @param branch target branch name for both table and catalog options
 * @throws Exception if seeding Kafka fails or the action cannot be started
 */
protected void runSingleTableSchemaEvolutionToBranch(
        String sourceDir, String format, String branch) throws Exception {
    final String topic = "schema_evolution-branch";
    createTestTopic(topic, 1, 1);

    // Seed the topic with the first batch of records before starting the sync job.
    try {
        writeRecordsToKafka(
                topic,
                readLines(
                        String.format(
                                "kafka/%s/table/%s/%s-data-1.txt", format, sourceDir, format)));
    } catch (Exception e) {
        throw new Exception(String.format("Failed to write %s data to Kafka.", format), e);
    }

    // Both the catalog and the table must be pointed at the target branch.
    Map<String, String> catalogConfig = new HashMap<>();
    catalogConfig.put(FlinkConnectorOptions.BRANCH.key(), branch);

    Map<String, String> tableConfig = getBasicTableConfig();
    tableConfig.put(FlinkConnectorOptions.BRANCH.key(), branch);

    Map<String, String> kafkaConfig = getBasicKafkaConfig();
    kafkaConfig.put(VALUE_FORMAT.key(), format + "-json");
    kafkaConfig.put(TOPIC.key(), topic);

    KafkaSyncTableAction action =
            syncTableActionBuilder(kafkaConfig)
                    .withPrimaryKeys("id")
                    .withTableConfig(tableConfig)
                    .withCatalogConfig(catalogConfig)
                    .build();
    runActionWithDefaultEnv(action);

    testSchemaEvolutionImplWithBranch(topic, sourceDir, format, branch);
}

/**
 * Asserts that the branch table tracks the evolving Kafka schema: first the initial four
 * columns, then an added {@code age} column (data file 2), then an added {@code address}
 * column (data file 3). All schema and data checks read from {@code branch}.
 *
 * @param topic Kafka topic the sync action consumes from
 * @param sourceDir fixture directory under {@code kafka/<format>/table/}
 * @param format CDC message format (e.g. debezium)
 * @param branch branch the table is synced into
 * @throws Exception if writing a fixture batch to Kafka fails or waiting times out
 */
private void testSchemaEvolutionImplWithBranch(
        String topic, String sourceDir, String format, String branch) throws Exception {
    FileStoreTable table = getFileStoreTableWithBranch(tableName, branch);
    List<String> primaryKeys = Collections.singletonList("id");

    // Batch 1 (written by the caller): initial schema id, name, description, weight.
    RowType rowType =
            RowType.of(
                    new DataType[] {
                        DataTypes.STRING().notNull(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING()
                    },
                    new String[] {"id", "name", "description", "weight"});
    List<String> expected =
            Arrays.asList(
                    "+I[101, scooter, Small 2-wheel scooter, 3.14]",
                    "+I[102, car battery, 12V car battery, 8.1]");
    waitForResult(expected, table, rowType, primaryKeys, branch);

    // Batch 2 adds the "age" column; earlier rows read NULL for it.
    writeTestRecords(topic, sourceDir, format, 2);
    rowType =
            RowType.of(
                    new DataType[] {
                        DataTypes.STRING().notNull(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING()
                    },
                    new String[] {"id", "name", "description", "weight", "age"});
    expected =
            Arrays.asList(
                    "+I[101, scooter, Small 2-wheel scooter, 3.14, NULL]",
                    "+I[102, car battery, 12V car battery, 8.1, NULL]",
                    "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18]",
                    "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24]");
    waitForResult(expected, table, rowType, primaryKeys, branch);

    // Batch 3 adds the "address" column (and row 101 is no longer present).
    writeTestRecords(topic, sourceDir, format, 3);
    rowType =
            RowType.of(
                    new DataType[] {
                        DataTypes.STRING().notNull(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.STRING()
                    },
                    new String[] {"id", "name", "description", "weight", "age", "address"});
    expected =
            Arrays.asList(
                    "+I[102, car battery, 12V car battery, 8.1, NULL, NULL]",
                    "+I[103, 12-pack drill bits, 12-pack of drill bits with sizes ranging from #40 to #3, 0.8, 18, NULL]",
                    "+I[104, hammer, 12oz carpenter's hammer, 0.75, 24, NULL]",
                    "+I[105, hammer, 14oz carpenter's hammer, 0.875, NULL, Beijing]",
                    "+I[107, rocks, box of assorted rocks, 5.3, NULL, NULL]");
    waitForResult(expected, table, rowType, primaryKeys, branch);
}

/**
 * Writes the {@code <format>-data-<index>.txt} fixture for {@code sourceDir} to Kafka,
 * wrapping any failure with a format-specific message. Extracted to remove the two
 * identical try/catch write blocks the original repeated inline.
 */
private void writeTestRecords(String topic, String sourceDir, String format, int index)
        throws Exception {
    try {
        writeRecordsToKafka(
                topic,
                readLines(
                        String.format(
                                "kafka/%s/table/%s/%s-data-%s.txt",
                                format, sourceDir, format, index)));
    } catch (Exception e) {
        throw new Exception(String.format("Failed to write %s data to Kafka.", format), e);
    }
}

private void testSchemaEvolutionImpl(String topic, String sourceDir, String format)
throws Exception {
FileStoreTable table = getFileStoreTable(tableName);
Expand Down
Loading

0 comments on commit 1030872

Please sign in to comment.