Skip to content

Commit

Permalink
add cdc test
Browse files Browse the repository at this point in the history
  • Loading branch information
sunxiaojian committed Mar 15, 2024
1 parent f185fa2 commit 0bf48ab
Show file tree
Hide file tree
Showing 14 changed files with 372 additions and 650 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -79,36 +79,31 @@ public Path snapshotDirectory() {
}

public Path snapshotDirectory(String branchName) {
return new Path(getBranchPath(tablePath, branchName) + "/snapshot");
}

public Path snapshotDirectoryByBranch(String branchName) {
return branchName.equals(DEFAULT_MAIN_BRANCH)
? snapshotDirectory()
: snapshotDirectory(branchName);
: new Path(getBranchPath(tablePath, branchName) + "/snapshot");
}

public Path snapshotPath(long snapshotId) {
return new Path(tablePath + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId);
}

public Path snapshotPath(String branchName, long snapshotId) {
return new Path(
getBranchPath(tablePath, branchName) + "/snapshot/" + SNAPSHOT_PREFIX + snapshotId);
}

public Path snapshotPathByBranch(String branchName, long snapshotId) {
return branchName.equals(DEFAULT_MAIN_BRANCH)
? snapshotPath(snapshotId)
: snapshotPath(branchName, snapshotId);
: new Path(
getBranchPath(tablePath, branchName)
+ "/snapshot/"
+ SNAPSHOT_PREFIX
+ snapshotId);
}

public Snapshot snapshot(long snapshotId) {
return snapshot(DEFAULT_MAIN_BRANCH, snapshotId);
}

public Snapshot snapshot(String branchName, long snapshotId) {
Path snapshotPath = snapshotPathByBranch(branchName, snapshotId);
Path snapshotPath = snapshotPath(branchName, snapshotId);
return Snapshot.fromPath(fileIO, snapshotPath);
}

Expand All @@ -117,7 +112,7 @@ public boolean snapshotExists(long snapshotId) {
}

