Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add cloud testing #362

Merged
merged 10 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
clickhouse: [ "23.7", "24.3", "latest" ]
clickhouse: [ "23.7", "24.3", "latest", "cloud" ]
name: ClickHouse ${{ matrix.clickhouse }} tests
steps:
- uses: actions/checkout@v3
Expand All @@ -21,5 +21,7 @@ jobs:
uses: gradle/gradle-build-action@v2
env:
CLICKHOUSE_VERSION: ${{ matrix.clickhouse }}
CLICKHOUSE_CLOUD_HOST: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }}
CLICKHOUSE_CLOUD_PASSWORD: ${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }}
with:
arguments: test
5 changes: 3 additions & 2 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import java.io.ByteArrayOutputStream
import java.net.URI
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

import org.gradle.api.tasks.testing.logging.TestExceptionFormat

val defaultJdkVersion = 17
java {
Expand Down Expand Up @@ -135,13 +135,14 @@ tasks.create("integrationTest", Test::class.java) {


tasks.withType<Test> {
maxParallelForks = (Runtime.getRuntime().availableProcessors() / 2).takeIf { it > 0 } ?: 1
tasks.getByName("check").dependsOn(this)
systemProperty("file.encoding", "windows-1252") // run tests with different encoding
useJUnitPlatform()
testLogging {
events("passed", "skipped", "failed")
}

testLogging.exceptionFormat = TestExceptionFormat.FULL
val javaVersion: Int = (project.findProperty("javaVersion") as String? ?: defaultJdkVersion.toString()).toInt()
logger.info("Running tests using JDK$javaVersion")
javaLauncher.set(javaToolchains.launcherFor {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,9 @@ public ClickHouseHelperClient(ClickHouseClientBuilder builder) {
this.server = create();
}

public String getDatabase() {
return database;
}
public Map<ClickHouseOption, Serializable> getDefaultClientOptions() {
Map<ClickHouseOption, Serializable> options = new HashMap<>();
options.put(ClickHouseClientOption.PRODUCT_NAME, "clickhouse-kafka-connect/"+ClickHouseClientOption.class.getPackage().getImplementationVersion());
Expand Down
63 changes: 52 additions & 11 deletions src/test/java/com/clickhouse/kafka/connect/sink/ClickHouseBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,30 @@
public class ClickHouseBase {
protected static ClickHouseHelperClient chc = null;
protected static ClickHouseContainer db = null;

protected static boolean isCloud = ClickHouseTestHelpers.isCloud();
protected static String database = null;
@BeforeAll
public static void setup() throws IOException {
if (database == null) {
database = String.format("kafka_connect_test_%s", System.currentTimeMillis());
}
if (isCloud == true) {
return;
}
db = new ClickHouseContainer(ClickHouseTestHelpers.CLICKHOUSE_DOCKER_IMAGE);
db.start();
}

@AfterAll
protected static void tearDown() {
db.stop();
if (db != null)
db.stop();
}

protected ClickHouseHelperClient createClient(Map<String,String> props) {
return createClient(props, true);
}
protected ClickHouseHelperClient createClient(Map<String,String> props, boolean withDatabase) {
ClickHouseSinkConfig csc = new ClickHouseSinkConfig(props);

String hostname = csc.getHostname();
Expand All @@ -42,20 +53,37 @@ protected ClickHouseHelperClient createClient(Map<String,String> props) {
boolean sslEnabled = csc.isSslEnabled();
int timeout = csc.getTimeout();


chc = new ClickHouseHelperClient.ClickHouseClientBuilder(hostname, port, csc.getProxyType(), csc.getProxyHost(), csc.getProxyPort())
ClickHouseHelperClient tmpChc = new ClickHouseHelperClient.ClickHouseClientBuilder(hostname, port, csc.getProxyType(), csc.getProxyHost(), csc.getProxyPort())
.setDatabase(database)
.setUsername(username)
.setPassword(password)
.sslEnable(sslEnabled)
.setTimeout(timeout)
.setRetry(csc.getRetry())
.build();


if (withDatabase) {
createDatabase(this.database, tmpChc);
props.put(ClickHouseSinkConnector.DATABASE, this.database);
ClickHouseHelperClient chc = new ClickHouseHelperClient.ClickHouseClientBuilder(hostname, port, csc.getProxyType(), csc.getProxyHost(), csc.getProxyPort())
.setDatabase(this.database)
.setUsername(username)
.setPassword(password)
.sslEnable(sslEnabled)
.setTimeout(timeout)
.setRetry(csc.getRetry())
.build();
return chc;
}
chc = tmpChc;
return chc;
}


protected void createDatabase(String database) {
createDatabase(database, chc);
}
protected void createDatabase(String database, ClickHouseHelperClient chc) {
String createDatabaseQuery = String.format("CREATE DATABASE IF NOT EXISTS `%s`", database);
System.out.println(createDatabaseQuery);
try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
Expand Down Expand Up @@ -108,13 +136,26 @@ protected int countRows(ClickHouseHelperClient chc, String database, String topi
}
protected Map<String,String> createProps() {
Map<String, String> props = new HashMap<>();
props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost());
props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString());
props.put(ClickHouseSinkConnector.DATABASE, "default");
props.put(ClickHouseSinkConnector.USERNAME, db.getUsername());
props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword());
props.put(ClickHouseSinkConnector.SSL_ENABLED, "false");
if (isCloud) {
props.put(ClickHouseSinkConnector.HOSTNAME, System.getenv("CLICKHOUSE_CLOUD_HOST"));
props.put(ClickHouseSinkConnector.PORT, ClickHouseTestHelpers.HTTPS_PORT);
props.put(ClickHouseSinkConnector.DATABASE, ClickHouseTestHelpers.DATABASE_DEFAULT);
props.put(ClickHouseSinkConnector.USERNAME, ClickHouseTestHelpers.USERNAME_DEFAULT);
props.put(ClickHouseSinkConnector.PASSWORD, System.getenv("CLICKHOUSE_CLOUD_PASSWORD"));
props.put(ClickHouseSinkConnector.SSL_ENABLED, "true");
props.put("clickhouseSettings", "insert_quorum=3");
} else {
props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost());
props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString());
props.put(ClickHouseSinkConnector.DATABASE, ClickHouseTestHelpers.DATABASE_DEFAULT);
props.put(ClickHouseSinkConnector.USERNAME, db.getUsername());
props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword());
props.put(ClickHouseSinkConnector.SSL_ENABLED, "false");
}
return props;
}

protected String createTopicName(String name) {
return String.format("%s_%d", name, System.currentTimeMillis());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import com.clickhouse.client.ClickHouseResponseSummary;
import com.clickhouse.kafka.connect.ClickHouseSinkConnector;
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
import com.clickhouse.kafka.connect.sink.helper.ClickHouseTestHelpers;
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import java.util.ArrayList;
Expand All @@ -27,40 +28,6 @@
import org.testcontainers.clickhouse.ClickHouseContainer;

public class ClickHouseSinkJdbcPropertiesTest extends ClickHouseBase {

private void dropTable(ClickHouseHelperClient chc, String tableName) {
String dropTable = String.format("DROP TABLE IF EXISTS `%s`", tableName);
try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
ClickHouseResponse response = client.read(chc.getServer()) // or client.connect(endpoints)
// you'll have to parse response manually if using a different format


.query(dropTable)
.executeAndWait()) {
ClickHouseResponseSummary summary = response.getSummary();

} catch (ClickHouseException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
}

private int countRows(ClickHouseHelperClient chc, String topic) {
String queryCount = String.format("select count(*) from `%s`", topic);
try (ClickHouseClient client = ClickHouseClient.newInstance(ClickHouseProtocol.HTTP);
ClickHouseResponse response = client.read(chc.getServer()) // or client.connect(endpoints)
// you'll have to parse response manually if using a different format


.query(queryCount)
.executeAndWait()) {
ClickHouseResponseSummary summary = response.getSummary();
return response.firstRecord().getValue(0).asInteger();
} catch (ClickHouseException e) {
throw new RuntimeException(e);
}
}

public Collection<SinkRecord> createPrimitiveTypes(String topic, int partition) {
Gson gson = new Gson();
List<SinkRecord> array = new ArrayList<>();
Expand Down Expand Up @@ -194,70 +161,59 @@ public Collection<SinkRecord> createWithEmptyDataRecords(String topic, int parti

@Test
public void primitiveTypesTest() {
Map<String, String> props = new HashMap<>();
props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost());
props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString());
props.put(ClickHouseSinkConnector.DATABASE, "default");
props.put(ClickHouseSinkConnector.USERNAME, db.getUsername());
props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword());
props.put(ClickHouseSinkConnector.SSL_ENABLED, "false");
Map<String, String> props = createProps();
props.put(ClickHouseSinkConfig.JDBC_CONNECTION_PROPERTIES, "?load_balancing_policy=random&health_check_interval=5000&failover=2");

ClickHouseHelperClient chc = createClient(props);
// `arr_int8` Array(Int8), `arr_int16` Array(Int16), `arr_int32` Array(Int32), `arr_int64` Array(Int64), `arr_float32` Array(Float32), `arr_float64` Array(Float64), `arr_bool` Array(Bool)
String topic = "schemaless_primitive_types_table_test";
dropTable(chc, topic);
String topic = createTopicName("schemaless_primitive_types_table_test");
ClickHouseTestHelpers.dropTable(chc, topic);
createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, `p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16");
Collection<SinkRecord> sr = createPrimitiveTypes(topic, 1);

ClickHouseSinkTask chst = new ClickHouseSinkTask();
chst.start(props);
chst.put(sr);
chst.stop();
assertEquals(sr.size(), countRows(chc, topic));
assertEquals(sr.size(), ClickHouseTestHelpers.countRows(chc, topic));
}

@Test
public void withEmptyDataRecordsTest() {
Map<String, String> props = new HashMap<>();
props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost());
props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString());
props.put(ClickHouseSinkConnector.DATABASE, "default");
props.put(ClickHouseSinkConnector.USERNAME, db.getUsername());
props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword());
props.put(ClickHouseSinkConnector.SSL_ENABLED, "false");
props.put(ClickHouseSinkConfig.JDBC_CONNECTION_PROPERTIES, "?ssl=false&sslmode=none");

Map<String, String> props = createProps();
if (isCloud) {
props.put(ClickHouseSinkConfig.JDBC_CONNECTION_PROPERTIES, "?ssl=true&sslmode=none");
} else {
props.put(ClickHouseSinkConfig.JDBC_CONNECTION_PROPERTIES, "?ssl=false&sslmode=none");
}
ClickHouseHelperClient chc = createClient(props);
// `arr_int8` Array(Int8), `arr_int16` Array(Int16), `arr_int32` Array(Int32), `arr_int64` Array(Int64), `arr_float32` Array(Float32), `arr_float64` Array(Float64), `arr_bool` Array(Bool)
String topic = "schemaless_empty_records_table_test";
dropTable(chc, topic);
String topic = createTopicName("schemaless_empty_records_table_test");
ClickHouseTestHelpers.dropTable(chc, topic);
createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, `p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16");
Collection<SinkRecord> sr = createWithEmptyDataRecords(topic, 1);

ClickHouseSinkTask chst = new ClickHouseSinkTask();
chst.start(props);
chst.put(sr);
chst.stop();
assertEquals(sr.size() / 2, countRows(chc, topic));
assertEquals(sr.size() / 2, ClickHouseTestHelpers.countRows(chc, topic));
}

@Test
public void emptyDataRecordsTestFailedWithSslProp() {
Map<String, String> props = new HashMap<>();
props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost());
props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString());
props.put(ClickHouseSinkConnector.DATABASE, "default");
props.put(ClickHouseSinkConnector.USERNAME, db.getUsername());
props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword());
props.put(ClickHouseSinkConnector.SSL_ENABLED, "false");
// this will fail connection because the test container do not configured with SSL-TLS
props.put(ClickHouseSinkConfig.JDBC_CONNECTION_PROPERTIES, "?ssl=true&sslmode=strict");

Map<String, String> props = createProps();
if (isCloud) {
// this will fail connection because the test container do not configured with SSL-TLS
props.put(ClickHouseSinkConfig.JDBC_CONNECTION_PROPERTIES, "?ssl=false&sslmode=strict");
} else {
// this will fail connection because the test container do not configured with SSL-TLS
props.put(ClickHouseSinkConfig.JDBC_CONNECTION_PROPERTIES, "?ssl=true&sslmode=strict");
}
ClickHouseHelperClient chc = createClient(props);
// `arr_int8` Array(Int8), `arr_int16` Array(Int16), `arr_int32` Array(Int32), `arr_int64` Array(Int64), `arr_float32` Array(Float32), `arr_float64` Array(Float64), `arr_bool` Array(Bool)
String topic = "schemaless_empty_records_table_test";
dropTable(chc, topic);
String topic = createTopicName("schemaless_empty_records_table_test");
ClickHouseTestHelpers.dropTable(chc, topic);
createTable(chc, topic, "CREATE TABLE %s ( `off16` Int16, `str` String, `p_int8` Int8, `p_int16` Int16, `p_int32` Int32, `p_int64` Int64, `p_float32` Float32, `p_float64` Float64, `p_bool` Bool) Engine = MergeTree ORDER BY off16");
Collection<SinkRecord> sr = createWithEmptyDataRecords(topic, 1);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,9 @@
import static org.junit.jupiter.api.Assertions.assertEquals;

public class ClickHouseSinkTaskMappingTest extends ClickHouseBase{

private Map<String, String> getTestProperties() {
Map<String, String> props = new HashMap<>();
props.put(ClickHouseSinkConnector.HOSTNAME, db.getHost());
props.put(ClickHouseSinkConnector.PORT, db.getFirstMappedPort().toString());
props.put(ClickHouseSinkConnector.DATABASE, "default");
props.put(ClickHouseSinkConnector.USERNAME, db.getUsername());
props.put(ClickHouseSinkConnector.PASSWORD, db.getPassword());
props.put(ClickHouseSinkConnector.SSL_ENABLED, "false");
return props;
}


@Test
public void schemalessSingleTableMappingTest() {
Map<String, String> props = getTestProperties();
Map<String, String> props = createProps();
props.put(ClickHouseSinkConfig.TABLE_MAPPING, "mapping_table_test=table_mapping_test");
ClickHouseHelperClient chc = createClient(props);

Expand All @@ -53,7 +40,7 @@ public void schemalessSingleTableMappingTest() {

@Test
public void schemalessMultiDifferentTableMappingTest() {
Map<String, String> props = getTestProperties();
Map<String, String> props = createProps();
props.put(ClickHouseSinkConfig.TABLE_MAPPING, "mapping_table_test=table_mapping_test, mapping_table_test2=table_mapping_test2");
ClickHouseHelperClient chc = createClient(props);

Expand Down Expand Up @@ -81,7 +68,7 @@ public void schemalessMultiDifferentTableMappingTest() {

@Test
public void schemalessMultiSameTableMappingTest() {
Map<String, String> props = getTestProperties();
Map<String, String> props = createProps();
props.put(ClickHouseSinkConfig.TABLE_MAPPING, "mapping_table_test=table_mapping_test, mapping_table_test2=table_mapping_test");
ClickHouseHelperClient chc = createClient(props);

Expand All @@ -104,7 +91,7 @@ public void schemalessMultiSameTableMappingTest() {

@Test
public void schemalessMixedTableMappingTest() {
Map<String, String> props = getTestProperties();
Map<String, String> props = createProps();
props.put(ClickHouseSinkConfig.TABLE_MAPPING, "mapping_table_test=table_mapping_test, mapping_table_test2=table_mapping_test2");
ClickHouseHelperClient chc = createClient(props);

Expand Down Expand Up @@ -139,7 +126,7 @@ public void schemalessMixedTableMappingTest() {

@Test
public void schemaArrayTypesSingleTableMappingTest() {
Map<String, String> props = getTestProperties();
Map<String, String> props = createProps();
props.put(ClickHouseSinkConfig.TABLE_MAPPING, "array_string_table_test=array_string_mapping_table_test");
ClickHouseHelperClient chc = createClient(props);

Expand All @@ -162,7 +149,7 @@ public void schemaArrayTypesSingleTableMappingTest() {

@Test
public void schemaArrayTypesMultipleDifferentTableMappingTest() {
Map<String, String> props = getTestProperties();
Map<String, String> props = createProps();
props.put(ClickHouseSinkConfig.TABLE_MAPPING, "array_string_table_test=array_string_mapping_table_test, array_string_table_test2=array_string_mapping_table_test2");
ClickHouseHelperClient chc = createClient(props);

Expand Down Expand Up @@ -195,7 +182,7 @@ public void schemaArrayTypesMultipleDifferentTableMappingTest() {

@Test
public void schemaArrayTypesMultipleSameTableMappingTest() {
Map<String, String> props = getTestProperties();
Map<String, String> props = createProps();
props.put(ClickHouseSinkConfig.TABLE_MAPPING, "array_string_table_test=array_string_mapping_table_test, array_string_table_test2=array_string_mapping_table_test");
ClickHouseHelperClient chc = createClient(props);

Expand All @@ -221,7 +208,7 @@ public void schemaArrayTypesMultipleSameTableMappingTest() {

@Test
public void schemaArrayTypesMixedTableMappingTest() {
Map<String, String> props = getTestProperties();
Map<String, String> props = createProps();
props.put(ClickHouseSinkConfig.TABLE_MAPPING, "array_string_table_test=array_string_mapping_table_test, array_string_table_test2=array_string_mapping_table_test2");
ClickHouseHelperClient chc = createClient(props);

Expand Down
Loading
Loading