Commit 4c6d9f1
upgrading kafka, breaking out configuration
Niraj Patel committed Sep 30, 2016
1 parent f52fd03 commit 4c6d9f1
Showing 15 changed files with 488 additions and 359 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -68,7 +68,7 @@ For every message, a SourceRecord is created, having the following schema:
* **database**: database in which the operation took place
* **object**: inserted/updated/deleted object

-## Sample Configuration
+## Source Configuration
```ini
name=mongodb-source-connector
connector.class=org.apache.kafka.connect.mongodb.MongodbSourceConnector
```
19 changes: 15 additions & 4 deletions pom.xml
@@ -8,6 +8,11 @@
<artifactId>connect-mongodb</artifactId>
<version>1.0</version>

+<properties>
+<kafka.version>0.10.0.0</kafka.version>
+<mongodb.version>3.2.1</mongodb.version>
+</properties>

<build>
<plugins>
<plugin>
@@ -16,8 +21,8 @@
<version>3.3</version>
<inherited>true</inherited>
<configuration>
-<source>1.7</source>
-<target>1.7</target>
+<source>1.8</source>
+<target>1.8</target>
</configuration>
</plugin>
<plugin>
@@ -46,12 +51,18 @@
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver</artifactId>
-<version>3.2.1</version>
+<version>${mongodb.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
-<version>0.9.0.0</version>
+<version>${kafka.version}</version>
<scope>provided</scope>
</dependency>
+<dependency>
+<groupId>org.apache.kafka</groupId>
+<artifactId>kafka-clients</artifactId>
+<version>${kafka.version}</version>
+<scope>provided</scope>
+</dependency>
<dependency>
26 changes: 20 additions & 6 deletions src/main/java/org/apache/kafka/connect/mongodb/DatabaseReader.java
@@ -2,6 +2,7 @@

import com.mongodb.CursorType;
import com.mongodb.MongoClient;
+import com.mongodb.ServerAddress;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
@@ -14,7 +15,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

+import java.util.Arrays;
+import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.stream.Collectors;

/**
* Reads mutations from a MongoDB database
@@ -23,8 +27,7 @@
*/
public class DatabaseReader implements Runnable {
Logger log = LoggerFactory.getLogger(DatabaseReader.class);
-private String host;
-private Integer port;
+private String hosts;
private String db;
private String start;

@@ -33,9 +36,8 @@ public class DatabaseReader implements Runnable {
private MongoCollection<Document> oplog;
private Bson query;

-public DatabaseReader(String host, Integer port, String db, String start, ConcurrentLinkedQueue<Document> messages) {
-this.host = host;
-this.port = port;
+public DatabaseReader(String hosts, String db, String start, ConcurrentLinkedQueue<Document> messages) {
+this.hosts = hosts;
this.db = db;
this.start = start;
this.messages = messages;
@@ -81,7 +83,19 @@ private void init() {
* @return the oplog collection
*/
private MongoCollection readCollection() {
-MongoClient mongoClient = new MongoClient(host, port);
+List<ServerAddress> addresses = Arrays.stream(hosts.split(",")).map(hostUrl -> {
+try {
+String[] hostAndPort = hostUrl.split(":");
+String host = hostAndPort[0];
+int port = Integer.parseInt(hostAndPort[1]);
+return new ServerAddress(host, port);
+} catch (ArrayIndexOutOfBoundsException aioobe) {
+throw new ConnectException("hosts must be in host:port format");
+} catch (NumberFormatException nfe) {
+throw new ConnectException("port in the hosts field must be an integer");
+}
+}).collect(Collectors.toList());
+MongoClient mongoClient = new MongoClient(addresses);
MongoDatabase db = mongoClient.getDatabase("local");
return db.getCollection("oplog.rs");
}
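For reference, a minimal standalone sketch of the new host-string parsing; the host values (`localhost:27017,replica-1:27018`) are made up for illustration:

```java
import com.mongodb.ServerAddress;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class HostParsingExample {
    public static void main(String[] args) {
        // Comma-separated host:port pairs, as the connector now accepts
        String hosts = "localhost:27017,replica-1:27018";
        List<ServerAddress> addresses = Arrays.stream(hosts.split(","))
                .map(hostUrl -> {
                    String[] hostAndPort = hostUrl.split(":");
                    // A malformed entry would throw here, which is what the
                    // ConnectException handling in readCollection() guards against
                    return new ServerAddress(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
                })
                .collect(Collectors.toList());
        // Passing a list of addresses lets the driver talk to a replica set
        addresses.forEach(address -> System.out.println(address));
    }
}
```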
src/main/java/org/apache/kafka/connect/mongodb/MongodbReader.java → src/main/java/org/apache/kafka/connect/mongodb/MongoDBReader.java
@@ -15,19 +15,17 @@
*
* @author Andrea Patelli
*/
-public class MongodbReader {
-private static final Logger log = LoggerFactory.getLogger(MongodbReader.class);
+public class MongoDBReader {
+private static final Logger log = LoggerFactory.getLogger(MongoDBReader.class);

protected ConcurrentLinkedQueue<Document> messages;

private List<String> dbs;
-private String host;
-private Integer port;
+private String hosts;
private Map<Map<String, String>, Map<String, Object>> start;

-public MongodbReader(String host, Integer port, List<String> dbs, Map<Map<String, String>, Map<String, Object>> start) {
-this.host = host;
-this.port = port;
+public MongoDBReader(String hosts, List<String> dbs, Map<Map<String, String>, Map<String, Object>> start) {
+this.hosts = hosts;
this.dbs = new ArrayList<>(0);
this.dbs.addAll(dbs);
this.start = start;
@@ -36,7 +34,7 @@ public MongodbReader(String host, Integer port, List<String> dbs, Map<Map<String

public void run() {
// for every database to watch
-for (String db : dbs) {
+dbs.stream().forEach(db -> {
String start;
// get the last message that was read
Map<String, Object> dbOffset = this.start.get(Collections.singletonMap("mongodb", db));
@@ -46,13 +44,12 @@ public void run() {
start = (String) this.start.get(Collections.singletonMap("mongodb", db)).get(db);

log.trace("Starting database reader with configuration: ");
log.trace("host: {}", host);
log.trace("port: {}", port);
log.trace("hosts: {}", hosts);
log.trace("db: {}", db);
log.trace("start: {}", start);
// start a new thread for reading mutation of the specific database
-DatabaseReader reader = new DatabaseReader(host, port, db, start, messages);
+DatabaseReader reader = new DatabaseReader(hosts, db, start, messages);
new Thread(reader).start();
-}
+});
}
}
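The offset lookup keyed by `Collections.singletonMap("mongodb", db)` is easy to misread; here is a minimal sketch of that bookkeeping with a hypothetical database name and offset value (the actual offset format is not shown in this diff):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class OffsetLookupExample {
    public static void main(String[] args) {
        // Offsets are keyed by a source-partition map ({"mongodb": <db name>})
        // and map the db name to the last position read from the oplog.
        Map<Map<String, String>, Map<String, Object>> start = new HashMap<>();
        start.put(Collections.singletonMap("mongodb", "mydb"),
                Collections.<String, Object>singletonMap("mydb", "1475276400_1")); // hypothetical value

        // Mirrors the lookup in MongoDBReader.run()
        Map<String, Object> dbOffset = start.get(Collections.singletonMap("mongodb", "mydb"));
        String resumeFrom = dbOffset == null ? null : (String) dbOffset.get("mydb");
        System.out.println(resumeFrom); // prints 1475276400_1
    }
}
```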
src/main/java/org/apache/kafka/connect/mongodb/MongodbSinkConnector.java → src/main/java/org/apache/kafka/connect/mongodb/MongoDBSinkConnector.java
@@ -1,39 +1,36 @@
package org.apache.kafka.connect.mongodb;

+import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.mongodb.configuration.MongoDBSinkConfiguration;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.util.ConnectorUtils;
import org.apache.kafka.connect.utils.LogUtils;
import org.apache.kafka.connect.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.IntStream;

/**
- * MongodbSinkConnector implement the Connector interface to send Kafka
+ * MongoDBSinkConnector implements the Connector interface to send Kafka
* data to Mongodb
*
* @author Andrea Patelli
+ * @author Niraj Patel
*/
-public class MongodbSinkConnector extends SinkConnector {
-private final static Logger log = LoggerFactory.getLogger(MongodbSinkConnector.class);
+public class MongoDBSinkConnector extends SinkConnector {

-public static final String PORT = "port";
-public static final String HOST = "host";
-public static final String BULK_SIZE = "bulk.size";
-public static final String DATABASE = "mongodb.database";
-public static final String COLLECTIONS = "mongodb.collections";
-public static final String TOPICS = "topics";
+private final static Logger log = LoggerFactory.getLogger(MongoDBSinkConnector.class);

-private String port;
-private String host;
-private String bulkSize;
-private String database;
-private String collections;
-private String topics;
+private Map<String, String> configuration;

/**
* Get the version of this connector.
@@ -49,33 +46,20 @@ public String version() {
* Start this Connector. This method will only be called on a clean Connector, i.e. it has
* either just been instantiated and initialized or {@link #stop()} has been invoked.
*
- * @param map configuration settings
+ * @param configuration configuration settings
*/
@Override
-public void start(Map<String, String> map) {
-log.trace("Parsing configuration");
-
-port = map.get(PORT);
-if (port == null || port.isEmpty())
-throw new ConnectException("Missing " + PORT + " config");
-
-bulkSize = map.get(BULK_SIZE);
-if (bulkSize == null || bulkSize.isEmpty())
-throw new ConnectException("Missing " + BULK_SIZE + " config");
-
-host = map.get(HOST);
-if (host == null || host.isEmpty())
-throw new ConnectException("Missing " + HOST + " config");
-
-database = map.get(DATABASE);
-collections = map.get(COLLECTIONS);
-topics = map.get(TOPICS);
+public void start(Map<String, String> configuration) {
+MongoDBSinkConfiguration sinkConfiguration = new MongoDBSinkConfiguration(configuration);
+this.configuration = sinkConfiguration.originalsStrings();
+
+String collections = configuration.get(MongoDBSinkConfiguration.COLLECTIONS_CONFIG);
+String topics = configuration.get(MongoDBSinkConfiguration.TOPICS_CONFIG);
if (collections.split(",").length != topics.split(",").length) {
throw new ConnectException("The number of topics should be the same as the number of collections");
}

-LogUtils.dumpConfiguration(map, log);
+LogUtils.dumpConfiguration(configuration, log);
}

/**
@@ -85,7 +69,7 @@ public void start(Map<String, String> map) {
*/
@Override
public Class<? extends Task> taskClass() {
-return MongodbSinkTask.class;
+return MongoDBSinkTask.class;
}

/**
@@ -98,22 +82,21 @@ public Class<? extends Task> taskClass() {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> configs = new ArrayList<>();
-List<String> coll = Arrays.asList(collections.split(","));
+List<String> coll = Arrays.asList(configuration.get(MongoDBSinkConfiguration.COLLECTIONS_CONFIG).split(","));
int numGroups = Math.min(coll.size(), maxTasks);
List<List<String>> dbsGrouped = ConnectorUtils.groupPartitions(coll, numGroups);
-List<String> topics = Arrays.asList(this.topics.split(","));
+List<String> topics = Arrays.asList(configuration.get(MongoDBSinkConfiguration.TOPICS_CONFIG).split(","));
List<List<String>> topicsGrouped = ConnectorUtils.groupPartitions(topics, numGroups);

-for (int i = 0; i < numGroups; i++) {
+IntStream.range(0, numGroups).forEach(i -> {
Map<String, String> config = new HashMap<>();
-config.put(PORT, port);
-config.put(BULK_SIZE, bulkSize);
-config.put(HOST, host);
-config.put(DATABASE, database);
-config.put(COLLECTIONS, StringUtils.join(dbsGrouped.get(i), ","));
-config.put(TOPICS, StringUtils.join(topicsGrouped.get(i), ","));
+config.put(MongoDBSinkConfiguration.BULK_SIZE_CONFIG, configuration.get(MongoDBSinkConfiguration.BULK_SIZE_CONFIG));
+config.put(MongoDBSinkConfiguration.HOST_URLS_CONFIG, configuration.get(MongoDBSinkConfiguration.HOST_URLS_CONFIG));
+config.put(MongoDBSinkConfiguration.DATABASE_CONFIG, configuration.get(MongoDBSinkConfiguration.DATABASE_CONFIG));
+config.put(MongoDBSinkConfiguration.COLLECTIONS_CONFIG, StringUtils.join(dbsGrouped.get(i), ","));
+config.put(MongoDBSinkConfiguration.TOPICS_CONFIG, StringUtils.join(topicsGrouped.get(i), ","));
configs.add(config);
-}
+});
return configs;
}
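To see how collections and topics end up distributed across tasks, here is a small sketch using `ConnectorUtils.groupPartitions` with hypothetical collection names:

```java
import org.apache.kafka.connect.util.ConnectorUtils;

import java.util.Arrays;
import java.util.List;

public class GroupingExample {
    public static void main(String[] args) {
        // Hypothetical collections; maxTasks as the framework might pass it
        List<String> collections = Arrays.asList("users", "orders", "events");
        int maxTasks = 2;
        int numGroups = Math.min(collections.size(), maxTasks);

        // Splits the list into numGroups contiguous, roughly equal groups
        List<List<String>> grouped = ConnectorUtils.groupPartitions(collections, numGroups);
        System.out.println(grouped); // [[users, orders], [events]]
    }
}
```

Because topics and collections are grouped with the same numGroups, each task receives matching topic/collection pairs, which is why start() insists the two lists have the same length.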

@@ -122,6 +105,12 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
*/
@Override
public void stop() {
+// not implemented
}

+@Override
+public ConfigDef config() {
+return MongoDBSinkConfiguration.config;
+}

}
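The new `MongoDBSinkConfiguration` class itself is among the files not shown above. A minimal sketch of what a ConfigDef-backed configuration class like it could look like — the constant names and the public static `config` field are taken from the diff, but the config key strings and documentation text here are assumptions:

```java
package org.apache.kafka.connect.mongodb.configuration;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

import java.util.Map;

public class MongoDBSinkConfiguration extends AbstractConfig {
    // Constant names match the usages in MongoDBSinkConnector;
    // the key strings themselves are assumptions
    public static final String HOST_URLS_CONFIG = "host.urls";
    public static final String BULK_SIZE_CONFIG = "bulk.size";
    public static final String DATABASE_CONFIG = "mongodb.database";
    public static final String COLLECTIONS_CONFIG = "mongodb.collections";
    public static final String TOPICS_CONFIG = "topics";

    // Referenced as MongoDBSinkConfiguration.config in config()
    public static final ConfigDef config = new ConfigDef()
            .define(HOST_URLS_CONFIG, Type.STRING, Importance.HIGH,
                    "Comma-separated MongoDB hosts in host:port format")
            .define(BULK_SIZE_CONFIG, Type.INT, Importance.HIGH,
                    "Number of records to write per bulk operation")
            .define(DATABASE_CONFIG, Type.STRING, Importance.HIGH,
                    "Database the sink writes to")
            .define(COLLECTIONS_CONFIG, Type.STRING, Importance.HIGH,
                    "Comma-separated collections, paired one-to-one with topics")
            .define(TOPICS_CONFIG, Type.STRING, Importance.HIGH,
                    "Comma-separated list of topics to consume");

    public MongoDBSinkConfiguration(Map<String, String> originals) {
        super(config, originals);
    }
}
```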
