Skip to content

Commit

Permalink
Merge pull request #362 from ClickHouse/add-cloud-testing
Browse files Browse the repository at this point in the history
Add cloud testing
  • Loading branch information
Paultagoras authored Apr 8, 2024
2 parents 24ece8d + d51cea7 commit 05368b8
Show file tree
Hide file tree
Showing 13 changed files with 262 additions and 350 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
clickhouse: [ "23.7", "24.3", "latest" ]
clickhouse: [ "23.7", "24.3", "latest", "cloud" ]
name: ClickHouse ${{ matrix.clickhouse }} tests
steps:
- uses: actions/checkout@v3
Expand All @@ -21,5 +21,7 @@ jobs:
uses: gradle/gradle-build-action@v2
env:
CLICKHOUSE_VERSION: ${{ matrix.clickhouse }}
CLICKHOUSE_CLOUD_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }}
CLICKHOUSE_CLOUD_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }}
with:
arguments: test
5 changes: 3 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import java.io.ByteArrayOutputStream
import java.net.URI
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import org.gradle.api.tasks.testing.logging.TestExceptionFormat

val defaultJdkVersion = 17
java {
Expand Down Expand Up @@ -135,13 +135,14 @@ tasks.create("integrationTest", Test::class.java) {


tasks.withType<Test> {
maxParallelForks = (Runtime.getRuntime().availableProcessors() / 2).takeIf { it > 0 } ?: 1
tasks.getByName("check").dependsOn(this)
systemProperty("file.encoding", "windows-1252") // run tests with different encoding
useJUnitPlatform()
testLogging {
events("passed", "skipped", "failed")
}

testLogging.exceptionFormat = TestExceptionFormat.FULL
val javaVersion: Int = (project.findProperty("javaVersion") as String? ?: defaultJdkVersion.toString()).toInt()
logger.info("Running tests using JDK$javaVersion")
javaLauncher.set(javaToolchains.launcherFor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ public ClickHouseHelperClient(ClickHouseClientBuilder builder) {
this.server = create();
}

public String getDatabase() {
return database;
}
public Map<ClickHouseOption, Serializable> getDefaultClientOptions() {
Map<ClickHouseOption, Serializable> options = new HashMap<>();
options.put(ClickHouseClientOption.PRODUCT_NAME, "clickhouse-kafka-connect/"+ClickHouseClientOption.class.getPackage().getImplementationVersion());
Expand Down
63 changes: 52 additions & 11 deletions src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,30 @@
public class ClickHouseBase {
protected static ClickHouseHelperClient chc = null;
protected static ClickHouseContainer db = null;

protected static boolean isCloud = ClickHouseTestHelpers.isCloud();
protected static String database = null;
@BeforeAll
public static void setup() throws IOException {
if (database == null) {
database = String.format("kafka_connect_test_%s", System.currentTimeMillis());
}
if (isCloud == true) {
return;
}
db = new ClickHouseContainer(ClickHouseTestHelpers.CLICKHOUSE_DOCKER_IMAGE);
db.start();
}

@AfterAll
protected static void tearDown() {
db.stop();
if (db != null)
db.stop();
}

protected ClickHouseHelperClient createClient(Map<String,String> props) {
return createClient(props, true);
}
protected ClickHouseHelperClient createClient(Map<String,String> props, boolean withDatabase) {
ClickHouseSinkConfig csc = new ClickHouseSinkConfig(props);

String hostname = csc.getHostname();
Expand All @@ -42,20 +53,37 @@ protected ClickHouseHelperClient createClient(Map<String,String> props) {
boolean sslEnabled = csc.isSslEnabled();
int timeout = csc.getTimeout();


chc = new ClickHouseHelperClient.ClickHouseClientBuilder(hostname, port, csc.getProxyType(), csc.getProxyHost(), csc.getProxyPort())
ClickHouseHelperClient tmpChc = new ClickHouseHelperClient.ClickHouseClientBuilder(hostname, port, csc.getProxyType(), csc.getProxyHost(), csc.getProxyPort())
.setDatabase(database)
.setUsername(username)
.setPassword(password)
.sslEnable(sslEnabled)
.setTimeout(timeout)
.setRetry(csc.getRetry())
.build();


if (withDatabase) {
createDatabase(this.database, tmpChc);
props.put(ClickHouseSinkConnector.DATABASE, this.database);
ClickHouseHelperClient chc = new ClickHouseHelperClient.ClickHouseClientBuilder(hostname, port, csc.getProxyType(), csc.getProxyHost(), csc.getProxyPort())
.setDatabase(this.database)
.setUsername(username)
.setPassword(password)
.sslEnable(sslEnabled)
.setTimeout(timeout)
.setRetry(csc.getRetry())
.build();
return chc;
}
chc = tmpChc;
return chc;
}


protected void createDatabase(String database) {
createDatabase(database, chc);
}
protected void createDatabase(String database, ClickHouseHelperClient chc) {
String createDatabaseQuery = String.format("CREATE DATABASE IF NOT EXISTS `%s`", database);
System.out.println(createDatabaseQuery);
try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
Expand Down Expand Up @@ -108,13 +136,26 @@ protected int countRows(ClickHouseHelperClient chc, String database, String topi
}
protected Map<String,String> createProps() {
Map<String, String> props = new HashMap<>();
props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost());
props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString());
props.put(ClickHouseSinkConnector.DATABASE, "default");
props.put(ClickHouseSinkConnector.USERNAME, db.getUsername());
props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword());
props.put(ClickHouseSinkConnector.SSL_ENABLED, "false");
if (isCloud) {
props.put(ClickHouseSinkConnector.HOSTNAME, System.getenv("CLICKHOUSE_CLOUD_HOST"));
props.put(ClickHouseSinkConnector.PORT, ClickHouseTestHelpers.HTTPS_PORT);
props.put(ClickHouseSinkConnector.DATABASE, ClickHouseTestHelpers.DATABASE_DEFAULT);
props.put(ClickHouseSinkConnector.USERNAME, ClickHouseTestHelpers.USERNAME_DEFAULT);
props.put(ClickHouseSinkConnector.PASSWORD, System.getenv("CLICKHOUSE_CLOUD_PASSWORD"));
props.put(ClickHouseSinkConnector.SSL_ENABLED, "true");
props.put("clickhouseSettings", "insert_quorum=3");
} else {
props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost());
props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString());
props.put(ClickHouseSinkConnector.DATABASE, ClickHouseTestHelpers.DATABASE_DEFAULT);
props.put(ClickHouseSinkConnector.USERNAME, db.getUsername());
props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword());
props.put(ClickHouseSinkConnector.SSL_ENABLED, "false");
}
return props;
}

protected String createTopicName(String name) {
return String.format("%s_%d", name, System.currentTimeMillis());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.clickhouse.client.ClickHouseResponseSummary;
import com.clickhouse.kafka.connect.ClickHouseSinkConnector;
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.util.ArrayList;
Expand All @@ -27,40 +28,6 @@
import org.testcontainers.clickhouse.ClickHouseContainer;

public class ClickHouseSinkJdbcPropertiesTest extends ClickHouseBase {

private void dropTable(ClickHouseHelperClient chc, String tableName) {
String dropTable = String.format("DROP TABLE IF EXISTS `%s`", tableName);
try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
ClickHouseResponse response = client.read(chc.getServer()) // or client.connect(endpoints)
// you'll have to parse response manually if using a different format


.query(dropTable)
.executeAndWait()) {
ClickHouseResponseSummary summary = response.getSummary();

} catch (ClickHouseException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}

private int countRows(ClickHouseHelperClient chc, String topic) {
String queryCount = String.format("select count(*) from `%s`", topic);
try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
ClickHouseResponse response = client.read(chc.getServer()) // or client.connect(endpoints)
// you'll have to parse response manually if using a different format


.query(queryCount)
.executeAndWait()) {
ClickHouseResponseSummary summary = response.getSummary();
return response.firstRecord().getValue(0).asInteger();
} catch (ClickHouseException e) {
throw new RuntimeException(e);
}
}

public Collection<SinkRecord> createPrimitiveTypes(String topic, int partition) {
Gson gson = new Gson();
List<SinkRecord> array = new ArrayList<>();
Expand Down Expand Up @@ -194,70 +161,59 @@ public Collection<SinkRecord> createWithEmptyDataRecords(String topic, int parti

@Test
public void primitiveTypesTest() {
Map<String, String> props = new HashMap<>();
props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost());
props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString());
props.put(ClickHouseSinkConnector.DATABASE, "default");
props.put(ClickHouseSinkConnector.USERNAME, db.getUsername());
props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword());
props.put(ClickHouseSinkConnector.SSL_ENABLED, "false");
Map<String, String> props = createProps();
props.put(ClickHouseSinkConfig.JDBC_CONNECTION_PROPERTIES, "?load_balancing_policy=random&health_check_interval=5000&failover=2");

ClickHouseHelperClient chc = createClient(props);
// `arr_int8` Array(Int8), `arr_int16` Array(Int16), `arr_int32` Array(Int32), `arr_int64` Array(Int64), `arr_float32` Array(Float32), `arr_float64` Array(Float64), `arr_bool` Array(Bool)
String topic = "schemaless_primitive_types_table_test";
dropTable(chc, topic);
String topic = createTopicName("schemaless_primitive_types_table_test");
ClickHouseTestHelpers.dropTable(chc, topic);
createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, `p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16");
Collection<SinkRecord> sr = createPrimitiveTypes(topic, 1);

ClickHouseSinkTask chst = new ClickHouseSinkTask();
chst.start(props);
chst.put(sr);
chst.stop();
assertEquals(sr.size(), countRows(chc, topic));
assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));
}

@Test
public void withEmptyDataRecordsTest() {
Map<String, String> props = new HashMap<>();
props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost());
props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString());
props.put(ClickHouseSinkConnector.DATABASE, "default");
props.put(ClickHouseSinkConnector.USERNAME, db.getUsername());
props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword());
props.put(ClickHouseSinkConnector.SSL_ENABLED, "false");
props.put(ClickHouseSinkConfig.JDBC_CONNECTION_PROPERTIES, "?ssl=false&sslmode=none");

Map<String, String> props = createProps();
if (isCloud) {
props.put(ClickHouseSinkConfig.JDBC_CONNECTION_PROPERTIES, "?ssl=true&sslmode=none");
} else {
props.put(ClickHouseSinkConfig.JDBC_CONNECTION_PROPERTIES, "?ssl=false&sslmode=none");
}
ClickHouseHelperClient chc = createClient(props);
// `arr_int8` Array(Int8), `arr_int16` Array(Int16), `arr_int32` Array(Int32), `arr_int64` Array(Int64), `arr_float32` Array(Float32), `arr_float64` Array(Float64), `arr_bool` Array(Bool)
String topic = "schemaless_empty_records_table_test";
dropTable(chc, topic);
String topic = createTopicName("schemaless_empty_records_table_test");
ClickHouseTestHelpers.dropTable(chc, topic);
createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, `p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16");
Collection<SinkRecord> sr = createWithEmptyDataRecords(topic, 1);

ClickHouseSinkTask chst = new ClickHouseSinkTask();
chst.start(props);
chst.put(sr);
chst.stop();
assertEquals(sr.size() / 2, countRows(chc, topic));
assertEquals(sr.size() / 2, ClickHouseTestHelpers.countRows(chc, topic));
}

@Test
public void emptyDataRecordsTestFailedWithSslProp() {
Map<String, String> props = new HashMap<>();
props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost());
props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString());
props.put(ClickHouseSinkConnector.DATABASE, "default");
props.put(ClickHouseSinkConnector.USERNAME, db.getUsername());
props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword());
props.put(ClickHouseSinkConnector.SSL_ENABLED, "false");
// this will fail connection because the test container do not configured with SSL-TLS
props.put(ClickHouseSinkConfig.JDBC_CONNECTION_PROPERTIES, "?ssl=true&sslmode=strict");

