Merge pull request #368 from jirislav/finish-complex-types-support
Add support for Tuples & Variants
Paultagoras authored May 12, 2024
2 parents 86eb90f + ed98ef5 commit cfab3ed
Showing 22 changed files with 1,288 additions and 235 deletions.
16 changes: 15 additions & 1 deletion .github/workflows/tests.yaml
@@ -6,18 +6,32 @@ jobs:
build:
runs-on: ubuntu-latest
strategy:
fail-fast: false
matrix:
clickhouse: [ "23.7", "24.3", "latest", "cloud" ]
clickhouse: ["23.7", "24.3", "latest", "cloud"]
name: ClickHouse ${{ matrix.clickhouse }} tests
steps:
- name: Check for Cloud Credentials
id: check-cloud-credentials
run: |
if [[ "${{ matrix.clickhouse }}" == "cloud" && (-z "${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_HOST_SMT }}" || -z "${{ secrets.INTEGRATIONS_TEAM_TESTS_CLOUD_PASSWORD_SMT }}") ]]; then
echo "SKIP_STEP=true" >> $GITHUB_ENV
else
echo "SKIP_STEP=false" >> $GITHUB_ENV
fi
shell: bash

- uses: actions/checkout@v3
if: env.SKIP_STEP != 'true'
- name: Set up JDK 17
if: env.SKIP_STEP != 'true'
uses: actions/setup-java@v3
with:
java-version: '17'
distribution: 'adopt'
architecture: x64
- name: Setup and execute Gradle 'test' task
if: env.SKIP_STEP != 'true'
uses: gradle/gradle-build-action@v2
env:
CLICKHOUSE_VERSION: ${{ matrix.clickhouse }}
7 changes: 7 additions & 0 deletions CHANGELOG.md
@@ -1,3 +1,10 @@
## 1.1.0
* Added support for Tuple type
* Added support for Variant type
* Added support for Nested type
* Refactored Column class so that we use Builder pattern using Lombok
* Refactored recursive Map type parsing to iterative approach using describe_include_subcolumns=1

## 1.0.17
* Added support for ClickHouse Enum type #370
* Added extra break down of time measurement for insert operations
4 changes: 2 additions & 2 deletions CONTRIBUTING.md
@@ -123,10 +123,10 @@ A sample REST call you could use to create the connector (POST to `localhost:808


## Proposing code changes
This is a relatively straightfoward process:
This is a relatively straightforward process:
* Ensure there's unit test coverage for the changes (and that prior tests work still, of course).
* Update VERSION to the next logical version number
* Add changes to CHANGELOG in a human readable way
* Add changes to CHANGELOG in a human-readable way
* Submit a PR

## Releasing a new version
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
v1.0.17
v1.1.0
11 changes: 11 additions & 0 deletions build.gradle.kts
@@ -79,6 +79,17 @@ dependencies {
implementation("com.google.code.gson:gson:2.10.1")
// https://mvnrepository.com/artifact/org.apache.httpcomponents.client5/httpclient5
implementation("org.apache.httpcomponents.client5:httpclient5:5.3.1")
// https://mvnrepository.com/artifact/com.google.guava/guava
implementation("com.google.guava:guava:33.1.0-jre")


// Avoid telescoping constructors problem with the builder pattern using Lombok
compileOnly("org.projectlombok:lombok:1.18.32")
annotationProcessor("org.projectlombok:lombok:1.18.32")

// To parse JSON response from ClickHouse to parse complex data types correctly
implementation("com.fasterxml.jackson.core:jackson-databind:2.17.0")


// TODO: need to remove ???
implementation("org.slf4j:slf4j-reload4j:2.0.11")
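The new Lombok dependency backs the Column builder refactor called out in the CHANGELOG; the comment above refers to replacing a growing set of constructor overloads with a generated builder. A minimal sketch of the pattern — the class and fields below are illustrative assumptions, not the connector's actual Column class:

```java
import lombok.Builder;
import lombok.Getter;

// Illustrative only: field names are assumptions, not the connector's real Column definition.
@Builder
@Getter
class ColumnSketch {
    private final String name;
    private final String type;
    private final boolean nullable;
    private final boolean hasDefault;
}

class ColumnSketchUsage {
    public static void main(String[] args) {
        // Optional attributes are set by name instead of being threaded through
        // ever-longer ("telescoping") constructor overloads.
        ColumnSketch col = ColumnSketch.builder()
                .name("payload")
                .type("Tuple(a String, b Int32)")
                .nullable(false)
                .build();
        System.out.println(col.getName() + " -> " + col.getType());
    }
}
```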
@@ -1,7 +1,10 @@
package com.clickhouse.kafka.connect.sink.data;

import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;

import java.util.List;

public class Data {
private Schema schema;
private Object object;
@@ -11,6 +14,10 @@ public Data(Schema schema, Object object) {
this.object = object;
}

public List<Field> getFields() {
return schema.fields();
}

public Schema.Type getFieldType() {
return schema.type();
}
@@ -79,9 +79,10 @@ public static Map<String, Data> toJsonMap(Struct struct) {
jsonMap.put(fieldName, new Data(field.schema(), toJsonMap(struct.getStruct(fieldName))));
break;
case MAP:
Map<Object, Object> fieldMap = struct.getMap(fieldName);
if (fieldMap != null && !fieldMap.isEmpty() && fieldMap.values().iterator().next() instanceof Struct) {
Map<Object, Object> fieldMap = new HashMap<>(struct.getMap(fieldName));
if (!fieldMap.isEmpty() && fieldMap.values().iterator().next() instanceof Struct) {
// Map values are `Struct`

for (Map.Entry<Object, Object> entry : fieldMap.entrySet()) {
entry.setValue(toJsonMap((Struct) entry.getValue()));
}
@@ -9,6 +9,7 @@
import com.clickhouse.kafka.connect.sink.ClickHouseSinkConfig;
import com.clickhouse.kafka.connect.sink.data.Data;
import com.clickhouse.kafka.connect.sink.data.Record;
import com.clickhouse.kafka.connect.sink.data.StructToJsonMap;
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseHelperClient;
import com.clickhouse.kafka.connect.sink.db.mapping.Column;
import com.clickhouse.kafka.connect.sink.db.mapping.Table;
@@ -32,6 +33,8 @@
import org.apache.kafka.connect.errors.DataException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.collect.Streams;
import reactor.util.function.Tuples;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -178,7 +181,7 @@ public void doInsert(List<Record> records, QueryIdentifier queryId, ErrorReporte

private boolean validateDataSchema(Table table, Record record, boolean onlyFieldsName) {
boolean validSchema = true;
for (Column col : table.getColumns()) {
for (Column col : table.getRootColumnsList()) {
String colName = col.getName();
Type type = col.getType();
boolean isNullable = col.isNullable();
@@ -207,13 +210,19 @@ private boolean validateDataSchema(Table table, Record record, boolean onlyField
break;//I notice we just break here, rather than actually validate the type
default:
if (!colTypeName.equals(dataTypeName)) {
if (!(colTypeName.equals("STRING") && dataTypeName.equals("BYTES"))) {
LOGGER.debug("Data schema name: {}", objSchema.name());
if (!("DECIMAL".equalsIgnoreCase(colTypeName) && objSchema.name().equals("org.apache.kafka.connect.data.Decimal"))) {
validSchema = false;
LOGGER.error(String.format("Table column name [%s] type [%s] is not matching data column type [%s]", col.getName(), colTypeName, dataTypeName));
}
}
LOGGER.debug("Data schema name: {}", objSchema.name());

if (colTypeName.equals("STRING") && dataTypeName.equals("BYTES"))
continue;

if (colTypeName.equals("TUPLE") && dataTypeName.equals("STRUCT"))
continue;

if (("DECIMAL".equalsIgnoreCase(colTypeName) && objSchema.name().equals("org.apache.kafka.connect.data.Decimal")))
continue;

validSchema = false;
LOGGER.error(String.format("Table column name [%s] type [%s] is not matching data column type [%s]", col.getName(), colTypeName, dataTypeName));
}
}
}
@@ -387,16 +396,80 @@ private void doWriteColValue(Column col, ClickHousePipedOutputStream stream, Dat
BinaryStreamUtils.writeVarInt(stream, sizeArrObject);
arrObject.forEach(v -> {
try {
if (col.getSubType().isNullable() && v != null) {
if (col.getArrayType().isNullable() && v != null) {
BinaryStreamUtils.writeNonNull(stream);
}
doWriteColValue(col.getSubType(), stream, new Data(value.getNestedValueSchema(), v), defaultsSupport);
doWriteColValue(col.getArrayType(), stream, new Data(value.getNestedValueSchema(), v), defaultsSupport);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
break;
case TUPLE:
Map<?, ?> jsonMapValues;

Object underlyingObject = value.getObject();
if (underlyingObject.getClass() != Struct.class) {
// Tuples in the root structure are parsed using StructToJsonMap
jsonMapValues = (Map<?, ?>) underlyingObject;
} else {
jsonMapValues = StructToJsonMap.toJsonMap((Struct) underlyingObject);
}

Streams.zip(
col.getTupleFields().stream(), value.getFields().stream(), Tuples::of
).forEach((fields) -> {
Column column = fields.getT1();
Field field = fields.getT2();

Data innerData = (Data) jsonMapValues.get(field.name());
try {
doWriteColValue(column, stream, innerData, defaultsSupport);
} catch (IOException e) {
throw new RuntimeException(e);
}
});
break;
case VARIANT:
// https://github.com/ClickHouse/ClickHouse/pull/58047/files#diff-f56b7f61d5a82c440bb1a078ea8e5dcf2679dc92adbbc28bd89638cbe499363dR368-R384
// https://github.com/ClickHouse/ClickHouse/blob/658a8e9a9b1658cd12c78365f9829b35d016f1b2/src/Columns/ColumnVariant.h#L10-L56
mapTmp = (Map<?, ?>) value.getObject();
Optional<Data> variantValueOption = mapTmp.values().stream()
.map(o -> (Data) o)
.filter(data -> data.getObject() != null)
.findFirst();

// Null Discriminator (https://github.com/ClickHouse/ClickHouse/blob/658a8e9a9b1658cd12c78365f9829b35d016f1b2/src/Columns/ColumnVariant.h#L65)
int nullDiscriminator = 255;
if (variantValueOption.isEmpty()) {
BinaryStreamUtils.writeUnsignedInt8(stream, nullDiscriminator);
} else {
Data variantValue = variantValueOption.get();

String fieldTypeName = variantValue.getFieldType().getName();
Optional<Integer> globalDiscriminator = col.getVariantGlobalDiscriminator(fieldTypeName);
if (globalDiscriminator.isEmpty()) {
LOGGER.error("Unable to determine the global discriminator of {} variant! Writing NULL variant instead.", fieldTypeName);
BinaryStreamUtils.writeUnsignedInt8(stream, nullDiscriminator);
return;
}
BinaryStreamUtils.writeUnsignedInt8(stream, globalDiscriminator.get());

// Variants support parametrized types, such as Decimal(x, y). Because of that, we can't use
// the doWritePrimitive method.
doWriteColValue(
col.getVariantGlobalDiscriminators().get(globalDiscriminator.get()).getT1(),
stream,
variantValue,
defaultsSupport
);
}
break;
default:
// If you wonder, how NESTED works in JDBC:
// https://github.com/ClickHouse/clickhouse-java/blob/6cbbd8fe3f86ac26d12a95e0c2b964f3a3755fc9/clickhouse-data/src/main/java/com/clickhouse/data/format/ClickHouseRowBinaryProcessor.java#L159
LOGGER.error("Cannot serialize unsupported type {}", columnType);
}
}
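Taken together, the TUPLE branch flattens a Struct (or an already-converted map) and writes each tuple element with its matching column, while the VARIANT branch prefixes the value with its global discriminator byte, using 255 as the NULL sentinel. A minimal sketch of that variant wire layout under stated assumptions — plain java.io streams stand in for BinaryStreamUtils, and the Variant(Int64, String) column with discriminators [0, 1] is hypothetical:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.List;

// Sketch of the layout the VARIANT branch writes, assuming a column declared as
// Variant(Int64, String) whose global discriminators are [0 -> Int64, 1 -> String].
public class VariantLayoutSketch {
    static final int NULL_DISCRIMINATOR = 255; // same sentinel as in the connector code above

    static void writeVariant(ByteArrayOutputStream out, List<String> types,
                             String typeName, byte[] encodedValue) throws IOException {
        int discriminator = (typeName == null) ? NULL_DISCRIMINATOR : types.indexOf(typeName);
        out.write(discriminator);        // one unsigned byte selecting the variant (255 = NULL)
        if (discriminator != NULL_DISCRIMINATOR) {
            out.write(encodedValue);     // followed by the value encoded as that variant's type
        }
    }

    public static void main(String[] args) throws IOException {
        List<String> globalDiscriminators = List.of("Int64", "String");
        ByteArrayOutputStream out = new ByteArrayOutputStream();

        // A String value: discriminator 1, then the string's RowBinary encoding
        // (varint length + UTF-8 bytes; a 5-byte string needs a single length byte).
        byte[] text = "hello".getBytes(StandardCharsets.UTF_8);
        ByteArrayOutputStream encoded = new ByteArrayOutputStream();
        encoded.write(text.length);
        encoded.write(text);
        writeVariant(out, globalDiscriminators, "String", encoded.toByteArray());

        // A NULL value: only the 255 sentinel, no payload.
        writeVariant(out, globalDiscriminators, null, null);

        System.out.println("bytes written: " + out.size()); // (1 + 1 + 5) + 1 = 8
    }
}
```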

@@ -590,7 +663,7 @@ protected void doInsertRawBinary(List<Record> records, Table table, QueryIdentif
// write bytes into the piped stream
for (Record record : records) {
if (record.getSinkRecord().value() != null) {
for (Column col : table.getColumns()) {
for (Column col : table.getRootColumnsList()) {
long beforePushStream = System.currentTimeMillis();
doWriteCol(record, col, stream, supportDefaults);
pushStreamTime += System.currentTimeMillis() - beforePushStream;
@@ -0,0 +1,46 @@
package com.clickhouse.kafka.connect.sink.db.helper;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import lombok.Builder;
import lombok.Data;
import lombok.extern.jackson.Jacksonized;

/**
* Java object representation of one DESCRIBE TABLE result row.
* <p>
* We use Jackson to instantiate it from JSON.
*/
@Data
@Builder
@Jacksonized
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
public class ClickHouseFieldDescriptor {
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
private String name;
private String type;
private String defaultType;
private String defaultExpression;
private String comment;
private String codecExpression;
private String ttlExpression;
private boolean isSubcolumn;

public boolean isAlias() {
return "ALIAS".equals(defaultType);
}

public boolean isMaterialized() {
return "MATERIALIZED".equals(defaultType);
}

public boolean hasDefault() {
return "DEFAULT".equals(defaultType);
}

public static ClickHouseFieldDescriptor fromJsonRow(String json) throws JsonProcessingException {
return OBJECT_MAPPER.readValue(json.replace("\n", "\\n"), ClickHouseFieldDescriptor.class);
}
}
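A rough usage sketch of the new descriptor, assuming a hand-written JSONEachRow row; the field values below are illustrative, not captured ClickHouse output:

```java
import com.clickhouse.kafka.connect.sink.db.helper.ClickHouseFieldDescriptor;
import com.fasterxml.jackson.core.JsonProcessingException;

// Illustrative row only: real rows come from DESCRIBE TABLE ... FORMAT JSONEachRow.
public class FieldDescriptorExample {
    public static void main(String[] args) throws JsonProcessingException {
        String row = "{\"name\":\"price\",\"type\":\"Decimal(10, 2)\",\"default_type\":\"DEFAULT\","
                + "\"default_expression\":\"0\",\"comment\":\"\",\"codec_expression\":\"\","
                + "\"ttl_expression\":\"\",\"is_subcolumn\":0}";

        ClickHouseFieldDescriptor descriptor = ClickHouseFieldDescriptor.fromJsonRow(row);
        System.out.println(descriptor.getName() + " " + descriptor.getType()); // price Decimal(10, 2)
        System.out.println(descriptor.hasDefault()); // true
        System.out.println(descriptor.isAlias());    // false
    }
}
```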
@@ -11,6 +11,8 @@
import com.clickhouse.kafka.connect.sink.db.mapping.Column;
import com.clickhouse.kafka.connect.sink.db.mapping.Table;
import com.clickhouse.kafka.connect.util.Utils;
import com.fasterxml.jackson.core.JsonProcessingException;
import lombok.Getter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -32,6 +34,7 @@ public class ClickHouseHelperClient {
private final boolean sslEnabled;
private final String jdbcConnectionProperties;
private final int timeout;
@Getter
private ClickHouseNode server = null;
private final int retry;
private ClickHouseProxyType proxyType = null;
@@ -86,7 +89,7 @@ private ClickHouseNode create() {
tmpJdbcConnectionProperties
);

LOGGER.info("ClickHouse URL: " + url);
LOGGER.info("ClickHouse URL: {}", url);

if (username != null && password != null) {
LOGGER.debug(String.format("Adding username [%s]", username));
@@ -137,10 +140,6 @@ public String version() {
}
}

public ClickHouseNode getServer() {
return this.server;
}

public ClickHouseResponse query(String query) {
return query(query, null);
}
@@ -167,7 +166,6 @@ public ClickHouseResponse query(String query, ClickHouseFormat clickHouseFormat)
throw new RuntimeException(ce);
}


public List<String> showTables() {
List<String> tablesNames = new ArrayList<>();
try (ClickHouseClient client = ClickHouseClient.builder()
@@ -200,32 +198,30 @@ public Table describeTable(String tableName) {
.nodeSelector(ClickHouseNodeSelector.of(ClickHouseProtocol.HTTP))
.build();
ClickHouseResponse response = client.read(server)
.set("describe_include_subcolumns", true)
.format(ClickHouseFormat.JSONEachRow)
.query(describeQuery)
.executeAndWait()) {

Table table = new Table(tableName);
for (ClickHouseRecord r : response.records()) {
boolean hasDefault = false;
ClickHouseValue v = r.getValue(0);
String value = v.asString();
String[] cols = value.split("\t");
if (cols.length > 2) {
String defaultKind = cols[2];
if ("ALIAS".equals(defaultKind) || "MATERIALIZED".equals(defaultKind)) {
LOGGER.debug("Skipping column {} as it is an alias or materialized view", cols[0]);
// Only insert into "real" columns
continue;
} else if("DEFAULT".equals(defaultKind)) {
table.setHasDefaults(true);
hasDefault = true;
}

ClickHouseFieldDescriptor fieldDescriptor = ClickHouseFieldDescriptor.fromJsonRow(v.asString());
if (fieldDescriptor.isAlias() || fieldDescriptor.isMaterialized()) {
LOGGER.debug("Skipping column {} as it is an alias or materialized view", fieldDescriptor.getName());
continue;
}
String name = cols[0];
String type = cols[1];
Column column = Column.extractColumn(name, type, false, hasDefault);

if (fieldDescriptor.hasDefault()) {
table.hasDefaults(true);
}

Column column = Column.extractColumn(fieldDescriptor);
table.addColumn(column);
}
return table;
} catch (ClickHouseException e) {
} catch (ClickHouseException | JsonProcessingException e) {
LOGGER.error(String.format("Exception when running describeTable %s", describeQuery), e);
return null;
}
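The rewritten describeTable runs with describe_include_subcolumns enabled and JSONEachRow output, so composite columns arrive with their subcolumns already listed as separate rows — the "iterative" Map/Tuple parsing mentioned in the CHANGELOG. A sketch of the assumed row shape for a Map(String, String) column; the sample rows are an assumption, not captured output:

```java
import java.util.List;

// Assumed shape of DESCRIBE output with describe_include_subcolumns=1 for a column
// `attrs Map(String, String)`: the parent row plus one row per subcolumn.
public class DescribeSubcolumnsSketch {
    record Row(String name, String type, boolean isSubcolumn) {}

    public static void main(String[] args) {
        List<Row> rows = List.of(
                new Row("attrs", "Map(String, String)", false),
                new Row("attrs.keys", "Array(String)", true),
                new Row("attrs.values", "Array(String)", true));

        // With subcolumns listed explicitly, the column mapping can be assembled in a single
        // pass over the rows instead of recursively re-parsing the parent type string.
        for (Row r : rows) {
            System.out.println((r.isSubcolumn() ? "  " : "") + r.name() + " " + r.type());
        }
    }
}
```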