diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml index 186e391f7e978..339522fe45fa4 100644 --- a/distribution/io/src/assemble/io.xml +++ b/distribution/io/src/assemble/io.xml @@ -64,5 +64,6 @@ ${basedir}/../../pulsar-io/canal/target/pulsar-io-canal-${project.version}.nar ${basedir}/../../pulsar-io/netty/target/pulsar-io-netty-${project.version}.nar ${basedir}/../../pulsar-io/mongo/target/pulsar-io-mongo-${project.version}.nar + ${basedir}/../../pulsar-io/debezium/mysql/target/pulsar-io-debezium-mysql-${project.version}.nar diff --git a/pulsar-io/debezium/core/pom.xml b/pulsar-io/debezium/core/pom.xml new file mode 100644 index 0000000000000..23ca9734311f3 --- /dev/null +++ b/pulsar-io/debezium/core/pom.xml @@ -0,0 +1,104 @@ + + + 4.0.0 + + org.apache.pulsar + pulsar-io-debezium + 2.4.0-SNAPSHOT + + + pulsar-io-debezium-core + Pulsar IO :: Debezium :: Core + + + + + ${project.groupId} + pulsar-io-core + ${project.version} + + + + io.debezium + debezium-core + ${debezium.version} + + + + ${project.groupId} + pulsar-io-kafka-connect-adaptor + ${project.version} + + + + org.apache.kafka + kafka_${scala.binary.version} + ${kafka-client.version} + + + + org.apache.kafka + connect-runtime + ${kafka-client.version} + + + + ${project.groupId} + pulsar-client-original + ${project.version} + + + + ${project.groupId} + pulsar-broker + ${project.version} + test + + + + ${project.groupId} + managed-ledger-original + ${project.version} + test-jar + test + + + + ${project.groupId} + pulsar-zookeeper-utils + ${project.version} + test-jar + test + + + + ${project.groupId} + pulsar-broker + ${project.version} + test + test-jar + + + + + diff --git a/pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java b/pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java similarity index 100% rename from pulsar-io/debezium/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java rename to pulsar-io/debezium/core/src/main/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistory.java diff --git a/pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java b/pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java similarity index 100% rename from pulsar-io/debezium/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java rename to pulsar-io/debezium/core/src/test/java/org/apache/pulsar/io/debezium/PulsarDatabaseHistoryTest.java diff --git a/pulsar-io/debezium/mysql/pom.xml b/pulsar-io/debezium/mysql/pom.xml new file mode 100644 index 0000000000000..97541651926d1 --- /dev/null +++ b/pulsar-io/debezium/mysql/pom.xml @@ -0,0 +1,59 @@ + + + 4.0.0 + + org.apache.pulsar + pulsar-io-debezium + 2.4.0-SNAPSHOT + + + pulsar-io-debezium-mysql + Pulsar IO :: Debezium :: mysql + + + + + ${project.groupId} + pulsar-io-debezium-core + ${project.version} + + + + io.debezium + debezium-connector-mysql + ${debezium.version} + + + + + + + + + org.apache.nifi + nifi-nar-maven-plugin + + + + + diff --git a/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java b/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java new file mode 100644 index 0000000000000..e8fe0c3873a70 --- /dev/null +++ b/pulsar-io/debezium/mysql/src/main/java/org/apache/pulsar/io/debezium/mysql/DebeziumMysqlSource.java @@ -0,0 +1,110 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.io.debezium.mysql; + +import java.util.Map; + +import io.debezium.connector.mysql.MySqlConnectorConfig; +import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.connect.runtime.TaskConfig; +import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.io.core.SourceContext; +import org.apache.pulsar.io.debezium.PulsarDatabaseHistory; +import org.apache.pulsar.io.kafka.connect.KafkaConnectSource; +import org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig; + +/** + * A pulsar source that runs + */ +@Slf4j +public class DebeziumMysqlSource extends KafkaConnectSource { + static private final String DEFAULT_TASK = "io.debezium.connector.mysql.MySqlConnectorTask"; + static private final String DEFAULT_CONVERTER = "org.apache.kafka.connect.json.JsonConverter"; + static private final String DEFAULT_HISTORY = "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"; + static private final String DEFAULT_OFFSET_TOPIC = "debezium-mysql-offset-topic"; + static private final String DEFAULT_HISTORY_TOPIC = "debezium-mysql-history-topic"; + + private static void throwExceptionIfConfigNotMatch(Map config, + String key, + String value) throws IllegalArgumentException { + Object orig = config.get(key); + if (orig == null) { + config.put(key, value); + return; + } + + // throw exception if value not match + if (!orig.equals(value)) { + throw new IllegalArgumentException("Expected " + value + " but has " + orig); + } + } + + private static void setConfigIfNull(Map config, String key, String value) { + Object orig = config.get(key); + if (orig == null) { + config.put(key, value); + } + } + + // namespace: tenant/namespace + private static String topicNamespace(SourceContext sourceContext) { + String tenant = sourceContext.getTenant(); + String namespace = sourceContext.getNamespace(); + + return (StringUtils.isEmpty(tenant) ? TopicName.PUBLIC_TENANT : tenant) + "/" + + (StringUtils.isEmpty(namespace) ? TopicName.DEFAULT_NAMESPACE : namespace); + } + + @Override + public void open(Map config, SourceContext sourceContext) throws Exception { + // connector task + throwExceptionIfConfigNotMatch(config, TaskConfig.TASK_CLASS_CONFIG, DEFAULT_TASK); + + // key.converter + setConfigIfNull(config, PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER); + // value.converter + setConfigIfNull(config, PulsarKafkaWorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, DEFAULT_CONVERTER); + + // database.history implementation class + setConfigIfNull(config, MySqlConnectorConfig.DATABASE_HISTORY.name(), DEFAULT_HISTORY); + + // database.history.pulsar.service.url, this is set as the value of pulsar.service.url if null. + String serviceUrl = (String) config.get(PulsarKafkaWorkerConfig.PULSAR_SERVICE_URL_CONFIG); + if (serviceUrl == null) { + throw new IllegalArgumentException("Pulsar service URL not provided."); + } + setConfigIfNull(config, PulsarDatabaseHistory.SERVICE_URL.name(), serviceUrl); + + String topicNamespace = topicNamespace(sourceContext); + // topic.namespace + setConfigIfNull(config, PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG, topicNamespace); + + String sourceName = sourceContext.getSourceName(); + // database.history.pulsar.topic: history topic name + setConfigIfNull(config, PulsarDatabaseHistory.TOPIC.name(), + topicNamespace + "/" + sourceName + "-" + DEFAULT_HISTORY_TOPIC); + // offset.storage.topic: offset topic name + setConfigIfNull(config, PulsarKafkaWorkerConfig.OFFSET_STORAGE_TOPIC_CONFIG, + topicNamespace + "/" + sourceName + "-" + DEFAULT_OFFSET_TOPIC); + + super.open(config, sourceContext); + } + +} diff --git a/pulsar-io/debezium/mysql/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/debezium/mysql/src/main/resources/META-INF/services/pulsar-io.yaml new file mode 100644 index 0000000000000..288a0df9cfb40 --- /dev/null +++ b/pulsar-io/debezium/mysql/src/main/resources/META-INF/services/pulsar-io.yaml @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +name: debezium-mysql +description: Debezium MySql Source +sourceClass: org.apache.pulsar.io.debezium.mysql.DebeziumMysqlSource diff --git a/pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml b/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml similarity index 64% rename from pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml rename to pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml index b0361f9cd0db8..7056bc10e3935 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/resources/debezium-mysql-source-config.yaml +++ b/pulsar-io/debezium/mysql/src/main/resources/debezium-mysql-source-config.yaml @@ -19,17 +19,13 @@ tenant: "test" namespace: "test-namespace" -name: "debezium-kafka-source" -topicName: "kafka-connect-topic" -archive: "connectors/pulsar-io-kafka-connect-adaptor-2.3.0-SNAPSHOT.nar" +name: "debezium-mysql-source" +topicName: "debezium-mysql-topic" +archive: "connectors/pulsar-io-debezium-mysql-2.4.0-SNAPSHOT.nar" -##autoAck: true parallelism: 1 configs: - ## sourceTask - task.class: "io.debezium.connector.mysql.MySqlConnectorTask" - ## config for mysql, docker image: debezium/example-mysql:0.8 database.hostname: "localhost" database.port: "3306" @@ -39,15 +35,9 @@ configs: database.server.name: "dbserver1" database.whitelist: "inventory" - database.history: "org.apache.pulsar.io.debezium.PulsarDatabaseHistory" - database.history.pulsar.topic: "history-topic" - database.history.pulsar.service.url: "pulsar://127.0.0.1:6650" - ## KEY_CONVERTER_CLASS_CONFIG, VALUE_CONVERTER_CLASS_CONFIG - key.converter: "org.apache.kafka.connect.json.JsonConverter" - value.converter: "org.apache.kafka.connect.json.JsonConverter" ## PULSAR_SERVICE_URL_CONFIG pulsar.service.url: "pulsar://127.0.0.1:6650" - ## OFFSET_STORAGE_TOPIC_CONFIG - offset.storage.topic: "offset-topic" + database.history.pulsar.topic: "mysql-history-topic" + offset.storage.topic: "mysql-offset-topic" diff --git a/pulsar-io/debezium/pom.xml b/pulsar-io/debezium/pom.xml index ac6a5d925e96e..37a50cc5fc00a 100644 --- a/pulsar-io/debezium/pom.xml +++ b/pulsar-io/debezium/pom.xml @@ -21,6 +21,7 @@ 4.0.0 + pom org.apache.pulsar pulsar-io @@ -30,75 +31,9 @@ pulsar-io-debezium Pulsar IO :: Debezium - - - - ${project.groupId} - pulsar-io-core - ${project.version} - - - - io.debezium - debezium-core - ${debezium.version} - - - - io.debezium - debezium-connector-mysql - ${debezium.version} - - - - org.apache.kafka - kafka_${scala.binary.version} - ${kafka-client.version} - - - - org.apache.kafka - connect-runtime - ${kafka-client.version} - - - - ${project.groupId} - pulsar-client-original - ${project.version} - - - - ${project.groupId} - pulsar-broker - ${project.version} - test - - - - ${project.groupId} - managed-ledger-original - ${project.version} - test-jar - test - - - - ${project.groupId} - pulsar-zookeeper-utils - ${project.version} - test-jar - test - - - - ${project.groupId} - pulsar-broker - ${project.version} - test - test-jar - - - + + core + mysql + diff --git a/pulsar-io/kafka-connect-adaptor/pom.xml b/pulsar-io/kafka-connect-adaptor/pom.xml index cee7ec81c85c4..da5c6208d6da8 100644 --- a/pulsar-io/kafka-connect-adaptor/pom.xml +++ b/pulsar-io/kafka-connect-adaptor/pom.xml @@ -38,12 +38,6 @@ ${project.version} - - ${project.groupId} - pulsar-io-debezium - ${project.version} - - org.apache.kafka kafka_${scala.binary.version} diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java index a1647033a9be3..3ccbb4bc1c2b6 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/KafkaConnectSource.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.io.kafka.connect; +import static org.apache.pulsar.io.kafka.connect.PulsarKafkaWorkerConfig.TOPIC_NAMESPACE_CONFIG; + import java.util.Base64; import java.util.Collections; import java.util.HashMap; @@ -66,6 +68,7 @@ public class KafkaConnectSource implements Source> { private CompletableFuture flushFuture; private OffsetBackingStore offsetStore; private OffsetStorageReader offsetReader; + private String topicNamespace; @Getter private OffsetStorageWriter offsetWriter; // number of outstandingRecords that have been polled but not been acked @@ -86,6 +89,8 @@ public void open(Map config, SourceContext sourceContext) throws .getDeclaredConstructor() .newInstance(); + topicNamespace = stringConfig.get(TOPIC_NAMESPACE_CONFIG); + // initialize the key and value converter keyConverter = ((Class)Class.forName(stringConfig.get(PulsarKafkaWorkerConfig.KEY_CONVERTER_CLASS_CONFIG))) .asSubclass(Converter.class) @@ -193,7 +198,7 @@ private class KafkaSourceRecord implements Record> { .stream() .map(e -> e.getKey() + "=" + e.getValue()) .collect(Collectors.joining(","))); - this.destinationTopic = Optional.of(srcRecord.topic()); + this.destinationTopic = Optional.of(topicNamespace + "/" + srcRecord.topic()); } @Override diff --git a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java index d00f77615eb8b..624c59acd84de 100644 --- a/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java +++ b/pulsar-io/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarKafkaWorkerConfig.java @@ -44,6 +44,12 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig { public static final String PULSAR_SERVICE_URL_CONFIG = "pulsar.service.url"; private static final String PULSAR_SERVICE_URL_CONFIG_DOC = "pulsar service url"; + /** + * topic.namespace + */ + public static final String TOPIC_NAMESPACE_CONFIG = "topic.namespace"; + private static final String TOPIC_NAMESPACE_CONFIG_DOC = "namespace of topic name to store the output topics"; + static { CONFIG = new ConfigDef() .define(OFFSET_STORAGE_TOPIC_CONFIG, @@ -53,10 +59,14 @@ public class PulsarKafkaWorkerConfig extends WorkerConfig { .define(PULSAR_SERVICE_URL_CONFIG, Type.STRING, Importance.HIGH, - PULSAR_SERVICE_URL_CONFIG_DOC); + PULSAR_SERVICE_URL_CONFIG_DOC) + .define(TOPIC_NAMESPACE_CONFIG, + Type.STRING, + "public/default", + Importance.HIGH, + TOPIC_NAMESPACE_CONFIG_DOC); } - public PulsarKafkaWorkerConfig(Map props) { super(CONFIG, props); } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java index 2d0613e167cf7..6d87dd65eef15 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/containers/DebeziumMySQLContainer.java @@ -23,7 +23,7 @@ public class DebeziumMySQLContainer extends ChaosContainer { - public static final String NAME = "mysql"; + public static final String NAME = "debezium-mysql-example"; static final Integer[] PORTS = { 3306 }; private static final String IMAGE_NAME = "debezium/example-mysql:0.8"; diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 18eaba945fb88..9c44a05186e24 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -1163,7 +1163,7 @@ private void testDebeziumMySqlConnect() final String tenant = TopicName.PUBLIC_TENANT; final String namespace = TopicName.DEFAULT_NAMESPACE; final String outputTopicName = "debe-output-topic-name"; - final String consumeTopicName = "dbserver1.inventory.products"; + final String consumeTopicName = "public/default/dbserver1.inventory.products"; final String sourceName = "test-source-connector-" + functionRuntimeType + "-name-" + randomName(8); @@ -1182,6 +1182,7 @@ private void testDebeziumMySqlConnect() .subscriptionType(SubscriptionType.Exclusive) .subscribe(); + @Cleanup DebeziumMySqlSourceTester sourceTester = new DebeziumMySqlSourceTester(pulsarCluster); // setup debezium mysql server @@ -1204,15 +1205,13 @@ private void testDebeziumMySqlConnect() waitForProcessingSourceMessages(tenant, namespace, sourceName, numMessages); // validate the source result - sourceTester.validateSourceResult(consumer, null); + sourceTester.validateSourceResult(consumer, 9); // delete the source deleteSource(tenant, namespace, sourceName); // get source info (source should be deleted) getSourceInfoNotFound(tenant, namespace, sourceName); - - pulsarCluster.stopService("mysql", sourceTester.getDebeziumMySqlContainer()); } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java index b122ccc9670c5..ffecbc071f93a 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/DebeziumMySqlSourceTester.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.tests.integration.io; +import java.io.Closeable; import java.util.Map; import java.util.concurrent.TimeUnit; import lombok.Getter; @@ -39,9 +40,9 @@ * which is a MySQL database server preconfigured with an inventory database. */ @Slf4j -public class DebeziumMySqlSourceTester extends SourceTester { +public class DebeziumMySqlSourceTester extends SourceTester implements Closeable { - private static final String NAME = "kafka-connect-adaptor"; + private static final String NAME = "debezium-mysql"; private final String pulsarServiceUrl; @@ -55,28 +56,21 @@ public DebeziumMySqlSourceTester(PulsarCluster cluster) { this.pulsarCluster = cluster; pulsarServiceUrl = "pulsar://pulsar-proxy:" + PulsarContainer.BROKER_PORT; - sourceConfig.put("task.class", "io.debezium.connector.mysql.MySqlConnectorTask"); - sourceConfig.put("database.hostname", "mysql"); + sourceConfig.put("database.hostname", DebeziumMySQLContainer.NAME); sourceConfig.put("database.port", "3306"); sourceConfig.put("database.user", "debezium"); sourceConfig.put("database.password", "dbz"); sourceConfig.put("database.server.id", "184054"); sourceConfig.put("database.server.name", "dbserver1"); sourceConfig.put("database.whitelist", "inventory"); - sourceConfig.put("database.history", "org.apache.pulsar.io.debezium.PulsarDatabaseHistory"); - sourceConfig.put("database.history.pulsar.topic", "history-topic"); - sourceConfig.put("database.history.pulsar.service.url", pulsarServiceUrl); - sourceConfig.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); - sourceConfig.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); sourceConfig.put("pulsar.service.url", pulsarServiceUrl); - sourceConfig.put("offset.storage.topic", "offset-topic"); } @Override public void setServiceContainer(DebeziumMySQLContainer container) { log.info("start debezium mysql server container."); debeziumMySqlContainer = container; - pulsarCluster.startService("mysql", debeziumMySqlContainer); + pulsarCluster.startService(DebeziumMySQLContainer.NAME, debeziumMySqlContainer); } @Override @@ -90,7 +84,7 @@ public Map produceSourceMessages(int numMessages) throws Excepti return null; } - public void validateSourceResult(Consumer consumer, Map kvs) throws Exception { + public void validateSourceResult(Consumer consumer, int number) throws Exception { int recordsNumber = 0; Message msg = consumer.receive(2, TimeUnit.SECONDS); while(msg != null) { @@ -101,7 +95,15 @@ public void validateSourceResult(Consumer consumer, Map msg = consumer.receive(1, TimeUnit.SECONDS); } - Assert.assertEquals(recordsNumber, 9); + Assert.assertEquals(recordsNumber, number); log.info("Stop debezium mysql server container. topic: {} has {} records.", consumer.getTopic(), recordsNumber); } + + @Override + public void close() { + if (pulsarCluster != null) { + pulsarCluster.stopService(DebeziumMySQLContainer.NAME, debeziumMySqlContainer); + } + } + }