From 4c6d9f1eb0e73bd96e56db8605b78ce05ee0b3db Mon Sep 17 00:00:00 2001 From: Niraj Patel Date: Fri, 30 Sep 2016 00:51:55 -0500 Subject: [PATCH] upgrading kafka, breaking out configuration --- README.md | 2 +- pom.xml | 19 ++- .../kafka/connect/mongodb/DatabaseReader.java | 26 +++- ...{MongodbReader.java => MongoDBReader.java} | 21 ++- ...nnector.java => MongoDBSinkConnector.java} | 83 +++++------- ...godbSinkTask.java => MongoDBSinkTask.java} | 80 ++++++----- .../mongodb/MongoDBSourceConnector.java | 104 +++++++++++++++ ...SourceTask.java => MongoDBSourceTask.java} | 69 +++++----- .../mongodb/MongodbSourceConnector.java | 124 ------------------ .../MongoDBSinkConfiguration.java | 65 +++++++++ .../MongoDBSourceConfiguration.java | 69 ++++++++++ .../apache/kafka/connect/utils/LogUtils.java | 4 +- .../mongodb/MongoDBSourceConnectorTest.java | 80 +++++++++++ ...skTest.java => MongoDBSourceTaskTest.java} | 19 +-- .../mongodb/MongodbSourceConnectorTest.java | 82 ------------ 15 files changed, 488 insertions(+), 359 deletions(-) rename src/main/java/org/apache/kafka/connect/mongodb/{MongodbReader.java => MongoDBReader.java} (72%) rename src/main/java/org/apache/kafka/connect/mongodb/{MongodbSinkConnector.java => MongoDBSinkConnector.java} (52%) rename src/main/java/org/apache/kafka/connect/mongodb/{MongodbSinkTask.java => MongoDBSinkTask.java} (63%) create mode 100644 src/main/java/org/apache/kafka/connect/mongodb/MongoDBSourceConnector.java rename src/main/java/org/apache/kafka/connect/mongodb/{MongodbSourceTask.java => MongoDBSourceTask.java} (76%) delete mode 100644 src/main/java/org/apache/kafka/connect/mongodb/MongodbSourceConnector.java create mode 100644 src/main/java/org/apache/kafka/connect/mongodb/configuration/MongoDBSinkConfiguration.java create mode 100644 src/main/java/org/apache/kafka/connect/mongodb/configuration/MongoDBSourceConfiguration.java create mode 100644 src/test/java/org/apache/kafka/connect/mongodb/MongoDBSourceConnectorTest.java rename src/test/java/org/apache/kafka/connect/mongodb/{MongodbSourceTaskTest.java => MongoDBSourceTaskTest.java} (91%) delete mode 100644 src/test/java/org/apache/kafka/connect/mongodb/MongodbSourceConnectorTest.java diff --git a/README.md b/README.md index eaeda52..c1cc774 100644 --- a/README.md +++ b/README.md @@ -68,7 +68,7 @@ For every message, a SourceRecord is created, having the following schema: * **database**: database in which the operation took place * **object**: inserted/updated/deleted object -## Sample Configuration +## Source Configuration ```ini name=mongodb-source-connector connector.class=org.apache.kafka.connect.mongodb.MongodbSourceConnector diff --git a/pom.xml b/pom.xml index ce13e32..c2ca6c2 100644 --- a/pom.xml +++ b/pom.xml @@ -8,6 +8,11 @@ connect-mongodb 1.0 + + 0.10.0.0 + 3.2.1 + + @@ -16,8 +21,8 @@ 3.3 true - 1.7 - 1.7 + 1.8 + 1.8 @@ -46,12 +51,18 @@ org.mongodb mongodb-driver - 3.2.1 + ${mongodb.version} org.apache.kafka connect-api - 0.9.0.0 + ${kafka.version} + provided + + + org.apache.kafka + kafka-clients + ${kafka.version} provided diff --git a/src/main/java/org/apache/kafka/connect/mongodb/DatabaseReader.java b/src/main/java/org/apache/kafka/connect/mongodb/DatabaseReader.java index ad4176c..9a24beb 100644 --- a/src/main/java/org/apache/kafka/connect/mongodb/DatabaseReader.java +++ b/src/main/java/org/apache/kafka/connect/mongodb/DatabaseReader.java @@ -2,6 +2,7 @@ import com.mongodb.CursorType; import com.mongodb.MongoClient; +import com.mongodb.ServerAddress; import com.mongodb.client.FindIterable; import 
com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; @@ -14,7 +15,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Arrays; +import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.stream.Collectors; /** * Reads mutation from a mongodb database @@ -23,8 +27,7 @@ */ public class DatabaseReader implements Runnable { Logger log = LoggerFactory.getLogger(DatabaseReader.class); - private String host; - private Integer port; + private String hosts; private String db; private String start; @@ -33,9 +36,8 @@ public class DatabaseReader implements Runnable { private MongoCollection oplog; private Bson query; - public DatabaseReader(String host, Integer port, String db, String start, ConcurrentLinkedQueue messages) { - this.host = host; - this.port = port; + public DatabaseReader(String hosts, String db, String start, ConcurrentLinkedQueue messages) { + this.hosts = hosts; this.db = db; this.start = start; this.messages = messages; @@ -81,7 +83,19 @@ private void init() { * @return the oplog collection */ private MongoCollection readCollection() { - MongoClient mongoClient = new MongoClient(host, port); + List addresses = Arrays.stream(hosts.split(",")).map(hostUrl -> { + try { + String[] hostAndPort = hostUrl.split(":"); + String host = hostAndPort[0]; + int port = Integer.parseInt(hostAndPort[1]); + return new ServerAddress(host, port); + } catch (ArrayIndexOutOfBoundsException aioobe) { + throw new ConnectException("hosts must be in host:port format"); + } catch (NumberFormatException nfe) { + throw new ConnectException("port in the hosts field must be an integer"); + } + }).collect(Collectors.toList()); + MongoClient mongoClient = new MongoClient(addresses); MongoDatabase db = mongoClient.getDatabase("local"); return db.getCollection("oplog.rs"); } diff --git a/src/main/java/org/apache/kafka/connect/mongodb/MongodbReader.java b/src/main/java/org/apache/kafka/connect/mongodb/MongoDBReader.java similarity index 72% rename from src/main/java/org/apache/kafka/connect/mongodb/MongodbReader.java rename to src/main/java/org/apache/kafka/connect/mongodb/MongoDBReader.java index 46d728c..435474d 100644 --- a/src/main/java/org/apache/kafka/connect/mongodb/MongodbReader.java +++ b/src/main/java/org/apache/kafka/connect/mongodb/MongoDBReader.java @@ -15,19 +15,17 @@ * * @author Andrea Patelli */ -public class MongodbReader { - private static final Logger log = LoggerFactory.getLogger(MongodbReader.class); +public class MongoDBReader { + private static final Logger log = LoggerFactory.getLogger(MongoDBReader.class); protected ConcurrentLinkedQueue messages; private List dbs; - private String host; - private Integer port; + private String hosts; private Map, Map> start; - public MongodbReader(String host, Integer port, List dbs, Map, Map> start) { - this.host = host; - this.port = port; + public MongoDBReader(String hosts, List dbs, Map, Map> start) { + this.hosts = hosts; this.dbs = new ArrayList<>(0); this.dbs.addAll(dbs); this.start = start; @@ -36,7 +34,7 @@ public MongodbReader(String host, Integer port, List dbs, Map { String start; // get the last message that was read Map dbOffset = this.start.get(Collections.singletonMap("mongodb", db)); @@ -46,13 +44,12 @@ public void run() { start = (String) this.start.get(Collections.singletonMap("mongodb", db)).get(db); log.trace("Starting database reader with configuration: "); - log.trace("host: {}", host); - log.trace("port: {}", port); + log.trace("hosts: {}", hosts); 
log.trace("db: {}", db); log.trace("start: {}", start); // start a new thread for reading mutation of the specific database - DatabaseReader reader = new DatabaseReader(host, port, db, start, messages); + DatabaseReader reader = new DatabaseReader(hosts, db, start, messages); new Thread(reader).start(); - } + }); } } diff --git a/src/main/java/org/apache/kafka/connect/mongodb/MongodbSinkConnector.java b/src/main/java/org/apache/kafka/connect/mongodb/MongoDBSinkConnector.java similarity index 52% rename from src/main/java/org/apache/kafka/connect/mongodb/MongodbSinkConnector.java rename to src/main/java/org/apache/kafka/connect/mongodb/MongoDBSinkConnector.java index bda556b..6fdd981 100644 --- a/src/main/java/org/apache/kafka/connect/mongodb/MongodbSinkConnector.java +++ b/src/main/java/org/apache/kafka/connect/mongodb/MongoDBSinkConnector.java @@ -1,8 +1,10 @@ package org.apache.kafka.connect.mongodb; +import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.utils.AppInfoParser; import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.mongodb.configuration.MongoDBSinkConfiguration; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.util.ConnectorUtils; import org.apache.kafka.connect.utils.LogUtils; @@ -10,30 +12,25 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; /** - * MongodbSinkConnector implement the Connector interface to send Kafka + * MongoDBSinkConnector implement the Connector interface to send Kafka * data to Mongodb * * @author Andrea Patelli + * @author Niraj Patel */ -public class MongodbSinkConnector extends SinkConnector { - private final static Logger log = LoggerFactory.getLogger(MongodbSinkConnector.class); +public class MongoDBSinkConnector extends SinkConnector { - public static final String PORT = "port"; - public static final String HOST = "host"; - public static final String BULK_SIZE = "bulk.size"; - public static final String DATABASE = "mongodb.database"; - public static final String COLLECTIONS = "mongodb.collections"; - public static final String TOPICS = "topics"; + private final static Logger log = LoggerFactory.getLogger(MongoDBSinkConnector.class); - private String port; - private String host; - private String bulkSize; - private String database; - private String collections; - private String topics; + private Map configuration; /** * Get the version of this connector. @@ -49,33 +46,20 @@ public String version() { * Start this Connector. This method will only be called on a clean Connector, i.e. it has * either just been instantiated and initialized or {@link #stop()} has been invoked. 
* - * @param map configuration settings + * @param configuration configuration settings */ @Override - public void start(Map map) { - log.trace("Parsing configuration"); - - port = map.get(PORT); - if (port == null || port.isEmpty()) - throw new ConnectException("Missing " + PORT + " config"); - - bulkSize = map.get(BULK_SIZE); - if (bulkSize == null || bulkSize.isEmpty()) - throw new ConnectException("Missing " + BULK_SIZE + " config"); - - host = map.get(HOST); - if (host == null || host.isEmpty()) - throw new ConnectException("Missing " + HOST + " config"); - - database = map.get(DATABASE); - collections = map.get(COLLECTIONS); - topics = map.get(TOPICS); + public void start(Map configuration) { + MongoDBSinkConfiguration sinkConfiguration = new MongoDBSinkConfiguration(configuration); + this.configuration = sinkConfiguration.originalsStrings(); + String collections = configuration.get(MongoDBSinkConfiguration.COLLECTIONS_CONFIG); + String topics = configuration.get(MongoDBSinkConfiguration.TOPICS_CONFIG); if (collections.split(",").length != topics.split(",").length) { throw new ConnectException("The number of topics should be the same as the number of collections"); } - LogUtils.dumpConfiguration(map, log); + LogUtils.dumpConfiguration(configuration, log); } /** @@ -85,7 +69,7 @@ public void start(Map map) { */ @Override public Class taskClass() { - return MongodbSinkTask.class; + return MongoDBSinkTask.class; } /** @@ -98,22 +82,21 @@ public Class taskClass() { @Override public List> taskConfigs(int maxTasks) { List> configs = new ArrayList<>(); - List coll = Arrays.asList(collections.split(",")); + List coll = Arrays.asList(configuration.get(MongoDBSinkConfiguration.COLLECTIONS_CONFIG).split(",")); int numGroups = Math.min(coll.size(), maxTasks); List> dbsGrouped = ConnectorUtils.groupPartitions(coll, numGroups); - List topics = Arrays.asList(this.topics.split(",")); + List topics = Arrays.asList(configuration.get(MongoDBSinkConfiguration.TOPICS_CONFIG).split(",")); List> topicsGrouped = ConnectorUtils.groupPartitions(topics, numGroups); - for (int i = 0; i < numGroups; i++) { + IntStream.range(0, numGroups).forEach(i -> { Map config = new HashMap<>(); - config.put(PORT, port); - config.put(BULK_SIZE, bulkSize); - config.put(HOST, host); - config.put(DATABASE, database); - config.put(COLLECTIONS, StringUtils.join(dbsGrouped.get(i), ",")); - config.put(TOPICS, StringUtils.join(topicsGrouped.get(i), ",")); + config.put(MongoDBSinkConfiguration.BULK_SIZE_CONFIG, configuration.get(MongoDBSinkConfiguration.BULK_SIZE_CONFIG)); + config.put(MongoDBSinkConfiguration.HOST_URLS_CONFIG, configuration.get(MongoDBSinkConfiguration.HOST_URLS_CONFIG)); + config.put(MongoDBSinkConfiguration.DATABASE_CONFIG, configuration.get(MongoDBSinkConfiguration.DATABASE_CONFIG)); + config.put(MongoDBSinkConfiguration.COLLECTIONS_CONFIG, StringUtils.join(dbsGrouped.get(i), ",")); + config.put(MongoDBSinkConfiguration.TOPICS_CONFIG, StringUtils.join(topicsGrouped.get(i), ",")); configs.add(config); - } + }); return configs; } @@ -122,6 +105,12 @@ public List> taskConfigs(int maxTasks) { */ @Override public void stop() { + // not implemented + } + @Override + public ConfigDef config() { + return MongoDBSinkConfiguration.config; } + } diff --git a/src/main/java/org/apache/kafka/connect/mongodb/MongodbSinkTask.java b/src/main/java/org/apache/kafka/connect/mongodb/MongoDBSinkTask.java similarity index 63% rename from src/main/java/org/apache/kafka/connect/mongodb/MongodbSinkTask.java rename to 
src/main/java/org/apache/kafka/connect/mongodb/MongoDBSinkTask.java index 59a12a7..990e1bb 100644 --- a/src/main/java/org/apache/kafka/connect/mongodb/MongodbSinkTask.java +++ b/src/main/java/org/apache/kafka/connect/mongodb/MongoDBSinkTask.java @@ -1,6 +1,7 @@ package org.apache.kafka.connect.mongodb; import com.mongodb.MongoClient; +import com.mongodb.ServerAddress; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.Filters; @@ -11,6 +12,7 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.mongodb.configuration.MongoDBSinkConfiguration; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.utils.SchemaUtils; @@ -18,60 +20,69 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.IntStream; /** - * MongodbSinkTask is a Task that takes records loaded from Kafka and sends them to + * MongoDBSinkTask is a Task that takes records loaded from Kafka and sends them to * mongodb. * * @author Andrea Patelli */ -public class MongodbSinkTask extends SinkTask { - private final static Logger log = LoggerFactory.getLogger(MongodbSinkTask.class); +public class MongoDBSinkTask extends SinkTask { + private final static Logger log = LoggerFactory.getLogger(MongoDBSinkTask.class); - private Integer port; - private String host; - private Integer bulkSize; + private Map mapping; + + private MongoDatabase db; + + private int bulkSize; private String collections; private String database; + private String hostUrls; private String topics; - private Map mapping; - private MongoDatabase db; - @Override public String version() { - return new MongodbSinkConnector().version(); + return new MongoDBSinkConnector().version(); } /** * Start the Task. Handles configuration parsing and one-time setup of the task. 
* - * @param map initial configuration + * @param configuration initial configuration */ @Override - public void start(Map map) { - try { - port = Integer.parseInt(map.get(MongodbSinkConnector.PORT)); - } catch (Exception e) { - throw new ConnectException("Setting " + MongodbSinkConnector.PORT + " should be an integer"); - } - - try { - bulkSize = Integer.parseInt(map.get(MongodbSinkConnector.BULK_SIZE)); - } catch (Exception e) { - throw new ConnectException("Setting " + MongodbSinkConnector.BULK_SIZE + " should be an integer"); - } - - database = map.get(MongodbSinkConnector.DATABASE); - host = map.get(MongodbSinkConnector.HOST); - collections = map.get(MongodbSinkConnector.COLLECTIONS); - topics = map.get(MongodbSinkConnector.TOPICS); + public void start(Map configuration) { + this.bulkSize = Integer.parseInt(configuration.get(MongoDBSinkConfiguration.BULK_SIZE_CONFIG)); + this.collections = configuration.get(MongoDBSinkConfiguration.COLLECTIONS_CONFIG); + this.database = configuration.get(MongoDBSinkConfiguration.DATABASE_CONFIG); + this.hostUrls = configuration.get(MongoDBSinkConfiguration.HOST_URLS_CONFIG); + this.topics = configuration.get(MongoDBSinkConfiguration.TOPICS_CONFIG); List collectionsList = Arrays.asList(collections.split(",")); List topicsList = Arrays.asList(topics.split(",")); - MongoClient mongoClient = new MongoClient(host, port); + List addresses = Arrays.stream(hostUrls.split(",")).map(hostUrl -> { + try { + String[] hostAndPort = hostUrl.split(":"); + String host = hostAndPort[0]; + int port = Integer.parseInt(hostAndPort[1]); + return new ServerAddress(host, port); + } catch (ArrayIndexOutOfBoundsException aioobe) { + throw new ConnectException("hosts must be in host:port format"); + } catch (NumberFormatException nfe) { + throw new ConnectException("port in the hosts field must be an integer"); + } + }).collect(Collectors.toList()); + + MongoClient mongoClient = new MongoClient(addresses); db = mongoClient.getDatabase(database); mapping = new HashMap<>(); @@ -91,7 +102,8 @@ public void start(Map map) { @Override public void put(Collection collection) { List records = new ArrayList<>(collection); - for (int i = 0; i < records.size(); i++) { + + IntStream.range(0, records.size()).forEach(i -> { Map>> bulks = new HashMap<>(); for (int j = 0; j < bulkSize && i < records.size(); j++, i++) { @@ -114,14 +126,14 @@ public void put(Collection collection) { } i--; log.trace("Executing bulk"); - for (String key : bulks.keySet()) { + bulks.keySet().forEach(key -> { try { com.mongodb.bulk.BulkWriteResult result = mapping.get(key).bulkWrite(bulks.get(key)); } catch (Exception e) { log.error(e.getMessage()); } - } - } + }); + }); } @Override diff --git a/src/main/java/org/apache/kafka/connect/mongodb/MongoDBSourceConnector.java b/src/main/java/org/apache/kafka/connect/mongodb/MongoDBSourceConnector.java new file mode 100644 index 0000000..fb61200 --- /dev/null +++ b/src/main/java/org/apache/kafka/connect/mongodb/MongoDBSourceConnector.java @@ -0,0 +1,104 @@ +package org.apache.kafka.connect.mongodb; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.utils.AppInfoParser; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.mongodb.configuration.MongoDBSourceConfiguration; +import org.apache.kafka.connect.source.SourceConnector; +import org.apache.kafka.connect.util.ConnectorUtils; +import org.apache.kafka.connect.utils.LogUtils; +import org.apache.kafka.connect.utils.StringUtils; +import org.slf4j.Logger; +import 
org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.IntStream; + +/** + * MongoDBSourceConnector implements the connector interface + * to write on Kafka mutations received from a mongodb database + * + * @author Andrea Patelli + */ +public class MongoDBSourceConnector extends SourceConnector { + + private final static Logger log = LoggerFactory.getLogger(MongoDBSourceConnector.class); + + private Map configuration; + + /** + * Get the version of this connector. + * + * @return the version, formatted as a String + */ + @Override + public String version() { + return AppInfoParser.getVersion(); + } + + /** + * Start this Connector. This method will only be called on a clean Connector, i.e. it has + * either just been instantiated and initialized or {@link #stop()} has been invoked. + * + * @param configuration configuration settings + */ + @Override + public void start(Map configuration) { + MongoDBSourceConfiguration sourceConfiguration = new MongoDBSourceConfiguration(configuration); + this.configuration = sourceConfiguration.originalsStrings(); + + LogUtils.dumpConfiguration(configuration, log); + } + + /** + * Returns the Task implementation for this Connector. + * + * @return the Task implementation Class + */ + @Override + public Class taskClass() { + return MongoDBSourceTask.class; + } + + /** + * Returns a set of configuration for the Task based on the current configuration. + * + * @param maxTasks maximum number of configurations to generate + * @return configurations for the Task + */ + @Override + public List> taskConfigs(int maxTasks) { + ArrayList> configs = new ArrayList<>(); + List dbs = Arrays.asList(configuration.get(MongoDBSourceConfiguration.DATABASES_CONFIG).split(",")); + int numGroups = Math.min(dbs.size(), maxTasks); + List> dbsGrouped = ConnectorUtils.groupPartitions(dbs, numGroups); + + IntStream.range(0, numGroups).forEach(i -> { + Map config = new HashMap<>(); + config.put(MongoDBSourceConfiguration.HOST_URLS_CONFIG, configuration.get(MongoDBSourceConfiguration.HOST_URLS_CONFIG)); + config.put(MongoDBSourceConfiguration.SCHEMA_NAME_CONFIG, configuration.get(MongoDBSourceConfiguration.SCHEMA_NAME_CONFIG)); + config.put(MongoDBSourceConfiguration.BATCH_SIZE_CONFIG, configuration.get(MongoDBSourceConfiguration.BATCH_SIZE_CONFIG)); + config.put(MongoDBSourceConfiguration.TOPIC_PREFIX_CONFIG, configuration.get(MongoDBSourceConfiguration.TOPIC_PREFIX_CONFIG)); + config.put(MongoDBSourceConfiguration.DATABASES_CONFIG, StringUtils.join(dbsGrouped.get(i), ",")); + configs.add(config); + }); + return configs; + } + + /** + * Stop this connector. 
+ */ + @Override + public void stop() { + } + + @Override + public ConfigDef config() { + return MongoDBSourceConfiguration.config; + } + +} diff --git a/src/main/java/org/apache/kafka/connect/mongodb/MongodbSourceTask.java b/src/main/java/org/apache/kafka/connect/mongodb/MongoDBSourceTask.java similarity index 76% rename from src/main/java/org/apache/kafka/connect/mongodb/MongodbSourceTask.java rename to src/main/java/org/apache/kafka/connect/mongodb/MongoDBSourceTask.java index 6c940bf..4339dc8 100644 --- a/src/main/java/org/apache/kafka/connect/mongodb/MongodbSourceTask.java +++ b/src/main/java/org/apache/kafka/connect/mongodb/MongoDBSourceTask.java @@ -4,7 +4,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.mongodb.configuration.MongoDBSourceConfiguration; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.bson.BsonTimestamp; @@ -12,65 +12,58 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; /** - * MongodbSourceTask is a Task that reads mutations from a mongodb for storage in Kafka. + * MongoDBSourceTask is a Task that reads mutations from a mongodb for storage in Kafka. * * @author Andrea Patelli */ -public class MongodbSourceTask extends SourceTask { - private final static Logger log = LoggerFactory.getLogger(MongodbSourceTask.class); +public class MongoDBSourceTask extends SourceTask { + private final static Logger log = LoggerFactory.getLogger(MongoDBSourceTask.class); - private Integer port; - private String host; - private String schemaName; - private Integer batchSize; - private String topicPrefix; - private List databases; private static Map schemas = null; - private MongodbReader reader; + private MongoDBReader reader; + private int batchSize; + private List databases; + private String hosts; + private String schemaName; + private String topicPrefix; Map, Map> offsets = new HashMap<>(0); @Override public String version() { - return new MongodbSourceConnector().version(); + return new MongoDBSourceConnector().version(); } /** * Start the Task. Handles configuration parsing and one-time setup of the Task. 
* - * @param map initial configuration + * @param configuration initial configuration */ @Override - public void start(Map map) { - try { - port = Integer.parseInt(map.get(MongodbSourceConnector.PORT)); - } catch (Exception e) { - throw new ConnectException(MongodbSourceConnector.PORT + " config should be an Integer"); - } - - try { - batchSize = Integer.parseInt(map.get(MongodbSourceConnector.BATCH_SIZE)); - } catch (Exception e) { - throw new ConnectException(MongodbSourceConnector.BATCH_SIZE + " config should be an Integer"); - } - - schemaName = map.get(MongodbSourceConnector.SCHEMA_NAME); - topicPrefix = map.get(MongodbSourceConnector.TOPIC_PREFIX); - host = map.get(MongodbSourceConnector.HOST); - databases = Arrays.asList(map.get(MongodbSourceConnector.DATABASES).split(",")); + public void start(Map configuration) { + this.batchSize = Integer.parseInt(configuration.get(MongoDBSourceConfiguration.BATCH_SIZE_CONFIG)); + this.databases = Arrays.asList(configuration.get(MongoDBSourceConfiguration.DATABASES_CONFIG).split(",")); + this.hosts = configuration.get(MongoDBSourceConfiguration.HOST_URLS_CONFIG); + this.schemaName = configuration.get(MongoDBSourceConfiguration.SCHEMA_NAME_CONFIG); + this.topicPrefix = configuration.get(MongoDBSourceConfiguration.TOPIC_PREFIX_CONFIG); log.trace("Creating schema"); if (schemas == null) { schemas = new HashMap<>(); } - for (String db : databases) { + databases.stream().forEach(db -> { db = db.replaceAll("[\\s.]", "_"); if (schemas.get(db) == null) schemas.put(db, @@ -83,15 +76,15 @@ public void start(Map map) { .field("database", Schema.OPTIONAL_STRING_SCHEMA) .field("object", Schema.OPTIONAL_STRING_SCHEMA) .build()); - } + }); loadOffsets(); - reader = new MongodbReader(host, port, databases, offsets); + reader = new MongoDBReader(hosts, databases, offsets); reader.run(); } /** - * Poll this MongodbSourceTask for new records. + * Poll this MongoDBSourceTask for new records. 
* * @return a list of source records * @throws InterruptException @@ -187,10 +180,10 @@ private Struct getStruct(Document message) { */ private void loadOffsets() { List> partitions = new ArrayList<>(); - for (String db : databases) { - Map partition = Collections.singletonMap("mongodb", db); + databases.stream().forEach(database -> { + Map partition = Collections.singletonMap("mongodb", database); partitions.add(partition); - } + }); offsets.putAll(context.offsetStorageReader().offsets(partitions)); } } diff --git a/src/main/java/org/apache/kafka/connect/mongodb/MongodbSourceConnector.java b/src/main/java/org/apache/kafka/connect/mongodb/MongodbSourceConnector.java deleted file mode 100644 index c222457..0000000 --- a/src/main/java/org/apache/kafka/connect/mongodb/MongodbSourceConnector.java +++ /dev/null @@ -1,124 +0,0 @@ -package org.apache.kafka.connect.mongodb; - -import org.apache.kafka.common.utils.AppInfoParser; -import org.apache.kafka.connect.connector.Task; -import org.apache.kafka.connect.errors.ConnectException; -import org.apache.kafka.connect.source.SourceConnector; -import org.apache.kafka.connect.util.ConnectorUtils; -import org.apache.kafka.connect.utils.LogUtils; -import org.apache.kafka.connect.utils.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.*; - -/** - * MongodbSourceConnector implements the connector interface - * to write on Kafka mutations received from a mongodb database - * - * @author Andrea Patelli - */ -public class MongodbSourceConnector extends SourceConnector { - private final static Logger log = LoggerFactory.getLogger(MongodbSourceConnector.class); - - public static final String PORT = "port"; - public static final String HOST = "host"; - public static final String SCHEMA_NAME = "schema.name"; - public static final String BATCH_SIZE = "batch.size"; - public static final String TOPIC_PREFIX = "topic.prefix"; - public static final String DATABASES = "databases"; - - private String port; - private String host; - private String schemaName; - private String batchSize; - private String topicPrefix; - private String databases; - - /** - * Get the version of this connector. - * - * @return the version, formatted as a String - */ - @Override - public String version() { - return AppInfoParser.getVersion(); - } - - /** - * Start this Connector. This method will only be called on a clean Connector, i.e. it has - * either just been instantiated and initialized or {@link #stop()} has been invoked. - * - * @param map configuration settings - */ - @Override - public void start(Map map) { - log.trace("Parsing configuration"); - - port = map.get(PORT); - if (port == null || port.isEmpty()) - throw new ConnectException("Missing " + PORT + " config"); - - schemaName = map.get(SCHEMA_NAME); - if (schemaName == null || schemaName.isEmpty()) - throw new ConnectException("Missing " + SCHEMA_NAME + " config"); - - batchSize = map.get(BATCH_SIZE); - if (batchSize == null || batchSize.isEmpty()) - throw new ConnectException("Missing " + BATCH_SIZE + " config"); - - host = map.get(HOST); - if (host == null || host.isEmpty()) - throw new ConnectException("Missing " + HOST + " config"); - - databases = map.get(DATABASES); - - topicPrefix = map.get(TOPIC_PREFIX); - - LogUtils.dumpConfiguration(map, log); - } - - /** - * Returns the Task implementation for this Connector. 
- * - * @return the Task implementation Class - */ - @Override - public Class taskClass() { - return MongodbSourceTask.class; - } - - /** - * Returns a set of configuration for the Task based on the current configuration. - * - * @param maxTasks maximum number of configurations to generate - * @return configurations for the Task - */ - @Override - public List> taskConfigs(int maxTasks) { - ArrayList> configs = new ArrayList<>(); - List dbs = Arrays.asList(databases.split(",")); - int numGroups = Math.min(dbs.size(), maxTasks); - List> dbsGrouped = ConnectorUtils.groupPartitions(dbs, numGroups); - for (int i = 0; i < numGroups; i++) { - Map config = new HashMap<>(); - config.put(PORT, port); - config.put(HOST, host); - config.put(SCHEMA_NAME, schemaName); - config.put(BATCH_SIZE, batchSize); - config.put(TOPIC_PREFIX, topicPrefix); - config.put(DATABASES, StringUtils.join(dbsGrouped.get(i), ",")); - configs.add(config); - } - return configs; - } - - /** - * Stop this connector. - */ - @Override - public void stop() { - } - - -} diff --git a/src/main/java/org/apache/kafka/connect/mongodb/configuration/MongoDBSinkConfiguration.java b/src/main/java/org/apache/kafka/connect/mongodb/configuration/MongoDBSinkConfiguration.java new file mode 100644 index 0000000..6ccf2f3 --- /dev/null +++ b/src/main/java/org/apache/kafka/connect/mongodb/configuration/MongoDBSinkConfiguration.java @@ -0,0 +1,65 @@ +/* + * Copyright HomeAway, Inc 2016-Present. All Rights Reserved. + * No unauthorized use of this software. + */ + +package org.apache.kafka.connect.mongodb.configuration; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigDef.Width; +import org.apache.kafka.connect.mongodb.MongoDBSinkConnector; + +import java.util.Map; + +/** + * Configuration for {@link MongoDBSinkConnector}. 
+ */ +public class MongoDBSinkConfiguration extends AbstractConfig { + + private static final String MONGODB_GROUP = "MongoDB"; + private static final String CONNECTOR_GROUP = "Connector"; + + /** + * MongoDB Settings + */ + public static final String TOPICS_CONFIG = "topics"; + + public static final String HOST_URLS_CONFIG = "mongodb.host.urls"; + private static final String HOST_URLS_DOC = "The host URLs to use to connect to MongoDB."; + private static final String HOST_URLS_DEFAULT = "127.0.0.1:27017"; + private static final String HOST_URLS_DISPLAY = "Host URLs (host:port comma separated)"; + + public static final String DATABASE_CONFIG = "mongodb.database"; + private static final String DATABASE_DOC = "The database to write to in MongoDB."; + private static final String DATABASE_DISPLAY = "Database"; + + public static final String COLLECTIONS_CONFIG = "mongodb.database.collections"; + private static final String COLLECTIONS_DOC = "The collections to write to in the MongoDB database."; + private static final String COLLECTIONS_DISPLAY = "Collections (comma separated)"; + + /** + * Connector settings + */ + public static final String BULK_SIZE_CONFIG = "connector.bulk.size"; + private static final String BULK_SIZE_DOC = "The bulk size to use when writing to MongoDB."; + private static final int BULK_SIZE_DEFAULT = 100; + private static final String BULK_SIZE_DISPLAY = "Connector bulk size"; + + public MongoDBSinkConfiguration(Map<String, String> props) { + super(config, props); + } + + public static ConfigDef config = baseConfigDef(); + + public static ConfigDef baseConfigDef() { + return new ConfigDef() + .define(HOST_URLS_CONFIG, Type.STRING, HOST_URLS_DEFAULT, Importance.HIGH, HOST_URLS_DOC, MONGODB_GROUP, 1, Width.LONG, HOST_URLS_DISPLAY) + .define(DATABASE_CONFIG, Type.STRING, Importance.HIGH, DATABASE_DOC, MONGODB_GROUP, 2, Width.LONG, DATABASE_DISPLAY) + .define(COLLECTIONS_CONFIG, Type.STRING, Importance.HIGH, COLLECTIONS_DOC, MONGODB_GROUP, 3, Width.LONG, COLLECTIONS_DISPLAY) + .define(BULK_SIZE_CONFIG, Type.INT, BULK_SIZE_DEFAULT, Importance.MEDIUM, BULK_SIZE_DOC, CONNECTOR_GROUP, 4, Width.LONG, BULK_SIZE_DISPLAY); + } + +} \ No newline at end of file diff --git a/src/main/java/org/apache/kafka/connect/mongodb/configuration/MongoDBSourceConfiguration.java b/src/main/java/org/apache/kafka/connect/mongodb/configuration/MongoDBSourceConfiguration.java new file mode 100644 index 0000000..587b76f --- /dev/null +++ b/src/main/java/org/apache/kafka/connect/mongodb/configuration/MongoDBSourceConfiguration.java @@ -0,0 +1,69 @@ +/* + * Copyright HomeAway, Inc 2016-Present. All Rights Reserved. + * No unauthorized use of this software. + */ + +package org.apache.kafka.connect.mongodb.configuration; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.common.config.ConfigDef.Width; +import org.apache.kafka.connect.mongodb.MongoDBSourceConnector; + +import java.util.Map; + +/** + * Configuration for {@link MongoDBSourceConnector}.
+ */ +public class MongoDBSourceConfiguration extends AbstractConfig { + + private static final String MONGODB_GROUP = "MongoDB"; + private static final String CONNECTOR_GROUP = "Connector"; + + /** + * MongoDB Settings + */ + public static final String HOST_URLS_CONFIG = "mongodb.host.urls"; + private static final String HOST_URLS_DOC = "The host URLs to use to connect to MongoDB."; + private static final String HOST_URLS_DEFAULT = "127.0.0.1:27017"; + private static final String HOST_URLS_DISPLAY = "Host URLs (host:port comma separated)"; + + public static final String DATABASES_CONFIG = "mongodb.databases"; + private static final String DATABASES_DOC = "The databases to read from in MongoDB."; + private static final String DATABASES_DISPLAY = "Databases (comma separated)"; + + /** + * Connector Settings + */ + public static final String BATCH_SIZE_CONFIG = "connector.batch.size"; + private static final String BATCH_SIZE_DOC = "The batch size to use when reading from MongoDB."; + private static final int BATCH_SIZE_DEFAULT = 100; + private static final String BATCH_SIZE_DISPLAY = "Connector batch size"; + + public static final String SCHEMA_NAME_CONFIG = "connector.schema.name"; + private static final String SCHEMA_NAME_DOC = "Name to use for the schema."; + private static final String SCHEMA_NAME_DISPLAY = "Schema name"; + + public static final String TOPIC_PREFIX_CONFIG = "connector.topic.prefix"; + private static final String TOPIC_PREFIX_DOC = "Prefix to use for topics."; + private static final String TOPIC_PREFIX_DEFAULT = ""; + private static final String TOPIC_PREFIX_DISPLAY = "Topic prefix"; + + public MongoDBSourceConfiguration(Map<String, String> props) { + super(config, props); + } + + public static ConfigDef config = baseConfigDef(); + + public static ConfigDef baseConfigDef() { + return new ConfigDef() + .define(HOST_URLS_CONFIG, Type.STRING, HOST_URLS_DEFAULT, Importance.HIGH, HOST_URLS_DOC, MONGODB_GROUP, 1, Width.LONG, HOST_URLS_DISPLAY) + .define(DATABASES_CONFIG, Type.STRING, Importance.HIGH, DATABASES_DOC, MONGODB_GROUP, 2, Width.LONG, DATABASES_DISPLAY) + .define(SCHEMA_NAME_CONFIG, Type.STRING, Importance.MEDIUM, SCHEMA_NAME_DOC, MONGODB_GROUP, 3, Width.LONG, SCHEMA_NAME_DISPLAY) + .define(BATCH_SIZE_CONFIG, Type.INT, BATCH_SIZE_DEFAULT, Importance.MEDIUM, BATCH_SIZE_DOC, CONNECTOR_GROUP, 4, Width.LONG, BATCH_SIZE_DISPLAY) + .define(TOPIC_PREFIX_CONFIG, Type.STRING, TOPIC_PREFIX_DEFAULT, Importance.LOW, TOPIC_PREFIX_DOC, CONNECTOR_GROUP, 5, Width.LONG, TOPIC_PREFIX_DISPLAY); + } + +} \ No newline at end of file diff --git a/src/main/java/org/apache/kafka/connect/utils/LogUtils.java b/src/main/java/org/apache/kafka/connect/utils/LogUtils.java index eaf746d..701ea94 100644 --- a/src/main/java/org/apache/kafka/connect/utils/LogUtils.java +++ b/src/main/java/org/apache/kafka/connect/utils/LogUtils.java @@ -10,8 +10,8 @@ public class LogUtils { public static void dumpConfiguration(Map<String, String> map, Logger log) { log.trace("Starting connector with configuration:"); - for (Map.Entry entry : map.entrySet()) { + map.entrySet().forEach(entry -> { log.trace("{}: {}", entry.getKey(), entry.getValue()); - } + }); } } diff --git a/src/test/java/org/apache/kafka/connect/mongodb/MongoDBSourceConnectorTest.java b/src/test/java/org/apache/kafka/connect/mongodb/MongoDBSourceConnectorTest.java new file mode 100644 index 0000000..fe57dcb --- /dev/null +++ b/src/test/java/org/apache/kafka/connect/mongodb/MongoDBSourceConnectorTest.java @@ -0,0 +1,80 @@ +package org.apache.kafka.connect.mongodb; + +import
org.apache.kafka.connect.connector.ConnectorContext; +import org.apache.kafka.connect.mongodb.configuration.MongoDBSourceConfiguration; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.powermock.api.easymock.PowerMock; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * @author Andrea Patelli + */ +public class MongoDBSourceConnectorTest { + + private MongoDBSourceConnector connector; + + private ConnectorContext context; + + private Map sourceProperties; + + @Before + public void setup() { + connector = new MongoDBSourceConnector(); + context = PowerMock.createMock(ConnectorContext.class); + connector.initialize(context); + + sourceProperties = new HashMap<>(); + sourceProperties.put(MongoDBSourceConfiguration.HOST_URLS_CONFIG, "localhost:12345"); + sourceProperties.put(MongoDBSourceConfiguration.BATCH_SIZE_CONFIG, Integer.toString(100)); + sourceProperties.put(MongoDBSourceConfiguration.SCHEMA_NAME_CONFIG, "schema"); + sourceProperties.put(MongoDBSourceConfiguration.TOPIC_PREFIX_CONFIG, "prefix"); + sourceProperties.put(MongoDBSourceConfiguration.DATABASES_CONFIG, "mydb.test1,mydb.test2,mydb.test3"); + } + + @Test + public void testSourceTasks() { + PowerMock.replayAll(); + connector.start(sourceProperties); + List> taskConfigs = connector.taskConfigs(1); + Assert.assertEquals(1, taskConfigs.size()); + Assert.assertEquals("localhost:12345", taskConfigs.get(0).get(MongoDBSourceConfiguration.HOST_URLS_CONFIG)); + Assert.assertEquals("100", taskConfigs.get(0).get(MongoDBSourceConfiguration.BATCH_SIZE_CONFIG)); + Assert.assertEquals("schema", taskConfigs.get(0).get(MongoDBSourceConfiguration.SCHEMA_NAME_CONFIG)); + Assert.assertEquals("prefix", taskConfigs.get(0).get(MongoDBSourceConfiguration.TOPIC_PREFIX_CONFIG)); + Assert.assertEquals("mydb.test1,mydb.test2,mydb.test3", taskConfigs.get(0).get(MongoDBSourceConfiguration.DATABASES_CONFIG)); + PowerMock.verifyAll(); + } + + @Test + public void testMultipleTasks() { + PowerMock.replayAll(); + connector.start(sourceProperties); + List> taskConfigs = connector.taskConfigs(2); + Assert.assertEquals(2, taskConfigs.size()); + Assert.assertEquals("localhost:12345", taskConfigs.get(0).get(MongoDBSourceConfiguration.HOST_URLS_CONFIG)); + Assert.assertEquals("100", taskConfigs.get(0).get(MongoDBSourceConfiguration.BATCH_SIZE_CONFIG)); + Assert.assertEquals("schema", taskConfigs.get(0).get(MongoDBSourceConfiguration.SCHEMA_NAME_CONFIG)); + Assert.assertEquals("prefix", taskConfigs.get(0).get(MongoDBSourceConfiguration.TOPIC_PREFIX_CONFIG)); + Assert.assertEquals("mydb.test1,mydb.test2", taskConfigs.get(0).get(MongoDBSourceConfiguration.DATABASES_CONFIG)); + + Assert.assertEquals("localhost:12345", taskConfigs.get(1).get(MongoDBSourceConfiguration.HOST_URLS_CONFIG)); + Assert.assertEquals("100", taskConfigs.get(1).get(MongoDBSourceConfiguration.BATCH_SIZE_CONFIG)); + Assert.assertEquals("schema", taskConfigs.get(1).get(MongoDBSourceConfiguration.SCHEMA_NAME_CONFIG)); + Assert.assertEquals("prefix", taskConfigs.get(1).get(MongoDBSourceConfiguration.TOPIC_PREFIX_CONFIG)); + Assert.assertEquals("mydb.test3", taskConfigs.get(1).get(MongoDBSourceConfiguration.DATABASES_CONFIG)); + PowerMock.verifyAll(); + } + + @Test + public void testTaskClass() { + PowerMock.replayAll(); + connector.start(sourceProperties); + Assert.assertEquals(MongoDBSourceTask.class, connector.taskClass()); + PowerMock.verifyAll(); + } +} diff --git 
a/src/test/java/org/apache/kafka/connect/mongodb/MongodbSourceTaskTest.java b/src/test/java/org/apache/kafka/connect/mongodb/MongoDBSourceTaskTest.java similarity index 91% rename from src/test/java/org/apache/kafka/connect/mongodb/MongodbSourceTaskTest.java rename to src/test/java/org/apache/kafka/connect/mongodb/MongoDBSourceTaskTest.java index 8902c03..8f3336c 100644 --- a/src/test/java/org/apache/kafka/connect/mongodb/MongodbSourceTaskTest.java +++ b/src/test/java/org/apache/kafka/connect/mongodb/MongoDBSourceTaskTest.java @@ -14,6 +14,7 @@ import junit.framework.TestCase; import org.apache.commons.io.FileUtils; import org.apache.commons.lang3.RandomStringUtils; +import org.apache.kafka.connect.mongodb.configuration.MongoDBSourceConfiguration; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTaskContext; import org.apache.kafka.connect.storage.OffsetStorageReader; @@ -28,11 +29,12 @@ /** * @author Andrea Patelli + * @author Niraj Patel */ -public class MongodbSourceTaskTest extends TestCase { +public class MongoDBSourceTaskTest extends TestCase { private static String REPLICATION_PATH = "/tmp/mongo"; - private MongodbSourceTask task; + private MongoDBSourceTask task; private SourceTaskContext context; private OffsetStorageReader offsetStorageReader; private Map sourceProperties; @@ -86,19 +88,18 @@ public void setUp() { // Assert.assertTrue(false); } - task = new MongodbSourceTask(); + task = new MongoDBSourceTask(); offsetStorageReader = PowerMock.createMock(OffsetStorageReader.class); context = PowerMock.createMock(SourceTaskContext.class); task.initialize(context); sourceProperties = new HashMap<>(); - sourceProperties.put("host", "localhost"); - sourceProperties.put("port", Integer.toString(12345)); - sourceProperties.put("batch.size", Integer.toString(100)); - sourceProperties.put("schema.name", "schema"); - sourceProperties.put("topic.prefix", "prefix"); - sourceProperties.put("databases", "mydb.test1,mydb.test2,mydb.test3"); + sourceProperties.put(MongoDBSourceConfiguration.HOST_URLS_CONFIG, "localhost:12345"); + sourceProperties.put(MongoDBSourceConfiguration.BATCH_SIZE_CONFIG, Integer.toString(100)); + sourceProperties.put(MongoDBSourceConfiguration.SCHEMA_NAME_CONFIG, "schema"); + sourceProperties.put(MongoDBSourceConfiguration.TOPIC_PREFIX_CONFIG, "prefix"); + sourceProperties.put(MongoDBSourceConfiguration.DATABASES_CONFIG, "mydb.test1,mydb.test2,mydb.test3"); } diff --git a/src/test/java/org/apache/kafka/connect/mongodb/MongodbSourceConnectorTest.java b/src/test/java/org/apache/kafka/connect/mongodb/MongodbSourceConnectorTest.java deleted file mode 100644 index 5a852c5..0000000 --- a/src/test/java/org/apache/kafka/connect/mongodb/MongodbSourceConnectorTest.java +++ /dev/null @@ -1,82 +0,0 @@ -package org.apache.kafka.connect.mongodb; - -import org.apache.kafka.connect.connector.ConnectorContext; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import org.powermock.api.easymock.PowerMock; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -/** - * @author Andrea Patelli - */ -public class MongodbSourceConnectorTest { - private MongodbSourceConnector connector; - private ConnectorContext context; - private Map sourceProperties; - - - @Before - public void setup() { - connector = new MongodbSourceConnector(); - context = PowerMock.createMock(ConnectorContext.class); - connector.initialize(context); - - sourceProperties = new HashMap<>(); - sourceProperties.put("host", 
"localhost"); - sourceProperties.put("port", Integer.toString(12345)); - sourceProperties.put("batch.size", Integer.toString(100)); - sourceProperties.put("schema.name", "schema"); - sourceProperties.put("topic.prefix", "prefix"); - sourceProperties.put("databases", "mydb.test1,mydb.test2,mydb.test3"); - - } - - @Test - public void testSourceTasks() { - PowerMock.replayAll(); - connector.start(sourceProperties); - List> taskConfigs = connector.taskConfigs(1); - Assert.assertEquals(1, taskConfigs.size()); - Assert.assertEquals("localhost", taskConfigs.get(0).get("host")); - Assert.assertEquals("12345", taskConfigs.get(0).get("port")); - Assert.assertEquals("100", taskConfigs.get(0).get("batch.size")); - Assert.assertEquals("schema", taskConfigs.get(0).get("schema.name")); - Assert.assertEquals("prefix", taskConfigs.get(0).get("topic.prefix")); - Assert.assertEquals("mydb.test1,mydb.test2,mydb.test3", taskConfigs.get(0).get("databases")); - PowerMock.verifyAll(); - } - - @Test - public void testMultipleTasks() { - PowerMock.replayAll(); - connector.start(sourceProperties); - List> taskConfigs = connector.taskConfigs(2); - Assert.assertEquals(2, taskConfigs.size()); - Assert.assertEquals("localhost", taskConfigs.get(0).get("host")); - Assert.assertEquals("12345", taskConfigs.get(0).get("port")); - Assert.assertEquals("100", taskConfigs.get(0).get("batch.size")); - Assert.assertEquals("schema", taskConfigs.get(0).get("schema.name")); - Assert.assertEquals("prefix", taskConfigs.get(0).get("topic.prefix")); - Assert.assertEquals("mydb.test1,mydb.test2", taskConfigs.get(0).get("databases")); - - Assert.assertEquals("localhost", taskConfigs.get(1).get("host")); - Assert.assertEquals("12345", taskConfigs.get(1).get("port")); - Assert.assertEquals("100", taskConfigs.get(1).get("batch.size")); - Assert.assertEquals("schema", taskConfigs.get(1).get("schema.name")); - Assert.assertEquals("prefix", taskConfigs.get(1).get("topic.prefix")); - Assert.assertEquals("mydb.test3", taskConfigs.get(1).get("databases")); - PowerMock.verifyAll(); - } - - @Test - public void testTaskClass() { - PowerMock.replayAll(); - connector.start(sourceProperties); - Assert.assertEquals(MongodbSourceTask.class, connector.taskClass()); - PowerMock.verifyAll(); - } -}