Skip to content

Commit

Permalink
test: add actionType and refact clinet-base and common Table class (D…
Browse files Browse the repository at this point in the history
…ataLinkDC#1588)

* test: add actionType

* style: spotless

Signed-off-by: Licho <[email protected]>

* feat: run unit test

Signed-off-by: Licho <[email protected]>

* feat: github workflow remove fast profile

* test: ignore class unit test

* test: remove invalid test from daemon

Signed-off-by: Licho <[email protected]>

* refactor: Table getFlinkDDL function, and extract FlinkCDCConfig configure constant variable.

* refactor: Table getFlinkDDL function, and extract FlinkCDCConfig configure constant variable.

* style: spotless

* style: spotless

* fix: compatible for jdk8

* fix: Column need default construct

* refactor: transfer log language

* refactor: simply code

---------

Signed-off-by: Licho <[email protected]>
  • Loading branch information
leechor authored Feb 1, 2023
1 parent 2405a9b commit 3a15a61
Show file tree
Hide file tree
Showing 38 changed files with 484 additions and 224 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/backend.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,11 @@ jobs:
- name: Build and Package
run: |
./mvnw -B clean install \
-Dmaven.test.skip=true \
-Dmaven.test.skip=false \
-Dspotless.check.skip=true \
-P prod,scala-2.12,flink-all,maven-central,fast \
-Denforcer.skip=true \
-Dmaven.javadoc.skip=true \
-P prod,scala-2.12,flink-all,maven-central \
--no-snapshot-updates
# 检查打包的大小
Expand Down
2 changes: 2 additions & 0 deletions dinky-admin/src/test/java/org/dinky/admin/AdminTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.dinky.admin;

import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

import cn.dev33.satoken.secure.SaSecureUtil;
Expand All @@ -30,6 +31,7 @@
* @author wenmo
* @since 2021/6/14 17:03
*/
@Ignore
public class AdminTest {

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
package org.dinky.security;

import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;

/**
Expand All @@ -28,6 +29,7 @@
* @author wenmo
* @since 2023/1/14 15:59
*/
@Ignore
public class SecurityAspectTest {

@Test
Expand Down
1 change: 1 addition & 0 deletions dinky-admin/src/test/java/org/dinky/utils/DirUtilTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
* @author wenmo
* @since 2022/10/14 22:00
*/
@Ignore
public class DirUtilTest {

@Ignore
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
* @author wenmo
* @since 2022/2/23 20:18
*/
@Ignore
public class DingTalkTest {

private static Map<String, String> config = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Ignore
public class EmailSenderTest {

private static final Logger logger = LoggerFactory.getLogger(EmailSenderTest.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
*
* @date: 2022/4/2 @Description: 飞书消息发送 单元测试
*/
@Ignore
public class FeiShuSenderTest {

private static Map<String, String> feiShuConfig = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.junit.Test;

/** WeChatSenderTest */
@Ignore
public class WeChatSenderTest {

private static Map<String, String> weChatConfig = new HashMap<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.apache.flink.table.api.TableEnvironment;

import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

@Ignore
public class DinkyMysqlCatalogTest {

protected static String url;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,10 @@
import java.util.Map;

import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

@Ignore
public class DinkyMysqlCatalogFactoryTest {

protected static String url;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,8 +350,8 @@ protected Object convertValue(Object value, LogicalType logicalType) {
@Override
public String getSinkSchemaName(Table table) {
String schemaName = table.getSchema();
if (config.getSink().containsKey("sink.db")) {
schemaName = config.getSink().get("sink.db");
if (config.getSink().containsKey(FlinkCDCConfig.SINK_DB)) {
schemaName = config.getSink().get(FlinkCDCConfig.SINK_DB);
}
return schemaName;
}
Expand All @@ -364,19 +364,19 @@ public String getSinkTableName(Table table) {
tableName = table.getSchema() + "_" + tableName;
}
}
if (config.getSink().containsKey("table.prefix")) {
tableName = config.getSink().get("table.prefix") + tableName;
if (config.getSink().containsKey(FlinkCDCConfig.TABLE_PREFIX)) {
tableName = config.getSink().get(FlinkCDCConfig.TABLE_PREFIX) + tableName;
}
if (config.getSink().containsKey("table.suffix")) {
tableName = tableName + config.getSink().get("table.suffix");
if (config.getSink().containsKey(FlinkCDCConfig.TABLE_SUFFIX)) {
tableName = tableName + config.getSink().get(FlinkCDCConfig.TABLE_SUFFIX);
}
if (config.getSink().containsKey("table.lower")) {
if (Boolean.valueOf(config.getSink().get("table.lower"))) {
if (config.getSink().containsKey(FlinkCDCConfig.TABLE_LOWER)) {
if (Boolean.valueOf(config.getSink().get(FlinkCDCConfig.TABLE_LOWER))) {
tableName = tableName.toLowerCase();
}
}
if (config.getSink().containsKey("table.upper")) {
if (Boolean.valueOf(config.getSink().get("table.upper"))) {
if (config.getSink().containsKey(FlinkCDCConfig.TABLE_UPPER)) {
if (Boolean.valueOf(config.getSink().get(FlinkCDCConfig.TABLE_UPPER))) {
tableName = tableName.toUpperCase();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,11 @@
*/
public class SinkBuilderFactory {

private static final Map<String, Supplier<SinkBuilder>> SINK_BUILDER_MAP =
new HashMap<String, Supplier<SinkBuilder>>() {

{
}
};
private static final Map<String, Supplier<SinkBuilder>> SINK_BUILDER_MAP = new HashMap<>();

public static SinkBuilder buildSinkBuilder(FlinkCDCConfig config) {
if (Asserts.isNull(config) || Asserts.isNullString(config.getSink().get("connector"))) {
throw new FlinkClientException("请指定 Sink connector。");
throw new FlinkClientException("please assign Sink connector。");
}
return SINK_BUILDER_MAP
.getOrDefault(config.getSink().get("connector"), () -> new SQLSinkBuilder())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,8 +237,8 @@ public DataStreamSource build(
StreamExecutionEnvironment env,
CustomTableEnvironment customTableEnvironment,
DataStreamSource<String> dataStreamSource) {
final String timeZone = config.getSink().get("timezone");
config.getSink().remove("timezone");
final String timeZone = config.getSink().get(FlinkCDCConfig.TIMEZONE);
config.getSink().remove(FlinkCDCConfig.TIMEZONE);
if (Asserts.isNotNullString(timeZone)) {
sinkTimeZone = ZoneId.of(timeZone);
}
Expand Down
51 changes: 51 additions & 0 deletions dinky-client/dinky-client-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,57 @@
<groupId>org.dinky</groupId>
<artifactId>dinky-common</artifactId>
</dependency>
<!-- test dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<type>jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-module-junit4</artifactId>
<type>jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.powermock</groupId>
<artifactId>powermock-api-mockito2</artifactId>
<type>jar</type>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-all</artifactId>
<type>jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testcontainers</groupId>
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
* FlinkCDCConfig
Expand All @@ -30,6 +31,14 @@
*/
public class FlinkCDCConfig {

public static final String SINK_DB = "sink.db";
public static final String AUTO_CREATE = "auto.create";
public static final String TABLE_PREFIX = "table.prefix";
public static final String TABLE_SUFFIX = "table.suffix";
public static final String TABLE_UPPER = "table.upper";
public static final String TABLE_LOWER = "table.lower";
public static final String COLUMN_REPLACE_LINE_BREAK = "column.replace.line-break";
public static final String TIMEZONE = "timezone";
private String type;
private String hostname;
private Integer port;
Expand All @@ -50,8 +59,6 @@ public class FlinkCDCConfig {
private List<Schema> schemaList;
private String schemaFieldName;

public FlinkCDCConfig() {}

public FlinkCDCConfig(
String type,
String hostname,
Expand All @@ -69,22 +76,23 @@ public FlinkCDCConfig(
Map<String, String> source,
Map<String, String> sink,
Map<String, String> jdbc) {
this.type = type;
this.hostname = hostname;
this.port = port;
this.username = username;
this.password = password;
this.checkpoint = checkpoint;
this.parallelism = parallelism;
this.database = database;
this.schema = schema;
this.table = table;
this.startupMode = startupMode;
this.split = split;
this.debezium = debezium;
this.source = source;
this.sink = sink;
this.jdbc = jdbc;
init(
type,
hostname,
port,
username,
password,
checkpoint,
parallelism,
database,
schema,
table,
startupMode,
split,
debezium,
source,
sink,
jdbc);
}

public void init(
Expand Down Expand Up @@ -122,6 +130,29 @@ public void init(
this.jdbc = jdbc;
}

private boolean isSkip(String key) {
switch (key) {
case SINK_DB:
case AUTO_CREATE:
case TABLE_PREFIX:
case TABLE_SUFFIX:
case TABLE_UPPER:
case TABLE_LOWER:
case COLUMN_REPLACE_LINE_BREAK:
case TIMEZONE:
return true;
default:
return false;
}
}

public String getSinkConfigurationString() {
return sink.entrySet().stream()
.filter(t -> !isSkip(t.getKey()))
.map(t -> String.format("'%s' = '%s'", t.getKey(), t.getValue()))
.collect(Collectors.joining(",\n"));
}

public String getType() {
return type;
}
Expand Down Expand Up @@ -222,42 +253,6 @@ public void setSchemaTableNameList(List<String> schemaTableNameList) {
this.schemaTableNameList = schemaTableNameList;
}

private boolean skip(String key) {
switch (key) {
case "sink.db":
case "auto.create":
case "table.prefix":
case "table.suffix":
case "table.upper":
case "table.lower":
case "column.replace.line-break":
case "timezone":
return true;
default:
return false;
}
}

public String getSinkConfigurationString() {
StringBuilder sb = new StringBuilder();
int index = 0;
for (Map.Entry<String, String> entry : sink.entrySet()) {
if (skip(entry.getKey())) {
continue;
}
if (index > 0) {
sb.append(",");
}
sb.append("'");
sb.append(entry.getKey());
sb.append("' = '");
sb.append(entry.getValue());
sb.append("'\n");
index++;
}
return sb.toString();
}

public void setSink(Map<String, String> sink) {
this.sink = sink;
}
Expand Down
Loading

0 comments on commit 3a15a61

Please sign in to comment.