public boolean snapshotExists(String branchName, long snapshotId) {
Path path = snapshotPathByBranch(branchName, snapshotId);
Path path = snapshotPath(branchName, snapshotId);
try {
return fileIO.exists(path);
} catch (IOException e) {
Expand Down Expand Up @@ -265,8 +260,7 @@ public long snapshotCount() throws IOException {
}

public long snapshotCount(String branch) throws IOException {
return listVersionedFiles(fileIO, snapshotDirectoryByBranch(branch), SNAPSHOT_PREFIX)
.count();
return listVersionedFiles(fileIO, snapshotDirectory(branch), SNAPSHOT_PREFIX).count();
}

public Iterator<Snapshot> snapshots() throws IOException {
Expand Down Expand Up @@ -429,7 +423,7 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter<Snapshot> checker) {
}

private @Nullable Long findLatest(String branchName) throws IOException {
Path snapshotDir = snapshotDirectoryByBranch(branchName);
Path snapshotDir = snapshotDirectory(branchName);
if (!fileIO.exists(snapshotDir)) {
return null;
}
Expand All @@ -447,7 +441,7 @@ public Snapshot traversalSnapshotsFromLatestSafely(Filter<Snapshot> checker) {
}

private @Nullable Long findEarliest(String branchName) throws IOException {
Path snapshotDir = snapshotDirectoryByBranch(branchName);
Path snapshotDir = snapshotDirectory(branchName);
if (!fileIO.exists(snapshotDir)) {
return null;
}
Expand All @@ -466,7 +460,7 @@ public Long readHint(String fileName) {
}

public Long readHint(String fileName, String branchName) {
Path snapshotDir = snapshotDirectoryByBranch(branchName);
Path snapshotDir = snapshotDirectory(branchName);
Path path = new Path(snapshotDir, fileName);
int retryNumber = 0;
while (retryNumber++ < READ_HINT_RETRY_NUM) {
Expand All @@ -486,7 +480,7 @@ public Long readHint(String fileName, String branchName) {

private Long findByListFiles(BinaryOperator<Long> reducer, String branchName)
throws IOException {
Path snapshotDir = snapshotDirectoryByBranch(branchName);
Path snapshotDir = snapshotDirectory(branchName);
return listVersionedFiles(fileIO, snapshotDir, SNAPSHOT_PREFIX)
.reduce(reducer)
.orElse(null);
Expand All @@ -510,7 +504,7 @@ public void commitEarliestHint(long snapshotId, String branchName) throws IOExce

private void commitHint(long snapshotId, String fileName, String branchName)
throws IOException {
Path snapshotDir = snapshotDirectoryByBranch(branchName);
Path snapshotDir = snapshotDirectory(branchName);
Path hintFile = new Path(snapshotDir, fileName);
fileIO.overwriteFileUtf8(hintFile, String.valueOf(snapshotId));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@

package org.apache.paimon.flink.action.cdc;

import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.action.ActionBase;
import org.apache.paimon.flink.action.ActionITCaseBase;
import org.apache.paimon.flink.action.cdc.kafka.KafkaSyncDatabaseActionFactory;
Expand All @@ -32,14 +30,11 @@
import org.apache.paimon.flink.action.cdc.postgres.PostgresSyncTableActionFactory;
import org.apache.paimon.flink.action.cdc.pulsar.PulsarSyncDatabaseActionFactory;
import org.apache.paimon.flink.action.cdc.pulsar.PulsarSyncTableActionFactory;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.BranchManager;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
Expand All @@ -61,7 +56,6 @@
import java.util.concurrent.ThreadLocalRandom;

import static org.apache.paimon.options.CatalogOptions.BRANCH;
import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
import static org.assertj.core.api.Assertions.assertThat;

/** CDC IT case base. */
Expand Down Expand Up @@ -89,22 +83,6 @@ protected void waitingTables(String... tables) throws Exception {
waitingTables(Arrays.asList(tables));
}

protected void waitingTables(List<String> tables, String branch) throws Exception {
LOG.info("Waiting for tables '{}'", tables);
Map<String, String> options = new HashMap<>();
options.put(WAREHOUSE.key(), new Path(warehouse).toUri().toString());
options.put(BRANCH.key(), branch);
Catalog catalogBranch =
CatalogFactory.createCatalog(CatalogContext.create(Options.fromMap(options)));
while (true) {
List<String> actualTables = catalogBranch.listTables(database);
if (actualTables.containsAll(tables)) {
break;
}
Thread.sleep(100);
}
}

protected void waitingTables(List<String> tables) throws Exception {
LOG.info("Waiting for tables '{}'", tables);

Expand Down Expand Up @@ -136,16 +114,6 @@ protected void assertTableNotExists(String... tableNames) throws Exception {
protected void waitForResult(
List<String> expected, FileStoreTable table, RowType rowType, List<String> primaryKeys)
throws Exception {
waitForResult(expected, table, rowType, primaryKeys, BranchManager.DEFAULT_MAIN_BRANCH);
}

protected void waitForResult(
List<String> expected,
FileStoreTable table,
RowType rowType,
List<String> primaryKeys,
String branch)
throws Exception {
assertThat(table.schema().primaryKeys()).isEqualTo(primaryKeys);

// wait for table schema to become our expected schema
Expand Down Expand Up @@ -188,6 +156,16 @@ protected void waitForResult(
}
}

protected Map<String, String> getCatalogOptions(Map<String, String> catalogOptions) {
catalogOptions.put(BRANCH.key(), branch);
return catalogOptions;
}

protected Map<String, String> getTableConfig(Map<String, String> tableConfig) {
tableConfig.put(FlinkConnectorOptions.BRANCH.key(), branch);
return tableConfig;
}

protected Map<String, String> getBasicTableConfig() {
Map<String, String> config = new HashMap<>();
ThreadLocalRandom random = ThreadLocalRandom.current();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ public void testSchemaEvolutionOneTopic() throws Exception {
testSchemaEvolutionOneTopic(DEBEZIUM);
}

@Test
@Timeout(240)
public void testSchemaEvolutionOneTopicWithBranch() throws Exception {
testSchemaEvolutionOneTopicWithBranch(DEBEZIUM, "testBranch");
}

@Test
public void testTopicIsEmpty() {
testTopicIsEmpty(DEBEZIUM);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.kafka;

import org.junit.jupiter.api.BeforeEach;

import java.io.IOException;

/** IT cases for {@link KafkaSyncDatabaseAction}. */
public class KafkaDebeziumSyncDatabaseToBranchActionITCase
extends KafkaDebeziumSyncDatabaseActionITCase {
@BeforeEach
public void before() throws IOException {
branch = "testKafkaDebeziumSyncDatabaseBranch";
super.before();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,6 @@ public void testSchemaEvolution() throws Exception {
runSingleTableSchemaEvolution("schemaevolution", DEBEZIUM);
}

@Test
@Timeout(120)
public void testSchemaEvolutionToBranch() throws Exception {
runSingleTableSchemaEvolutionToBranch("schemaevolution", DEBEZIUM, "testBranch");
}

@Test
@Timeout(60)
public void testNotSupportFormat() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.flink.action.cdc.kafka;

import org.junit.jupiter.api.BeforeEach;

import java.io.IOException;

/** IT cases for {@link KafkaSyncTableAction}. */
public class KafkaDebeziumSyncTableToBranchActionITCase extends KafkaDebeziumSyncTableActionITCase {

@BeforeEach
public void before() throws IOException {
branch = "testKafkaDebeziumSyncTableBranch";
super.before();
}
}
Loading

0 comments on commit 0bf48ab

Please sign in to comment.