Map<String, String> props = createProps();
if (isCloud) {
// this will fail connection because the test container do not configured with SSL-TLS
props.put(ClickHouseSinkConfig.JDBC_CONNECTION_PROPERTIES, "?ssl=false&sslmode=strict");
} else {
// this will fail connection because the test container do not configured with SSL-TLS
props.put(ClickHouseSinkConfig.JDBC_CONNECTION_PROPERTIES, "?ssl=true&sslmode=strict");
}
ClickHouseHelperClient chc = createClient(props);
// `arr_int8` Array(Int8), `arr_int16` Array(Int16), `arr_int32` Array(Int32), `arr_int64` Array(Int64), `arr_float32` Array(Float32), `arr_float64` Array(Float64), `arr_bool` Array(Bool)
String topic = "schemaless_empty_records_table_test";
dropTable(chc, topic);
String topic = createTopicName("schemaless_empty_records_table_test");
ClickHouseTestHelpers.dropTable(chc, topic);
createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, `p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16");
Collection<SinkRecord> sr = createWithEmptyDataRecords(topic, 1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,9 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

public class ClickHouseSinkTaskMappingTest extends ClickHouseBase{

private Map<String, String> getTestProperties() {
Map<String, String> props = new HashMap<>();
props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost());
props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString());
props.put(ClickHouseSinkConnector.DATABASE, "default");
props.put(ClickHouseSinkConnector.USERNAME, db.getUsername());
props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword());
props.put(ClickHouseSinkConnector.SSL_ENABLED, "false");
return props;
}


@Test
public void schemalessSingleTableMappingTest() {
Map<String, String> props = getTestProperties();
Map<String, String> props = createProps();
props.put(ClickHouseSinkConfig.TABLE_MAPPING, "mapping_table_test=table_mapping_test");
ClickHouseHelperClient chc = createClient(props);

Expand All @@ -53,7 +40,7 @@ public void schemalessSingleTableMappingTest() {

@Test
public void schemalessMultiDifferentTableMappingTest() {
Map<String, String> props = getTestProperties();
Map<String, String> props = createProps();
props.put(ClickHouseSinkConfig.TABLE_MAPPING, "mapping_table_test=table_mapping_test, mapping_table_test2=table_mapping_test2");
ClickHouseHelperClient chc = createClient(props);

Expand Down Expand Up @@ -81,7 +68,7 @@ public void schemalessMultiDifferentTableMappingTest() {

@Test
public void schemalessMultiSameTableMappingTest() {
Map<String, String> props = getTestProperties();
Map<String, String> props = createProps();
props.put(ClickHouseSinkConfig.TABLE_MAPPING, "mapping_table_test=table_mapping_test, mapping_table_test2=table_mapping_test");
ClickHouseHelperClient chc = createClient(props);

Expand All @@ -104,7 +91,7 @@ public void schemalessMultiSameTableMappingTest() {

@Test
public void schemalessMixedTableMappingTest() {
Map<String, String> props = getTestProperties();
Map<String, String> props = createProps();
props.put(ClickHouseSinkConfig.TABLE_MAPPING, "mapping_table_test=table_mapping_test, mapping_table_test2=table_mapping_test2");
ClickHouseHelperClient chc = createClient(props);

Expand Down Expand Up @@ -139,7 +126,7 @@ public void schemalessMixedTableMappingTest() {

@Test
public void schemaArrayTypesSingleTableMappingTest() {
Map<String, String> props = getTestProperties();
Map<String, String> props = createProps();
props.put(ClickHouseSinkConfig.TABLE_MAPPING, "array_string_table_test=array_string_mapping_table_test");
ClickHouseHelperClient chc = createClient(props);

Expand All @@ -162,7 +149,7 @@ public void schemaArrayTypesSingleTableMappingTest() {

@Test
public void schemaArrayTypesMultipleDifferentTableMappingTest() {
Map<String, String> props = getTestProperties();
Map<String, String> props = createProps();
props.put(ClickHouseSinkConfig.TABLE_MAPPING, "array_string_table_test=array_string_mapping_table_test, array_string_table_test2=array_string_mapping_table_test2");
ClickHouseHelperClient chc = createClient(props);

Expand Down Expand Up @@ -195,7 +182,7 @@ public void schemaArrayTypesMultipleDifferentTableMappingTest() {

@Test
public void schemaArrayTypesMultipleSameTableMappingTest() {
Map<String, String> props = getTestProperties();
Map<String, String> props = createProps();
props.put(ClickHouseSinkConfig.TABLE_MAPPING, "array_string_table_test=array_string_mapping_table_test, array_string_table_test2=array_string_mapping_table_test");
ClickHouseHelperClient chc = createClient(props);

Expand All @@ -221,7 +208,7 @@ public void schemaArrayTypesMultipleSameTableMappingTest() {

@Test
public void schemaArrayTypesMixedTableMappingTest() {
Map<String, String> props = getTestProperties();
Map<String, String> props = createProps();
props.put(ClickHouseSinkConfig.TABLE_MAPPING, "array_string_table_test=array_string_mapping_table_test, array_string_table_test2=array_string_mapping_table_test2");
ClickHouseHelperClient chc = createClient(props);

Expand Down
Loading

0 comments on commit 05368b8

Please sign in to comment.