feat: change debezium from cloudevent to sourcerecord #79

Open · wants to merge 3 commits into base: main
4 changes: 2 additions & 2 deletions core/pom.xml
@@ -144,13 +144,13 @@
</execution>
</executions>
<configuration>
<protocArtifact>com.google.protobuf:protoc:3.17.2:exe:${os.detected.classifier}</protocArtifact>
<protocArtifact>com.google.protobuf:protoc:3.20.2:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>

<clearOutputDirectory>false</clearOutputDirectory>
<!-- <outputDirectory>${project.basedir}/src/main/java/</outputDirectory>-->
<outputDirectory>${project.build.directory}/generated-sources/protobuf</outputDirectory>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.39.0:exe:${os.detected.classifier}</pluginArtifact>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:1.45.1:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
</plugin>
</plugins>

com/linkall/cdk/database/debezium/DebeziumConfig.java
@@ -1,8 +1,12 @@
package com.linkall.cdk.database.debezium;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkall.cdk.config.SourceConfig;

import java.nio.charset.StandardCharsets;
import java.util.Properties;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -13,6 +17,8 @@ public abstract class DebeziumConfig extends SourceConfig {

protected abstract Properties getDebeziumProperties();

protected abstract Object getOffset();

protected String tableFormat(String name, Stream<String> table) {
return table
.map(stream -> name + "." + stream)
@@ -31,33 +37,28 @@ protected Properties getProperties() {
props.setProperty("tombstones.on.delete", "false");

props.setProperty("offset.storage", KvStoreOffsetBackingStore.class.getCanonicalName());
// if (getOffsetKey()!=null) {
// props.setProperty(
// KvStoreOffsetBackingStore.OFFSET_STORAGE_KV_STORE_KEY_CONFIG, getOffsetKey());
// }
// Map<String, Object> configOffset = getConfigOffset();
// if (configOffset!=null && configOffset.size() > 0) {
// Converter valueConverter = new JsonConverter();
// Map<String, Object> valueConfigs = new HashMap<>();
// valueConfigs.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false);
// valueConverter.configure(valueConfigs, false);
// byte[] offsetValue = valueConverter.fromConnectData(dbConfig.getDatabase(), null, configOffset);
// props.setProperty(
// KvStoreOffsetBackingStore.OFFSET_CONFIG_VALUE,
// new String(offsetValue, StandardCharsets.UTF_8));
// }

Object offset = getOffset();
if (offset!=null) {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
byte[] offsetValue;
try {
offsetValue = objectMapper.writeValueAsBytes(offset);
} catch (JsonProcessingException e) {
throw new RuntimeException("failed to convert offset to JSON", e);
}
props.setProperty(
KvStoreOffsetBackingStore.OFFSET_CONFIG_VALUE,
new String(offsetValue, StandardCharsets.UTF_8));
}
props.setProperty("offset.flush.interval.ms", "1000");

// https://debezium.io/documentation/reference/configuration/avro.html
props.setProperty("key.converter", "org.apache.kafka.connect.json.JsonConverter");
props.setProperty("key.converter.schemas.enable", "false");
props.setProperty("value.converter", "io.debezium.converters.CloudEventsConverter");
props.setProperty("value.converter.data.serializer.type", "json");
props.setProperty("value.converter.json.schemas.enable", "false");
props.setProperty("value.converter.schemas.enable", "false");

props.putAll(getDebeziumProperties());
if (customDebezium != null) {
if (customDebezium!=null) {
props.putAll(customDebezium);
}
return props;
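
For orientation, a concrete connector config now supplies its Debezium properties plus an optional offset object to resume from. A minimal sketch of a subclass, assuming a hypothetical MySQL-flavored connector (class, field, and property names are illustrative, and any other abstract members of SourceConfig are omitted):

import com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Map;
import java.util.Properties;

// Hypothetical subclass illustrating the new getOffset() contract.
public class MySqlConfig extends DebeziumConfig {

    @JsonProperty("offset") // assumed to be bound from the connector's config file
    private Map<String, Object> offset;

    @Override
    protected Properties getDebeziumProperties() {
        Properties props = new Properties();
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        return props;
    }

    @Override
    protected Object getOffset() {
        // getProperties() serializes this with Jackson (NON_NULL) and stores it
        // under KvStoreOffsetBackingStore.OFFSET_CONFIG_VALUE.
        return offset;
    }
}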

com/linkall/cdk/database/debezium/DebeziumSource.java
@@ -14,83 +14,58 @@

package com.linkall.cdk.database.debezium;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.linkall.cdk.config.Config;
import com.linkall.cdk.connector.Element;
import com.linkall.cdk.connector.FailedCallback;
import com.linkall.cdk.connector.Source;
import com.linkall.cdk.connector.Tuple;
import io.cloudevents.CloudEvent;
import io.cloudevents.CloudEventData;
import io.cloudevents.core.v1.CloudEventBuilder;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.debezium.connector.AbstractSourceInfo;
import io.debezium.data.Envelope;
import io.debezium.embedded.Connect;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.CloudEvents;
import io.debezium.engine.spi.OffsetCommitPolicy;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;

public abstract class DebeziumSource implements Source, DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>> {
private static final Logger LOGGER = LoggerFactory.getLogger(DebeziumSource.class);
private final ObjectMapper mapper = new ObjectMapper();
public abstract class DebeziumSource implements Source, DebeziumEngine.ChangeConsumer<ChangeEvent<SourceRecord, SourceRecord>> {
protected static final Logger LOGGER = LoggerFactory.getLogger(DebeziumSource.class);
private final BlockingQueue<Tuple> events;
protected DebeziumConfig debeziumConfig;
private DebeziumEngine<ChangeEvent<String, String>> engine;
private DebeziumEngine<ChangeEvent<SourceRecord, SourceRecord>> engine;
private ExecutorService executor;
protected final JsonConverter jsonDataConverter = new JsonConverter();
protected static final String EXTENSION_NAME_PREFIX = "xv";

public DebeziumSource() {
this.events = new LinkedBlockingQueue<>();
Map<String, String> ceJsonConfig = new HashMap<>();
ceJsonConfig.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
jsonDataConverter.configure(ceJsonConfig, false);
}

protected void adapt(CloudEventBuilder builder, String key, Object value) throws IOException {
switch (key) {
case "id":
builder.withId(UUID.randomUUID().toString());
break;
case "source":
builder.withSource(URI.create(value.toString()));
break;
case "specversion":
break;
case "type":
builder.withType(value.toString());
break;
case "datacontenttype":
builder.withDataContentType(value.toString());
break;
case "dataschema":
builder.withDataSchema(URI.create(value.toString()));
break;
case "subject":
builder.withSubject(value.toString());
break;
case "time":
builder.withTime(OffsetDateTime.parse(value.toString()));
break;
case "data":
builder.withData(convertData(value));
break;
default:
builder.withExtension(key, value.toString());
break;
}
}

abstract protected CloudEventData convertData(Object data) throws IOException;

@Override
final public void destroy() throws Exception {
if (engine != null)
if (engine!=null) {
engine.close();
}
executor.shutdown();
}

@@ -110,8 +85,8 @@ final public void initialize(Config config) throws Exception {
}

@Override
final public void handleBatch(List<ChangeEvent<String, String>> records,
DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer)
final public void handleBatch(List<ChangeEvent<SourceRecord, SourceRecord>> records,
DebeziumEngine.RecordCommitter<ChangeEvent<SourceRecord, SourceRecord>> committer)
throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
LOGGER.info("Received event count {}", records.size());
@@ -123,23 +98,24 @@ final public void handleBatch(List<ChangeEvent<String, String>> records,
latch.countDown();
});

t.setFailed((FailedCallback<ChangeEvent<String, String>>) (success, failed, error) -> {
t.setFailed((FailedCallback<ChangeEvent<SourceRecord, SourceRecord>>) (success, failed, error) -> {
LOGGER.error("event send failed:{}, {}", error, failed);
committer.markProcessed(success.get(success.size() - 1).getOriginal());
latch.countDown();
});

for (ChangeEvent<String, String> record : records) {
if (record.value() == null) {
for (ChangeEvent<SourceRecord, SourceRecord> record : records) {
if (record.value()==null) {
continue;
}

try {
t.addElement(new Element<>(this.convert(record.value()), record));
} catch (IOException e) {
latch.countDown(); // How to process offset?
LOGGER.error("failed to parse record data {} to json, error: {}", record.value(), e);
Struct struct = (Struct) record.value().value();
String op = struct.getString(Envelope.FieldName.OPERATION);
Operation operation = convertOperation(op);
if (operation==null) {
LOGGER.warn("unknown debezium op {}, record will be ignored: {}", op, record);
continue;
}
t.addElement(new Element<>(this.convert(struct, operation), record));
}
this.events.put(t);
LOGGER.info("Received event count await {}", records.size());
@@ -149,7 +125,7 @@ final public void handleBatch(List<ChangeEvent<String, String>> records,
}

final protected void start() {
engine = DebeziumEngine.create(CloudEvents.class)
engine = DebeziumEngine.create(Connect.class)
.using(this.debeziumConfig.getProperties())
.using(OffsetCommitPolicy.always())
.notifying(this)
@@ -160,15 +136,60 @@ final protected void start() {
executor.execute(engine);
}

private CloudEvent convert(String record) throws IOException {
Map<String, Object> m = this.mapper.readValue(record.getBytes(StandardCharsets.UTF_8), Map.class);
CloudEventBuilder builder = new CloudEventBuilder();
for (Map.Entry<String, Object> entry : m.entrySet()) {
if (entry.getValue() == null) {
continue;
}
this.adapt(builder, entry.getKey(), entry.getValue());
}
protected CloudEvent convert(Struct struct, Operation operation) {
String op = struct.getString(Envelope.FieldName.OPERATION);
Struct source = struct.getStruct(Envelope.FieldName.SOURCE);
String connectorType = source.getString(AbstractSourceInfo.DEBEZIUM_CONNECTOR_KEY);
String name = source.getString(AbstractSourceInfo.SERVER_NAME_KEY);
CloudEventBuilder builder = CloudEventBuilder.v1();
builder.withId(UUID.randomUUID().toString())
.withSource(URI.create("/debezium/" + connectorType + "/" + name))
.withType("debezium." + connectorType + ".datachangeevent")
.withTime(OffsetDateTime.ofInstant(
Instant.ofEpochMilli(struct.getInt64(Envelope.FieldName.TIMESTAMP)), ZoneOffset.UTC))
.withData("application/json", eventData(struct))
.withExtension(extensionName("debeziumop"), op)
.withExtension(extensionName("debeziumname"), name)
.withExtension(extensionName("op"), operation.getCode());
eventExtension(builder, struct);
return builder.build();
}
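
// To make the mapping concrete: a row insert captured by a hypothetical
// MySQL connector named "mydb" would yield a CloudEvent shaped roughly like
//   id:              <random UUID>
//   source:          /debezium/mysql/mydb
//   type:            debezium.mysql.datachangeevent
//   datacontenttype: application/json
//   data:            the row's "after" image as JSON
//   extensions:      xvdebeziumop=c, xvdebeziumname=mydb, xvop=c
// (values illustrative).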

protected static String extensionName(String name) {
return EXTENSION_NAME_PREFIX + name;
}

/**
* Adds connector-specific CloudEvent extension attributes.
*
* @param builder CloudEventBuilder
* @param struct SourceRecord value
*/
protected abstract void eventExtension(CloudEventBuilder builder, Struct struct);
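
// A concrete source implements this hook to attach its own xv-prefixed
// attributes. A sketch, assuming the connector's source Struct carries
// "db" and "table" fields (connector-specific, verify per connector):
//
// @Override
// protected void eventExtension(CloudEventBuilder builder, Struct struct) {
//     Struct source = struct.getStruct(Envelope.FieldName.SOURCE);
//     builder.withExtension(extensionName("debeziumdb"), source.getString("db"));
//     builder.withExtension(extensionName("debeziumtable"), source.getString("table"));
// }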

protected byte[] eventData(Struct struct) {
String fieldName = Envelope.FieldName.AFTER;
Object dataValue = struct.get(fieldName);
if (dataValue==null) {
fieldName = Envelope.FieldName.BEFORE;
dataValue = struct.get(fieldName);
}
Schema dataSchema = struct.schema().field(fieldName).schema();
return jsonDataConverter.fromConnectData("debezium", dataSchema, dataValue);
}

protected Operation convertOperation(String op) {
switch (op) {
case "r":
case "c":
return Operation.CREATE;
case "u":
return Operation.UPDATE;
case "d":
return Operation.DELETE;
default:
return null;
}
}

}
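
Note that convertOperation folds Debezium's snapshot-read code "r" into CREATE, so consumers only ever observe "c", "u", or "d". A consumer can recover the operation from the event's extension; a sketch, assuming a received io.cloudevents.CloudEvent named event:

// extensionName("op") yields "xvop"; CloudEvent#getExtension returns Object.
String code = (String) event.getExtension("xvop"); // "c", "u", or "d"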

com/linkall/cdk/database/debezium/KvStoreOffsetBackingStore.java
@@ -19,6 +19,7 @@
import com.linkall.cdk.store.KVStore;
import com.linkall.cdk.store.KVStoreFactory;
import org.apache.kafka.connect.json.JsonConverter;
import org.apache.kafka.connect.json.JsonConverterConfig;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.MemoryOffsetBackingStore;
@@ -70,12 +71,14 @@ public void configure(WorkerConfig config) {
Optional.ofNullable(map.get(OFFSET_STORAGE_KV_STORE_KEY_CONFIG)).orElse(DEFAULT_KEY_NAME);
// read from config
String offsetConfigValue = map.get(OFFSET_CONFIG_VALUE);
if (offsetConfigValue == null || offsetConfigValue.isEmpty()) {
if (offsetConfigValue==null || offsetConfigValue.isEmpty()) {
return;
}
String serverName = map.get("name");
Converter keyConverter = new JsonConverter();
keyConverter.configure(config.originals(), true);
Map<String, String> keyConfig = new HashMap<>();
keyConfig.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, "false");
keyConverter.configure(keyConfig, true);
Map<String, Object> keyMap = new HashMap<>();
keyMap.put("server", serverName);
byte[] key = keyConverter.fromConnectData(serverName, null, Arrays.asList(serverName, keyMap));
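// With schemas disabled, the JsonConverter emits plain JSON, so for a
// connector named "mydb" the offset key stored in the KV store would look
// like (illustrative): ["mydb",{"server":"mydb"}]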
@@ -98,7 +101,7 @@ public synchronized void stop() {

private void load() {
byte[] value = store.get(keyName);
if (value == null) {
if (value==null) {
return;
}
loadFromKvStore(value);

com/linkall/cdk/database/debezium/Operation.java (new file)
@@ -0,0 +1,17 @@
package com.linkall.cdk.database.debezium;

public enum Operation {
CREATE("c"),
UPDATE("u"),
DELETE("d");

private final String code;

Operation(String code) {
this.code = code;
}

public String getCode() {
return code;
}
}