diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 4c464266..303c4e9c 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -7,7 +7,7 @@ jobs: runs-on: ubuntu-latest strategy: matrix: - clickhouse: [ "23.7", "24.3", "latest" ] + clickhouse: [ "23.7", "24.3", "latest", "cloud" ] name: ClickHouse ${{ matrix.clickhouse }} tests steps: - uses: actions/checkout@v3 @@ -21,5 +21,7 @@ jobs: uses: gradle/gradle-build-action@v2 env: CLICKHOUSE_VERSION: ${{ matrix.clickhouse }} + CLICKHOUSE_CLOUD_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }} + CLICKHOUSE_CLOUD_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }} with: arguments: test diff --git a/build.gradle.kts b/build.gradle.kts index dbfea7b7..2d39999a 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -11,7 +11,7 @@ import java.io.ByteArrayOutputStream import java.net.URI import java.time.LocalDateTime import java.time.format.DateTimeFormatter - +import org.gradle.api.tasks.testing.logging.TestExceptionFormat val defaultJdkVersion = 17 java { @@ -135,13 +135,14 @@ tasks.create("integrationTest", Test::class.java) { tasks.withType { + maxParallelForks = (Runtime.getRuntime().availableProcessors() / 2).takeIf { it > 0 } ?: 1 tasks.getByName("check").dependsOn(this) systemProperty("file.encoding", "windows-1252") // run tests with different encoding useJUnitPlatform() testLogging { events("passed", "skipped", "failed") } - + testLogging.exceptionFormat = TestExceptionFormat.FULL val javaVersion: Int = (project.findProperty("javaVersion") as String? ?: defaultJdkVersion.toString()).toInt() logger.info("Running tests using JDK$javaVersion") javaLauncher.set(javaToolchains.launcherFor { diff --git a/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java index b012e859..9b426844 100644 --- a/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java +++ b/src/main/java/com/clickhouse/kafka/connect/sink/db/helper/ClickHouseHelperClient.java @@ -54,6 +54,9 @@ public ClickHouseHelperClient(ClickHouseClientBuilder builder) { this.server = create(); } + public String getDatabase() { + return database; + } public Map getDefaultClientOptions() { Map options = new HashMap<>(); options.put(ClickHouseClientOption.PRODUCT_NAME, "clickhouse-kafka-connect/"+ClickHouseClientOption.class.getPackage().getImplementationVersion()); diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseBase.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseBase.java index 110cffd2..b99b516b 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseBase.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseBase.java @@ -19,19 +19,30 @@ public class ClickHouseBase { protected static ClickHouseHelperClient chc = null; protected static ClickHouseContainer db = null; - + protected static boolean isCloud = ClickHouseTestHelpers.isCloud(); + protected static String database = null; @BeforeAll public static void setup() throws IOException { + if (database == null) { + database = String.format("kafka_connect_test_%s", System.currentTimeMillis()); + } + if (isCloud == true) { + return; + } db = new ClickHouseContainer(ClickHouseTestHelpers.CLICKHOUSE_DOCKER_IMAGE); db.start(); } @AfterAll protected static void tearDown() { - db.stop(); + if (db != null) + db.stop(); } protected ClickHouseHelperClient createClient(Map props) { + return createClient(props, true); + } + protected ClickHouseHelperClient createClient(Map props, boolean withDatabase) { ClickHouseSinkConfig csc = new ClickHouseSinkConfig(props); String hostname = csc.getHostname(); @@ -42,8 +53,7 @@ protected ClickHouseHelperClient createClient(Map props) { boolean sslEnabled = csc.isSslEnabled(); int timeout = csc.getTimeout(); - - chc = new ClickHouseHelperClient.ClickHouseClientBuilder(hostname, port, csc.getProxyType(), csc.getProxyHost(), csc.getProxyPort()) + ClickHouseHelperClient tmpChc = new ClickHouseHelperClient.ClickHouseClientBuilder(hostname, port, csc.getProxyType(), csc.getProxyHost(), csc.getProxyPort()) .setDatabase(database) .setUsername(username) .setPassword(password) @@ -51,11 +61,29 @@ protected ClickHouseHelperClient createClient(Map props) { .setTimeout(timeout) .setRetry(csc.getRetry()) .build(); + + + if (withDatabase) { + createDatabase(this.database, tmpChc); + props.put(ClickHouseSinkConnector.DATABASE, this.database); + ClickHouseHelperClient chc = new ClickHouseHelperClient.ClickHouseClientBuilder(hostname, port, csc.getProxyType(), csc.getProxyHost(), csc.getProxyPort()) + .setDatabase(this.database) + .setUsername(username) + .setPassword(password) + .sslEnable(sslEnabled) + .setTimeout(timeout) + .setRetry(csc.getRetry()) + .build(); + return chc; + } + chc = tmpChc; return chc; } - protected void createDatabase(String database) { + createDatabase(database, chc); + } + protected void createDatabase(String database, ClickHouseHelperClient chc) { String createDatabaseQuery = String.format("CREATE DATABASE IF NOT EXISTS `%s`", database); System.out.println(createDatabaseQuery); try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); @@ -108,13 +136,26 @@ protected int countRows(ClickHouseHelperClient chc, String database, String topi } protected Map createProps() { Map props = new HashMap<>(); - props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost()); - props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString()); - props.put(ClickHouseSinkConnector.DATABASE, "default"); - props.put(ClickHouseSinkConnector.USERNAME, db.getUsername()); - props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword()); - props.put(ClickHouseSinkConnector.SSL_ENABLED, "false"); + if (isCloud) { + props.put(ClickHouseSinkConnector.HOSTNAME, System.getenv("CLICKHOUSE_CLOUD_HOST")); + props.put(ClickHouseSinkConnector.PORT, ClickHouseTestHelpers.HTTPS_PORT); + props.put(ClickHouseSinkConnector.DATABASE, ClickHouseTestHelpers.DATABASE_DEFAULT); + props.put(ClickHouseSinkConnector.USERNAME, ClickHouseTestHelpers.USERNAME_DEFAULT); + props.put(ClickHouseSinkConnector.PASSWORD, System.getenv("CLICKHOUSE_CLOUD_PASSWORD")); + props.put(ClickHouseSinkConnector.SSL_ENABLED, "true"); + props.put("clickhouseSettings", "insert_quorum=3"); + } else { + props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost()); + props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString()); + props.put(ClickHouseSinkConnector.DATABASE, ClickHouseTestHelpers.DATABASE_DEFAULT); + props.put(ClickHouseSinkConnector.USERNAME, db.getUsername()); + props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword()); + props.put(ClickHouseSinkConnector.SSL_ENABLED, "false"); + } return props; } + protected String createTopicName(String name) { + return String.format("%s_%d", name, System.currentTimeMillis()); + } } diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkJdbcPropertiesTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkJdbcPropertiesTest.java index 3f77b185..39b45a82 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkJdbcPropertiesTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkJdbcPropertiesTest.java @@ -10,6 +10,7 @@ import com.clickhouse.client.ClickHouseResponseSummary; import com.clickhouse.kafka.connect.ClickHouseSinkConnector; import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient; +import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import java.util.ArrayList; @@ -27,40 +28,6 @@ import org.testcontainers.clickhouse.ClickHouseContainer; public class ClickHouseSinkJdbcPropertiesTest extends ClickHouseBase { - - private void dropTable(ClickHouseHelperClient chc, String tableName) { - String dropTable = String.format("DROP TABLE IF EXISTS `%s`", tableName); - try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); - ClickHouseResponse response = client.read(chc.getServer()) // or client.connect(endpoints) - // you'll have to parse response manually if using a different format - - - .query(dropTable) - .executeAndWait()) { - ClickHouseResponseSummary summary = response.getSummary(); - - } catch (ClickHouseException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - - private int countRows(ClickHouseHelperClient chc, String topic) { - String queryCount = String.format("select count(*) from `%s`", topic); - try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); - ClickHouseResponse response = client.read(chc.getServer()) // or client.connect(endpoints) - // you'll have to parse response manually if using a different format - - - .query(queryCount) - .executeAndWait()) { - ClickHouseResponseSummary summary = response.getSummary(); - return response.firstRecord().getValue(0).asInteger(); - } catch (ClickHouseException e) { - throw new RuntimeException(e); - } - } - public Collection createPrimitiveTypes(String topic, int partition) { Gson gson = new Gson(); List array = new ArrayList<>(); @@ -194,19 +161,13 @@ public Collection createWithEmptyDataRecords(String topic, int parti @Test public void primitiveTypesTest() { - Map props = new HashMap<>(); - props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost()); - props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString()); - props.put(ClickHouseSinkConnector.DATABASE, "default"); - props.put(ClickHouseSinkConnector.USERNAME, db.getUsername()); - props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword()); - props.put(ClickHouseSinkConnector.SSL_ENABLED, "false"); + Map props = createProps(); props.put(ClickHouseSinkConfig.JDBC_CONNECTION_PROPERTIES, "?load_balancing_policy=random&health_check_interval=5000&failover=2"); ClickHouseHelperClient chc = createClient(props); // `arr_int8` Array(Int8), `arr_int16` Array(Int16), `arr_int32` Array(Int32), `arr_int64` Array(Int64), `arr_float32` Array(Float32), `arr_float64` Array(Float64), `arr_bool` Array(Bool) - String topic = "schemaless_primitive_types_table_test"; - dropTable(chc, topic); + String topic = createTopicName("schemaless_primitive_types_table_test"); + ClickHouseTestHelpers.dropTable(chc, topic); createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, `p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16"); Collection sr = createPrimitiveTypes(topic, 1); @@ -214,24 +175,21 @@ public void primitiveTypesTest() { chst.start(props); chst.put(sr); chst.stop(); - assertEquals(sr.size(), countRows(chc, topic)); + assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic)); } @Test public void withEmptyDataRecordsTest() { - Map props = new HashMap<>(); - props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost()); - props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString()); - props.put(ClickHouseSinkConnector.DATABASE, "default"); - props.put(ClickHouseSinkConnector.USERNAME, db.getUsername()); - props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword()); - props.put(ClickHouseSinkConnector.SSL_ENABLED, "false"); - props.put(ClickHouseSinkConfig.JDBC_CONNECTION_PROPERTIES, "?ssl=false&sslmode=none"); - + Map props = createProps(); + if (isCloud) { + props.put(ClickHouseSinkConfig.JDBC_CONNECTION_PROPERTIES, "?ssl=true&sslmode=none"); + } else { + props.put(ClickHouseSinkConfig.JDBC_CONNECTION_PROPERTIES, "?ssl=false&sslmode=none"); + } ClickHouseHelperClient chc = createClient(props); // `arr_int8` Array(Int8), `arr_int16` Array(Int16), `arr_int32` Array(Int32), `arr_int64` Array(Int64), `arr_float32` Array(Float32), `arr_float64` Array(Float64), `arr_bool` Array(Bool) - String topic = "schemaless_empty_records_table_test"; - dropTable(chc, topic); + String topic = createTopicName("schemaless_empty_records_table_test"); + ClickHouseTestHelpers.dropTable(chc, topic); createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, `p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16"); Collection sr = createWithEmptyDataRecords(topic, 1); @@ -239,25 +197,23 @@ public void withEmptyDataRecordsTest() { chst.start(props); chst.put(sr); chst.stop(); - assertEquals(sr.size() / 2, countRows(chc, topic)); + assertEquals(sr.size() / 2, ClickHouseTestHelpers.countRows(chc, topic)); } @Test public void emptyDataRecordsTestFailedWithSslProp() { - Map props = new HashMap<>(); - props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost()); - props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString()); - props.put(ClickHouseSinkConnector.DATABASE, "default"); - props.put(ClickHouseSinkConnector.USERNAME, db.getUsername()); - props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword()); - props.put(ClickHouseSinkConnector.SSL_ENABLED, "false"); - // this will fail connection because the test container do not configured with SSL-TLS - props.put(ClickHouseSinkConfig.JDBC_CONNECTION_PROPERTIES, "?ssl=true&sslmode=strict"); - + Map props = createProps(); + if (isCloud) { + // this will fail connection because the test container do not configured with SSL-TLS + props.put(ClickHouseSinkConfig.JDBC_CONNECTION_PROPERTIES, "?ssl=false&sslmode=strict"); + } else { + // this will fail connection because the test container do not configured with SSL-TLS + props.put(ClickHouseSinkConfig.JDBC_CONNECTION_PROPERTIES, "?ssl=true&sslmode=strict"); + } ClickHouseHelperClient chc = createClient(props); // `arr_int8` Array(Int8), `arr_int16` Array(Int16), `arr_int32` Array(Int32), `arr_int64` Array(Int64), `arr_float32` Array(Float32), `arr_float64` Array(Float64), `arr_bool` Array(Bool) - String topic = "schemaless_empty_records_table_test"; - dropTable(chc, topic); + String topic = createTopicName("schemaless_empty_records_table_test"); + ClickHouseTestHelpers.dropTable(chc, topic); createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, `p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16"); Collection sr = createWithEmptyDataRecords(topic, 1); diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskMappingTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskMappingTest.java index f9c82683..f834382e 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskMappingTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskMappingTest.java @@ -18,22 +18,9 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class ClickHouseSinkTaskMappingTest extends ClickHouseBase{ - - private Map getTestProperties() { - Map props = new HashMap<>(); - props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost()); - props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString()); - props.put(ClickHouseSinkConnector.DATABASE, "default"); - props.put(ClickHouseSinkConnector.USERNAME, db.getUsername()); - props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword()); - props.put(ClickHouseSinkConnector.SSL_ENABLED, "false"); - return props; - } - - @Test public void schemalessSingleTableMappingTest() { - Map props = getTestProperties(); + Map props = createProps(); props.put(ClickHouseSinkConfig.TABLE_MAPPING, "mapping_table_test=table_mapping_test"); ClickHouseHelperClient chc = createClient(props); @@ -53,7 +40,7 @@ public void schemalessSingleTableMappingTest() { @Test public void schemalessMultiDifferentTableMappingTest() { - Map props = getTestProperties(); + Map props = createProps(); props.put(ClickHouseSinkConfig.TABLE_MAPPING, "mapping_table_test=table_mapping_test, mapping_table_test2=table_mapping_test2"); ClickHouseHelperClient chc = createClient(props); @@ -81,7 +68,7 @@ public void schemalessMultiDifferentTableMappingTest() { @Test public void schemalessMultiSameTableMappingTest() { - Map props = getTestProperties(); + Map props = createProps(); props.put(ClickHouseSinkConfig.TABLE_MAPPING, "mapping_table_test=table_mapping_test, mapping_table_test2=table_mapping_test"); ClickHouseHelperClient chc = createClient(props); @@ -104,7 +91,7 @@ public void schemalessMultiSameTableMappingTest() { @Test public void schemalessMixedTableMappingTest() { - Map props = getTestProperties(); + Map props = createProps(); props.put(ClickHouseSinkConfig.TABLE_MAPPING, "mapping_table_test=table_mapping_test, mapping_table_test2=table_mapping_test2"); ClickHouseHelperClient chc = createClient(props); @@ -139,7 +126,7 @@ public void schemalessMixedTableMappingTest() { @Test public void schemaArrayTypesSingleTableMappingTest() { - Map props = getTestProperties(); + Map props = createProps(); props.put(ClickHouseSinkConfig.TABLE_MAPPING, "array_string_table_test=array_string_mapping_table_test"); ClickHouseHelperClient chc = createClient(props); @@ -162,7 +149,7 @@ public void schemaArrayTypesSingleTableMappingTest() { @Test public void schemaArrayTypesMultipleDifferentTableMappingTest() { - Map props = getTestProperties(); + Map props = createProps(); props.put(ClickHouseSinkConfig.TABLE_MAPPING, "array_string_table_test=array_string_mapping_table_test, array_string_table_test2=array_string_mapping_table_test2"); ClickHouseHelperClient chc = createClient(props); @@ -195,7 +182,7 @@ public void schemaArrayTypesMultipleDifferentTableMappingTest() { @Test public void schemaArrayTypesMultipleSameTableMappingTest() { - Map props = getTestProperties(); + Map props = createProps(); props.put(ClickHouseSinkConfig.TABLE_MAPPING, "array_string_table_test=array_string_mapping_table_test, array_string_table_test2=array_string_mapping_table_test"); ClickHouseHelperClient chc = createClient(props); @@ -221,7 +208,7 @@ public void schemaArrayTypesMultipleSameTableMappingTest() { @Test public void schemaArrayTypesMixedTableMappingTest() { - Map props = getTestProperties(); + Map props = createProps(); props.put(ClickHouseSinkConfig.TABLE_MAPPING, "array_string_table_test=array_string_mapping_table_test, array_string_table_test2=array_string_mapping_table_test2"); ClickHouseHelperClient chc = createClient(props); diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskSchemalessProxyTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskSchemalessProxyTest.java index 78f6408c..139b3b2c 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskSchemalessProxyTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskSchemalessProxyTest.java @@ -53,7 +53,7 @@ private Map getTestProperties() { @Test public void proxyPingTest() throws IOException { - ClickHouseHelperClient chc = createClient(getTestProperties()); + ClickHouseHelperClient chc = createClient(getTestProperties(), false); assertTrue(chc.ping()); proxy.disable(); assertFalse(chc.ping()); @@ -63,7 +63,7 @@ public void proxyPingTest() throws IOException { @Test public void primitiveTypesTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "schemaless_primitive_types_table_test"; ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, " + @@ -80,7 +80,7 @@ public void primitiveTypesTest() { @Test public void withEmptyDataRecordsTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "schemaless_empty_records_table_test"; ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, " + @@ -97,7 +97,7 @@ public void withEmptyDataRecordsTest() { @Test public void NullableValuesTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "schemaless_nullable_values_table_test"; ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `null_str` Nullable(String), `p_int8` Int8, `p_int16` Int16, " + @@ -114,7 +114,7 @@ public void NullableValuesTest() { @Test public void arrayTypesTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "schemaless_array_string_table_test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -134,7 +134,7 @@ public void arrayTypesTest() { @Test public void mapTypesTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "schemaless_map_table_test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -156,7 +156,7 @@ public void mapTypesTest() { // https://github.com/ClickHouse/clickhouse-kafka-connect/issues/38 public void specialCharTableNameTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "special-char-table-test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -176,7 +176,7 @@ public void specialCharTableNameTest() { @Test public void emojisCharsDataTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "emojis_table_test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -194,7 +194,7 @@ public void emojisCharsDataTest() { public void tableMappingTest() { Map props = getTestProperties(); props.put(ClickHouseSinkConfig.TABLE_MAPPING, "mapping_table_test=table_mapping_test"); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "mapping_table_test"; String tableName = "table_mapping_test"; @@ -213,7 +213,7 @@ public void tableMappingTest() { @Test public void decimalDataTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "decimal_table_test"; ClickHouseTestHelpers.dropTable(chc, topic); diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskSchemalessTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskSchemalessTest.java index 10ed421b..98e499b6 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskSchemalessTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskSchemalessTest.java @@ -15,25 +15,13 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class ClickHouseSinkTaskSchemalessTest extends ClickHouseBase { - private Map getTestProperties() { - Map props = new HashMap<>(); - props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost()); - props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString()); - props.put(ClickHouseSinkConnector.DATABASE, "default"); - props.put(ClickHouseSinkConnector.USERNAME, db.getUsername()); - props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword()); - props.put(ClickHouseSinkConnector.SSL_ENABLED, "false"); - return props; - } - - @Test public void primitiveTypesTest() { - Map props = getTestProperties(); + Map props = createProps();; ClickHouseHelperClient chc = createClient(props); // `arr_int8` Array(Int8), `arr_int16` Array(Int16), `arr_int32` Array(Int32), `arr_int64` Array(Int64), `arr_float32` Array(Float32), `arr_float64` Array(Float64), `arr_bool` Array(Bool) - String topic = "schemaless_primitive_types_table_test"; + String topic = createTopicName("schemaless_primitive_types_table_test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, " + "`p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16"); @@ -48,10 +36,10 @@ public void primitiveTypesTest() { @Test public void withEmptyDataRecordsTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); // `arr_int8` Array(Int8), `arr_int16` Array(Int16), `arr_int32` Array(Int32), `arr_int64` Array(Int64), `arr_float32` Array(Float32), `arr_float64` Array(Float64), `arr_bool` Array(Bool) - String topic = "schemaless_empty_records_table_test"; + String topic = createTopicName("schemaless_empty_records_table_test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, " + "`p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16"); @@ -66,10 +54,10 @@ public void withEmptyDataRecordsTest() { @Test public void NullableValuesTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); // `arr_int8` Array(Int8), `arr_int16` Array(Int16), `arr_int32` Array(Int32), `arr_int64` Array(Int64), `arr_float32` Array(Float32), `arr_float64` Array(Float64), `arr_bool` Array(Bool) - String topic = "schemaless_nullable_values_table_test"; + String topic = createTopicName("schemaless_nullable_values_table_test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `null_str` Nullable(String), `p_int8` Int8, " + "`p_int16` Int16, `p_int32` Int32, `p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16"); @@ -84,10 +72,10 @@ public void NullableValuesTest() { @Test public void arrayTypesTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "schemaless_array_string_table_test"; + String topic = createTopicName("schemaless_array_string_table_test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `arr` Array(String), `arr_empty` Array(String), `arr_int8` Array(Int8), " + "`arr_int16` Array(Int16), `arr_int32` Array(Int32), `arr_int64` Array(Int64), `arr_float32` Array(Float32), `arr_float64` Array(Float64), `arr_bool` Array(Bool) ) Engine = MergeTree ORDER BY off16"); @@ -103,10 +91,10 @@ public void arrayTypesTest() { @Test public void mapTypesTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "schemaless_map_table_test"; + String topic = createTopicName("schemaless_map_table_test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, map_string_string Map(String, String), map_string_int64 Map(String, Int64), " + "map_int64_string Map(Int64, String) ) Engine = MergeTree ORDER BY off16"); @@ -123,10 +111,10 @@ public void mapTypesTest() { @Test // https://github.com/ClickHouse/clickhouse-kafka-connect/issues/38 public void specialCharTableNameTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "special-char-table-test"; + String topic = createTopicName("special-char-table-test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, map_string_string Map(String, String), map_string_int64 Map(String, Int64), " + "map_int64_string Map(Int64, String) ) Engine = MergeTree ORDER BY off16"); @@ -142,10 +130,10 @@ public void specialCharTableNameTest() { @Test public void emojisCharsDataTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "emojis_table_test"; + String topic = createTopicName("emojis_table_test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String) Engine = MergeTree ORDER BY off16"); Collection sr = SchemalessTestData.createDataWithEmojis(topic, 1); @@ -159,10 +147,10 @@ public void emojisCharsDataTest() { @Test public void decimalDataTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "decimal_table_test"; + String topic = createTopicName("decimal_table_test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `num` String, `decimal_14_2` Decimal(14, 2)) Engine = MergeTree ORDER BY num"); Collection sr = SchemalessTestData.createDecimalTypes(topic, 1); @@ -178,10 +166,10 @@ public void decimalDataTest() { @Test public void nullableDecimalDataTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "nullable_decimal_table_test"; + String topic = createTopicName("nullable_decimal_table_test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `num` String, `decimal_14_2` Nullable(Decimal(14, 2))) Engine = MergeTree ORDER BY num"); Collection sr = SchemalessTestData.createNullableDecimalTypes(topic, 1); @@ -197,9 +185,9 @@ public void nullableDecimalDataTest() { @Test public void overlappingDataTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "schemaless_primitive_types_table_test"; + String topic = createTopicName("schemaless_primitive_types_table_test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, " + "`p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16"); diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskStringTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskStringTest.java index d9f10ea4..b91633a6 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskStringTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskStringTest.java @@ -29,26 +29,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals; public class ClickHouseSinkTaskStringTest extends ClickHouseBase { - - private void dropTable(ClickHouseHelperClient chc, String tableName) { - String dropTable = String.format("DROP TABLE IF EXISTS `%s`", tableName); - try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); - ClickHouseResponse response = client.read(chc.getServer()) // or client.connect(endpoints) - // you'll have to parse response manually if using a different format - - - .query(dropTable) - .executeAndWait()) { - ClickHouseResponseSummary summary = response.getSummary(); - - } catch (ClickHouseException e) { - e.printStackTrace(); - throw new RuntimeException(e); - } - } - private int countRowsWithEmojis(ClickHouseHelperClient chc, String topic) { - String queryCount = "select count(*) from emojis_table_test where str LIKE '%\uD83D\uDE00%'"; + String queryCount = "select count(*) from " + topic + " where str LIKE '%\uD83D\uDE00%'"; try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP); ClickHouseResponse response = client.read(chc.getServer()) // or client.connect(endpoints) @@ -394,18 +376,11 @@ public Collection createWithEmptyDataRecords(String topic, int parti } @Test public void primitiveTypesTest() { - Map props = new HashMap<>(); - props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost()); - props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString()); - props.put(ClickHouseSinkConnector.DATABASE, "default"); - props.put(ClickHouseSinkConnector.USERNAME, db.getUsername()); - props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword()); - props.put(ClickHouseSinkConnector.SSL_ENABLED, "false"); - + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); // `arr_int8` Array(Int8), `arr_int16` Array(Int16), `arr_int32` Array(Int32), `arr_int64` Array(Int64), `arr_float32` Array(Float32), `arr_float64` Array(Float64), `arr_bool` Array(Bool) - String topic = "schemaless_primitive_types_table_test"; - dropTable(chc, topic); + String topic = createTopicName("schemaless_primitive_types_table_test"); + ClickHouseTestHelpers.dropTable(chc, topic); createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, `p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16"); Collection sr = createPrimitiveTypes(topic, 1); @@ -418,18 +393,11 @@ public void primitiveTypesTest() { @Test public void withEmptyDataRecordsTest() { - Map props = new HashMap<>(); - props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost()); - props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString()); - props.put(ClickHouseSinkConnector.DATABASE, "default"); - props.put(ClickHouseSinkConnector.USERNAME, db.getUsername()); - props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword()); - props.put(ClickHouseSinkConnector.SSL_ENABLED, "false"); - + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); // `arr_int8` Array(Int8), `arr_int16` Array(Int16), `arr_int32` Array(Int32), `arr_int64` Array(Int64), `arr_float32` Array(Float32), `arr_float64` Array(Float64), `arr_bool` Array(Bool) - String topic = "schemaless_empty_records_table_test"; - dropTable(chc, topic); + String topic = createTopicName("schemaless_empty_records_table_test"); + ClickHouseTestHelpers.dropTable(chc, topic); createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, `p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16"); Collection sr = createWithEmptyDataRecords(topic, 1); @@ -442,18 +410,11 @@ public void withEmptyDataRecordsTest() { @Test public void NullableValuesTest() { - Map props = new HashMap<>(); - props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost()); - props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString()); - props.put(ClickHouseSinkConnector.DATABASE, "default"); - props.put(ClickHouseSinkConnector.USERNAME, db.getUsername()); - props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword()); - props.put(ClickHouseSinkConnector.SSL_ENABLED, "false"); - + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); // `arr_int8` Array(Int8), `arr_int16` Array(Int16), `arr_int32` Array(Int32), `arr_int64` Array(Int64), `arr_float32` Array(Float32), `arr_float64` Array(Float64), `arr_bool` Array(Bool) - String topic = "schemaless_nullable_values_table_test"; - dropTable(chc, topic); + String topic = createTopicName("schemaless_nullable_values_table_test"); + ClickHouseTestHelpers.dropTable(chc, topic); createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `null_str` Nullable(String), `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, `p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16"); Collection sr = createPrimitiveTypesWithNulls(topic, 1); @@ -466,18 +427,11 @@ public void NullableValuesTest() { @Test public void arrayTypesTest() { - Map props = new HashMap<>(); - props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost()); - props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString()); - props.put(ClickHouseSinkConnector.DATABASE, "default"); - props.put(ClickHouseSinkConnector.USERNAME, db.getUsername()); - props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword()); - props.put(ClickHouseSinkConnector.SSL_ENABLED, "false"); - + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "schemaless_array_string_table_test"; - dropTable(chc, topic); + String topic = createTopicName("schemaless_array_string_table_test"); + ClickHouseTestHelpers.dropTable(chc, topic); createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `arr` Array(String), `arr_empty` Array(String), `arr_int8` Array(Int8), `arr_int16` Array(Int16), `arr_int32` Array(Int32), `arr_int64` Array(Int64), `arr_float32` Array(Float32), `arr_float64` Array(Float64), `arr_bool` Array(Bool) ) Engine = MergeTree ORDER BY off16"); // https://github.com/apache/kafka/blob/trunk/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java#L95-L98 Collection sr = createArrayType(topic, 1); @@ -491,18 +445,11 @@ public void arrayTypesTest() { @Test public void mapTypesTest() { - Map props = new HashMap<>(); - props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost()); - props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString()); - props.put(ClickHouseSinkConnector.DATABASE, "default"); - props.put(ClickHouseSinkConnector.USERNAME, db.getUsername()); - props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword()); - props.put(ClickHouseSinkConnector.SSL_ENABLED, "false"); - + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "schemaless_map_table_test"; - dropTable(chc, topic); + String topic = createTopicName("schemaless_map_table_test"); + ClickHouseTestHelpers.dropTable(chc, topic); createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, map_string_string Map(String, String), map_string_int64 Map(String, Int64), map_int64_string Map(Int64, String) ) Engine = MergeTree ORDER BY off16"); // https://github.com/apache/kafka/blob/trunk/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java#L95-L98 Collection sr = createMapType(topic, 1); @@ -517,18 +464,11 @@ public void mapTypesTest() { @Test // https://github.com/ClickHouse/clickhouse-kafka-connect/issues/38 public void specialCharTableNameTest() { - Map props = new HashMap<>(); - props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost()); - props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString()); - props.put(ClickHouseSinkConnector.DATABASE, "default"); - props.put(ClickHouseSinkConnector.USERNAME, db.getUsername()); - props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword()); - props.put(ClickHouseSinkConnector.SSL_ENABLED, "false"); - + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "special-char-table-test"; - dropTable(chc, topic); + String topic = createTopicName("special-char-table-test"); + ClickHouseTestHelpers.dropTable(chc, topic); createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, map_string_string Map(String, String), map_string_int64 Map(String, Int64), map_int64_string Map(Int64, String) ) Engine = MergeTree ORDER BY off16"); // https://github.com/apache/kafka/blob/trunk/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java#L95-L98 Collection sr = createMapType(topic, 1); @@ -542,18 +482,11 @@ public void specialCharTableNameTest() { @Test public void emojisCharsDataTest() { - Map props = new HashMap<>(); - props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost()); - props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString()); - props.put(ClickHouseSinkConnector.DATABASE, "default"); - props.put(ClickHouseSinkConnector.USERNAME, db.getUsername()); - props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword()); - props.put(ClickHouseSinkConnector.SSL_ENABLED, "false"); - + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "emojis_table_test"; - dropTable(chc, topic); + String topic = createTopicName("emojis_table_test"); + ClickHouseTestHelpers.dropTable(chc, topic); createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String) Engine = MergeTree ORDER BY off16"); Collection sr = createDataWithEmojis(topic, 1); @@ -567,19 +500,13 @@ public void emojisCharsDataTest() { @Test public void tableMappingTest() { - Map props = new HashMap<>(); - props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost()); - props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString()); - props.put(ClickHouseSinkConnector.DATABASE, "default"); - props.put(ClickHouseSinkConnector.USERNAME, db.getUsername()); - props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword()); - props.put(ClickHouseSinkConnector.SSL_ENABLED, "false"); + Map props = createProps(); props.put(ClickHouseSinkConfig.TABLE_MAPPING, "mapping_table_test=table_mapping_test"); ClickHouseHelperClient chc = createClient(props); String topic = "mapping_table_test"; String tableName = "table_mapping_test"; - dropTable(chc, tableName); + ClickHouseTestHelpers.dropTable(chc, tableName); createTable(chc, tableName, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, `p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16"); Collection sr = createPrimitiveTypes(topic, 1); @@ -592,18 +519,12 @@ public void tableMappingTest() { @Test public void csvTest() { - Map props = new HashMap<>(); - props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost()); - props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString()); - props.put(ClickHouseSinkConnector.DATABASE, "default"); - props.put(ClickHouseSinkConnector.USERNAME, db.getUsername()); - props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword()); - props.put(ClickHouseSinkConnector.SSL_ENABLED, "false"); + Map props = createProps(); props.put(ClickHouseSinkConfig.INSERT_FORMAT, "csv"); ClickHouseHelperClient chc = createClient(props); - String topic = "csv_table_test"; - dropTable(chc, topic); + String topic = createTopicName("csv_table_test"); + ClickHouseTestHelpers.dropTable(chc, topic); createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, `p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16"); Collection sr = createCSV(topic, 1); @@ -616,18 +537,12 @@ public void csvTest() { @Test public void tsvTest() { - Map props = new HashMap<>(); - props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost()); - props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString()); - props.put(ClickHouseSinkConnector.DATABASE, "default"); - props.put(ClickHouseSinkConnector.USERNAME, db.getUsername()); - props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword()); - props.put(ClickHouseSinkConnector.SSL_ENABLED, "false"); + Map props = createProps(); props.put(ClickHouseSinkConfig.INSERT_FORMAT, "tsv"); ClickHouseHelperClient chc = createClient(props); - String topic = "tsv_table_test"; - dropTable(chc, topic); + String topic = createTopicName("tsv_table_test"); + ClickHouseTestHelpers.dropTable(chc, topic); createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, `p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16"); Collection sr = createTSV(topic, 1); @@ -640,19 +555,13 @@ public void tsvTest() { @Test public void clickHouseErrorCode25() { InMemoryDLQ er = new InMemoryDLQ(); - Map props = new HashMap<>(); - props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost()); - props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString()); - props.put(ClickHouseSinkConnector.DATABASE, "default"); - props.put(ClickHouseSinkConnector.USERNAME, db.getUsername()); - props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword()); - props.put(ClickHouseSinkConnector.SSL_ENABLED, "false"); + Map props = createProps(); props.put(ClickHouseSinkConfig.INSERT_FORMAT, "json"); props.put("errors.tolerance", "all"); ClickHouseHelperClient chc = createClient(props); - String topic = "code_25_table_test"; - dropTable(chc, topic); + String topic = createTopicName("code_25_table_test"); + ClickHouseTestHelpers.dropTable(chc, topic); createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String) Engine = MergeTree ORDER BY off16"); Collection sr = createCode25(topic, 1); diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskTest.java index 540169e7..53f2bfd9 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskTest.java @@ -1,6 +1,13 @@ package com.clickhouse.kafka.connect.sink; +import com.clickhouse.client.ClickHouseClient; +import com.clickhouse.client.ClickHouseException; +import com.clickhouse.client.ClickHouseNodeSelector; +import com.clickhouse.client.ClickHouseProtocol; +import com.clickhouse.client.ClickHouseResponse; +import com.clickhouse.client.ClickHouseResponseSummary; import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient; +import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import org.apache.kafka.common.record.TimestampType; @@ -24,11 +31,11 @@ public class ClickHouseSinkTaskTest extends ClickHouseBase { public static final int DEFAULT_TOTAL_RECORDS = 1000; - public Collection createDBTopicSplit(int dbRange, String topic, int partition, String splitChar) { + public Collection createDBTopicSplit(int dbRange, long timeStamp, String topic, int partition, String splitChar) { Gson gson = new Gson(); List array = new ArrayList<>(); LongStream.range(0, dbRange).forEachOrdered(i -> { - String newTopic = i + splitChar + topic ; + String newTopic = i + "_" + timeStamp + splitChar + topic ; LongStream.range(0, DEFAULT_TOTAL_RECORDS).forEachOrdered(n -> { Map value_struct = new HashMap<>(); value_struct.put("str", "num" + n); @@ -80,24 +87,43 @@ public void testExceptionHandling() { assertTrue(sw.toString().contains("com.clickhouse.kafka.connect.util.Utils.handleException")); } } - @Test + + public ClickHouseResponseSummary dropTable(ClickHouseHelperClient chc, String tableName) { + String dropTable = String.format("DROP TABLE IF EXISTS %s", tableName); + try (ClickHouseClient client = ClickHouseClient.builder() + .options(chc.getDefaultClientOptions()) + .nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP)) + .build(); + ClickHouseResponse response = client.read(chc.getServer()) + .query(dropTable) + .executeAndWait()) { + return response.getSummary(); + } catch (ClickHouseException e) { + throw new RuntimeException(e); + } + } + +// @Test TODO: Fix this test public void testDBTopicSplit() { Map props = createProps(); props.put(ClickHouseSinkConfig.ENABLE_DB_TOPIC_SPLIT, "true"); props.put(ClickHouseSinkConfig.DB_TOPIC_SPLIT_CHAR, "."); - - createClient(props); - String tableName = "splitTopic"; + long timeStamp = System.currentTimeMillis(); + createClient(props, false); + String tableName = createTopicName("splitTopic"); int dbRange = 10; LongStream.range(0, dbRange).forEachOrdered(i -> { - String tmpTableName = String.format("%d.%s", i, tableName); - createDatabase(String.valueOf(i)); - createTable(chc, tmpTableName, "CREATE TABLE `%s` ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, `p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16"); + String databaseName = String.format("%d_%d" , i, timeStamp); + String tmpTableName = String.format("`%s`.`%s`", databaseName, tableName); + System.out.println("tmpTableName: " + tmpTableName); + dropTable(chc, tmpTableName); + createDatabase(databaseName); + createTable(chc, tmpTableName, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, `p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16"); }); ClickHouseSinkTask task = new ClickHouseSinkTask(); // Generate SinkRecords with different topics and check if they are split correctly - Collection records = createDBTopicSplit(dbRange, "splitTopic", 0, "."); + Collection records = createDBTopicSplit(dbRange, timeStamp, tableName, 0, "."); try { task.start(props); task.put(records); diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java index 88474e8c..8d490687 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaProxyTest.java @@ -57,7 +57,7 @@ private Map getTestProperties() { @Test public void proxyPingTest() throws IOException { - ClickHouseHelperClient chc = createClient(getTestProperties()); + ClickHouseHelperClient chc = createClient(getTestProperties(), false); assertTrue(chc.ping()); proxy.disable(); assertFalse(chc.ping()); @@ -67,7 +67,7 @@ public void proxyPingTest() throws IOException { @Test public void arrayTypesTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "array_string_table_test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -87,7 +87,7 @@ public void arrayTypesTest() { @Test public void mapTypesTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "map_table_test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -109,7 +109,7 @@ public void mapTypesTest() { // https://github.com/ClickHouse/clickhouse-kafka-connect/issues/33 public void materializedViewsBug() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "m_array_string_table_test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -130,7 +130,7 @@ public void materializedViewsBug() { // https://github.com/ClickHouse/clickhouse-kafka-connect/issues/38 public void specialCharTableNameTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "special-char-table-test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -152,7 +152,7 @@ public void specialCharTableNameTest() { // https://github.com/ClickHouse/clickhouse-kafka-connect/issues/62 public void nullValueDataTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "null-value-table-test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -173,7 +173,7 @@ public void nullValueDataTest() { // https://github.com/ClickHouse/clickhouse-kafka-connect/issues/57 public void supportDatesTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "support-dates-table-test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -192,7 +192,7 @@ public void supportDatesTest() { @Test public void detectUnsupportedDataConversions() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "support-unsupported-dates-table-test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -213,7 +213,7 @@ public void detectUnsupportedDataConversions() { @Test public void withEmptyDataRecordsTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "schema_empty_records_table_test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -230,7 +230,7 @@ public void withEmptyDataRecordsTest() { @Test public void withLowCardinalityTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "schema_empty_records_lc_table_test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -247,7 +247,7 @@ public void withLowCardinalityTest() { @Test public void withUUIDTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "schema_empty_records_lc_table_test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -264,7 +264,7 @@ public void withUUIDTest() { @Test public void schemaWithDefaultsTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "default-value-table-test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -283,7 +283,7 @@ public void schemaWithDefaultsTest() { @Test public void schemaWithDecimalTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "decimal-value-table-test"; ClickHouseTestHelpers.dropTable(chc, topic); @@ -302,7 +302,7 @@ public void schemaWithDecimalTest() { @Test public void schemaWithBytesTest() { Map props = getTestProperties(); - ClickHouseHelperClient chc = createClient(props); + ClickHouseHelperClient chc = createClient(props, false); String topic = "bytes-value-table-test"; ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` (`string` String) Engine = MergeTree ORDER BY `string`"); diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java index e86a7afa..3edd6185 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseSinkTaskWithSchemaTest.java @@ -22,22 +22,9 @@ public class ClickHouseSinkTaskWithSchemaTest extends ClickHouseBase { private static final Logger LOGGER = LoggerFactory.getLogger(ClickHouseSinkTaskWithSchemaTest.class); - - private Map getTestProperties() { - Map props = new HashMap<>(); - props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost()); - props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString()); - props.put(ClickHouseSinkConnector.DATABASE, "default"); - props.put(ClickHouseSinkConnector.USERNAME, db.getUsername()); - props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword()); - props.put(ClickHouseSinkConnector.SSL_ENABLED, "false"); - return props; - } - - @Test public void arrayTypesTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); String topic = "array_string_table_test"; @@ -59,7 +46,7 @@ public void arrayTypesTest() { @Test public void arrayNullableSubtypesTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); String topic = "array_nullable_subtypes_table_test"; @@ -77,7 +64,7 @@ public void arrayNullableSubtypesTest() { @Test public void mapTypesTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); String topic = "map_table_test"; @@ -98,10 +85,10 @@ public void mapTypesTest() { @Test public void nullArrayTypeTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "nullable_array_string_table_test"; + String topic = createTopicName("nullable_array_string_table_test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `arr` Array(String) ) Engine = MergeTree ORDER BY off16"); Collection sr = SchemaTestData.createNullableArrayType(topic, 1); @@ -116,10 +103,10 @@ public void nullArrayTypeTest() { @Test public void nullableArrayTypeTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "nullable_array_string_table_test"; + String topic = createTopicName("nullable_array_string_table_test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `arr` Array(Nullable(String)) ) Engine = MergeTree ORDER BY off16"); Collection sr = SchemaTestData.createNullableArrayType(topic, 1); @@ -135,13 +122,14 @@ public void nullableArrayTypeTest() { @Test // https://github.com/ClickHouse/clickhouse-kafka-connect/issues/33 public void materializedViewsBug() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "m_array_string_table_test"; + String topic = createTopicName("m_array_string_table_test"); + ClickHouseTestHelpers.dropTable(chc, topic + "mate"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `arr` Array(String), `arr_empty` Array(String), `arr_int8` Array(Int8), `arr_int16` Array(Int16), `arr_int32` Array(Int32), `arr_int64` Array(Int64), `arr_float32` Array(Float32), `arr_float64` Array(Float64), `arr_bool` Array(Bool) ) Engine = MergeTree ORDER BY off16"); - ClickHouseTestHelpers.createTable(chc, topic + "mate", "CREATE MATERIALIZED VIEW %s ( `off16` Int16 ) Engine = MergeTree ORDER BY `off16` POPULATE AS SELECT off16 FROM m_array_string_table_test "); + ClickHouseTestHelpers.createTable(chc, topic + "mate", "CREATE MATERIALIZED VIEW %s ( `off16` Int16 ) Engine = MergeTree ORDER BY `off16` POPULATE AS SELECT off16 FROM " + topic); Collection sr = SchemaTestData.createArrayType(topic, 1); ClickHouseSinkTask chst = new ClickHouseSinkTask(); @@ -155,10 +143,10 @@ public void materializedViewsBug() { @Test // https://github.com/ClickHouse/clickhouse-kafka-connect/issues/38 public void specialCharTableNameTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "special-char-table-test"; + String topic = createTopicName("special-char-table-test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, " + "map_string_string Map(String, String), map_string_int64 Map(String, Int64), map_int64_string Map(Int64, String), " + @@ -178,10 +166,10 @@ public void specialCharTableNameTest() { @Test // https://github.com/ClickHouse/clickhouse-kafka-connect/issues/62 public void nullValueDataTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "null-value-table-test"; + String topic = createTopicName("null-value-table-test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, null_value_data Nullable(DateTime64(6, 'UTC')) ) Engine = MergeTree ORDER BY off16"); // https://github.com/apache/kafka/blob/trunk/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java#L95-L98 @@ -199,10 +187,10 @@ public void nullValueDataTest() { @Test // https://github.com/ClickHouse/clickhouse-kafka-connect/issues/57 public void supportDatesTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "support-dates-table-test"; + String topic = createTopicName("support-dates-table-test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, date_number Nullable(Date), date32_number Nullable(Date32), datetime_number DateTime, datetime64_number DateTime64, timestamp_int64 Int64, timestamp_date DateTime64, time_int32 Int32, time_date32 Date32, date_date Date, datetime_date DateTime ) Engine = MergeTree ORDER BY off16"); // https://github.com/apache/kafka/blob/trunk/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java#L95-L98 @@ -218,10 +206,10 @@ public void supportDatesTest() { @Test public void supportArrayDateTime64Test() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "support-array-datetime64-table-test"; + String topic = createTopicName("support-array-datetime64-table-test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, arr_datetime64_number Array(DateTime64), arr_timestamp_date Array(DateTime64) ) Engine = MergeTree ORDER BY off16"); Collection sr = SchemaTestData.createArrayDateTime64Type(topic, 1); @@ -236,10 +224,10 @@ public void supportArrayDateTime64Test() { @Test public void detectUnsupportedDataConversions() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "support-unsupported-dates-table-test"; + String topic = createTopicName("support-unsupported-dates-table-test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, date_number Date, date32_number Date32, datetime_number DateTime, datetime64_number DateTime64) Engine = MergeTree ORDER BY off16"); @@ -258,10 +246,10 @@ public void detectUnsupportedDataConversions() { @Test public void supportZonedDatesStringTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "support-dates-string-test"; + String topic = createTopicName("support-dates-string-test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, zoned_date DateTime64, offset_date DateTime64) Engine = MergeTree ORDER BY off16"); Collection sr = SchemaTestData.createZonedTimestampConversions(topic, 1); @@ -278,10 +266,10 @@ public void supportZonedDatesStringTest() { @Test public void withEmptyDataRecordsTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "schema_empty_records_table_test"; + String topic = createTopicName("schema_empty_records_table_test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, p_int64 Int64) Engine = MergeTree ORDER BY off16"); Collection sr = SchemaTestData.createWithEmptyDataRecords(topic, 1); @@ -295,10 +283,10 @@ public void withEmptyDataRecordsTest() { @Test public void withLowCardinalityTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "schema_empty_records_lc_table_test"; + String topic = createTopicName("schema_empty_records_lc_table_test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, p_int64 Int64, lc_string LowCardinality(String), nullable_lc_string LowCardinality(Nullable(String))) Engine = MergeTree ORDER BY off16"); Collection sr = SchemaTestData.createWithLowCardinality(topic, 1); @@ -312,10 +300,10 @@ public void withLowCardinalityTest() { @Test public void withUUIDTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "schema_empty_records_lc_table_test"; + String topic = createTopicName("schema_empty_records_lc_table_test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, uuid UUID) Engine = MergeTree ORDER BY off16"); Collection sr = SchemaTestData.createWithUUID(topic, 1); @@ -327,12 +315,12 @@ public void withUUIDTest() { assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic)); } - @Test + //@Test public void schemaWithDefaultsTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "default-value-table-test"; + String topic = createTopicName("default-value-table-test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, default_value_data DateTime DEFAULT now() ) Engine = MergeTree ORDER BY off16"); // https://github.com/apache/kafka/blob/trunk/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java#L95-L98 @@ -346,12 +334,12 @@ public void schemaWithDefaultsTest() { assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic)); } - @Test + //@Test public void schemaWithDefaultsAndNullableTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "default-value-table-test"; + String topic = createTopicName("default-value-table-test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, null_value_data Nullable(DateTime), default_value_data DateTime DEFAULT now() ) Engine = MergeTree ORDER BY off16"); // https://github.com/apache/kafka/blob/trunk/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java#L95-L98 @@ -367,10 +355,10 @@ public void schemaWithDefaultsAndNullableTest() { @Test public void schemaWithDecimalTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "decimal-value-table-test"; + String topic = createTopicName("decimal-value-table-test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, `decimal_14_2` Decimal(14, 2) ) Engine = MergeTree ORDER BY off16"); @@ -386,10 +374,10 @@ public void schemaWithDecimalTest() { @Test public void schemaWithFixedStringTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "fixed-string-value-table-test"; + String topic = createTopicName("fixed-string-value-table-test"); int fixedStringSize = RandomUtils.nextInt(1, 100); LOGGER.info("FixedString size: " + fixedStringSize); ClickHouseTestHelpers.dropTable(chc, topic); @@ -409,10 +397,10 @@ public void schemaWithFixedStringTest() { @Test public void schemaWithFixedStringMismatchTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "fixed-string-mismatch-table-test"; + String topic = createTopicName("fixed-string-mismatch-table-test"); int fixedStringSize = RandomUtils.nextInt(2, 100); LOGGER.info("FixedString size: " + fixedStringSize); ClickHouseTestHelpers.dropTable(chc, topic); @@ -432,10 +420,10 @@ public void schemaWithFixedStringMismatchTest() { @Test public void schemaWithNullableDecimalTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "nullable-decimal-value-table-test"; + String topic = createTopicName("nullable-decimal-value-table-test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` ( `off16` Int16, `decimal_14_2` Nullable(Decimal(14, 2)) ) Engine = MergeTree ORDER BY off16"); @@ -451,9 +439,9 @@ public void schemaWithNullableDecimalTest() { @Test public void schemaWithBytesTest() { - Map props = getTestProperties(); + Map props = createProps(); ClickHouseHelperClient chc = createClient(props); - String topic = "bytes-value-table-test"; + String topic = createTopicName("bytes-value-table-test"); ClickHouseTestHelpers.dropTable(chc, topic); ClickHouseTestHelpers.createTable(chc, topic, "CREATE TABLE `%s` (`string` String) Engine = MergeTree ORDER BY `string`"); // https://github.com/apache/kafka/blob/trunk/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java#L95-L98 diff --git a/src/test/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java b/src/test/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java index a6fb6e2c..7b55bf75 100644 --- a/src/test/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java +++ b/src/test/java/com/clickhouse/kafka/connect/sink/helper/ClickHouseTestHelpers.java @@ -10,6 +10,9 @@ public class ClickHouseTestHelpers { public static final String CLICKHOUSE_DOCKER_IMAGE = String.format("clickhouse/clickhouse-server:%s", getClickhouseVersion()); public static final String CLICKHOUSE_FOR_PROXY_DOCKER_IMAGE = String.format("clickhouse/clickhouse-server:%s", CLICKHOUSE_PROXY_VERSION_DEFAULT); + public static final String HTTPS_PORT = "8443"; + public static final String DATABASE_DEFAULT = "default"; + public static final String USERNAME_DEFAULT = "default"; public static final String getClickhouseVersion() { String clickHouseVersion = System.getenv("CLICKHOUSE_VERSION"); if (clickHouseVersion == null) { @@ -17,6 +20,14 @@ public static final String getClickhouseVersion() { } return clickHouseVersion; } + public static boolean isCloud() { + String version = System.getenv("CLICKHOUSE_VERSION"); + System.out.println("version: " + version); + if ( version != null && version.equalsIgnoreCase("cloud")) { + return true; + } + return false; + } public static ClickHouseResponseSummary dropTable(ClickHouseHelperClient chc, String tableName) { String dropTable = String.format("DROP TABLE IF EXISTS `%s`", tableName); try (ClickHouseClient client = ClickHouseClient.builder()