Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Point the replication directory at the universal location, fix provided dependency scopes, upgrade Kafka & Java, and split out configuration #13

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,6 @@
hs_err_pid*

.idea/*
*.iml
*.iml
target
target/*
23 changes: 17 additions & 6 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,11 @@
<artifactId>connect-mongodb</artifactId>
<version>1.0</version>

<properties>
<kafka.version>0.10.0.0</kafka.version>
<mongodb.version>3.2.1</mongodb.version>
</properties>

<build>
<plugins>
<plugin>
Expand All @@ -16,8 +21,8 @@
<version>3.3</version>
<inherited>true</inherited>
<configuration>
<source>1.7</source>
<target>1.7</target>
<source>1.8</source>
<target>1.8</target>
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Streams! :)

</configuration>
</plugin>
<plugin>
Expand Down Expand Up @@ -46,19 +51,25 @@
<dependency>
<groupId>org.mongodb</groupId>
<artifactId>mongodb-driver</artifactId>
<version>3.2.1</version>
<version>${mongodb.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>0.9.0.0</version>
<scope>compile</scope>
<version>${kafka.version}</version>
<scope>provided</scope>
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making dependencies that are provided by Kafka have the correct scope

</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.6</version>
<scope>compile</scope>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
26 changes: 20 additions & 6 deletions src/main/java/org/apache/kafka/connect/mongodb/DatabaseReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.mongodb.CursorType;
import com.mongodb.MongoClient;
import com.mongodb.ServerAddress;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
Expand All @@ -14,7 +15,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;

/**
* Reads mutations from a MongoDB database
Expand All @@ -23,8 +27,7 @@
*/
public class DatabaseReader implements Runnable {
Logger log = LoggerFactory.getLogger(DatabaseReader.class);
private String host;
private Integer port;
private String hosts;
private String db;
private String start;

Expand All @@ -33,9 +36,8 @@ public class DatabaseReader implements Runnable {
private MongoCollection<Document> oplog;
private Bson query;

public DatabaseReader(String host, Integer port, String db, String start, ConcurrentLinkedQueue<Document> messages) {
this.host = host;
this.port = port;
public DatabaseReader(String hosts, String db, String start, ConcurrentLinkedQueue<Document> messages) {
this.hosts = hosts;
this.db = db;
this.start = start;
this.messages = messages;
Expand Down Expand Up @@ -81,7 +83,19 @@ private void init() {
* @return the oplog collection
*/
private MongoCollection readCollection() {
    // Parse the comma-separated "hosts" setting into server addresses.
    // Entries are trimmed so "host1:27017, host2:27017" (space after the
    // comma) resolves correctly instead of failing on " host2".
    List<ServerAddress> addresses = Arrays.stream(hosts.split(","))
            .map(String::trim)
            .map(hostUrl -> {
                try {
                    String[] hostAndPort = hostUrl.split(":");
                    String host = hostAndPort[0];
                    int port = Integer.parseInt(hostAndPort[1]);
                    return new ServerAddress(host, port);
                } catch (ArrayIndexOutOfBoundsException aioobe) {
                    throw new ConnectException("hosts must be in host:port format");
                } catch (NumberFormatException nfe) {
                    throw new ConnectException("port in the hosts field must be an integer");
                }
            })
            .collect(Collectors.toList());
    // NOTE(review): the client is never closed here; the returned collection
    // appears to stay backed by this connection for the reader's lifetime —
    // confirm shutdown handling.
    MongoClient mongoClient = new MongoClient(addresses);
    // "localDb" instead of "db" to avoid shadowing the same-named field.
    MongoDatabase localDb = mongoClient.getDatabase("local");
    return localDb.getCollection("oplog.rs");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,19 +15,17 @@
*
* @author Andrea Patelli
*/
public class MongodbReader {
private static final Logger log = LoggerFactory.getLogger(MongodbReader.class);
public class MongoDBReader {
private static final Logger log = LoggerFactory.getLogger(MongoDBReader.class);

protected ConcurrentLinkedQueue<Document> messages;

private List<String> dbs;
private String host;
private Integer port;
private String hosts;
private Map<Map<String, String>, Map<String, Object>> start;

public MongodbReader(String host, Integer port, List<String> dbs, Map<Map<String, String>, Map<String, Object>> start) {
this.host = host;
this.port = port;
public MongoDBReader(String hosts, List<String> dbs, Map<Map<String, String>, Map<String, Object>> start) {
this.hosts = hosts;
this.dbs = new ArrayList<>(0);
this.dbs.addAll(dbs);
this.start = start;
Expand All @@ -36,7 +34,7 @@ public MongodbReader(String host, Integer port, List<String> dbs, Map<Map<String

public void run() {
// for every database to watch
for (String db : dbs) {
dbs.stream().forEach(db -> {
String start;
// get the last message that was read
Map<String, Object> dbOffset = this.start.get(Collections.singletonMap("mongodb", db));
Expand All @@ -46,13 +44,12 @@ public void run() {
start = (String) this.start.get(Collections.singletonMap("mongodb", db)).get(db);

log.trace("Starting database reader with configuration: ");
log.trace("host: {}", host);
log.trace("port: {}", port);
log.trace("hosts: {}", hosts);
log.trace("db: {}", db);
log.trace("start: {}", start);
// start a new thread for reading mutation of the specific database
DatabaseReader reader = new DatabaseReader(host, port, db, start, messages);
DatabaseReader reader = new DatabaseReader(hosts, db, start, messages);
new Thread(reader).start();
}
});
}
}
Original file line number Diff line number Diff line change
@@ -1,39 +1,36 @@
package org.apache.kafka.connect.mongodb;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.utils.AppInfoParser;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.mongodb.configuration.MongoDBSinkConfiguration;
import org.apache.kafka.connect.sink.SinkConnector;
import org.apache.kafka.connect.util.ConnectorUtils;
import org.apache.kafka.connect.utils.LogUtils;
import org.apache.kafka.connect.utils.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;

/**
* MongodbSinkConnector implement the Connector interface to send Kafka
* MongoDBSinkConnector implements the Connector interface to send Kafka
* data to MongoDB
*
* @author Andrea Patelli
* @author Niraj Patel
*/
public class MongodbSinkConnector extends SinkConnector {
private final static Logger log = LoggerFactory.getLogger(MongodbSinkConnector.class);
public class MongoDBSinkConnector extends SinkConnector {

public static final String PORT = "port";
public static final String HOST = "host";
public static final String BULK_SIZE = "bulk.size";
public static final String DATABASE = "mongodb.database";
public static final String COLLECTIONS = "mongodb.collections";
public static final String TOPICS = "topics";
private final static Logger log = LoggerFactory.getLogger(MongoDBSinkConnector.class);

private String port;
private String host;
private String bulkSize;
private String database;
private String collections;
private String topics;
private Map<String, String> configuration;

/**
* Get the version of this connector.
Expand All @@ -49,33 +46,20 @@ public String version() {
* Start this Connector. This method will only be called on a clean Connector, i.e. it has
* either just been instantiated and initialized or {@link #stop()} has been invoked.
*
* @param map configuration settings
* @param configuration configuration settings
*/
@Override
public void start(Map<String, String> map) {
log.trace("Parsing configuration");

port = map.get(PORT);
if (port == null || port.isEmpty())
throw new ConnectException("Missing " + PORT + " config");

bulkSize = map.get(BULK_SIZE);
if (bulkSize == null || bulkSize.isEmpty())
throw new ConnectException("Missing " + BULK_SIZE + " config");

host = map.get(HOST);
if (host == null || host.isEmpty())
throw new ConnectException("Missing " + HOST + " config");

database = map.get(DATABASE);
collections = map.get(COLLECTIONS);
topics = map.get(TOPICS);
public void start(Map<String, String> configuration) {
MongoDBSinkConfiguration sinkConfiguration = new MongoDBSinkConfiguration(configuration);
this.configuration = sinkConfiguration.originalsStrings();

String collections = configuration.get(MongoDBSinkConfiguration.COLLECTIONS_CONFIG);
String topics = configuration.get(MongoDBSinkConfiguration.TOPICS_CONFIG);
if (collections.split(",").length != topics.split(",").length) {
throw new ConnectException("The number of topics should be the same as the number of collections");
}

LogUtils.dumpConfiguration(map, log);
LogUtils.dumpConfiguration(configuration, log);
}

/**
Expand All @@ -85,7 +69,7 @@ public void start(Map<String, String> map) {
*/
/**
 * Returns the Task implementation for this connector.
 *
 * @return the {@code MongoDBSinkTask} class
 */
@Override
public Class<? extends Task> taskClass() {
    // MongoDBSinkTask follows the MongoDB* naming used across the renamed classes.
    return MongoDBSinkTask.class;
}

/**
Expand All @@ -98,22 +82,21 @@ public Class<? extends Task> taskClass() {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> configs = new ArrayList<>();
List<String> coll = Arrays.asList(collections.split(","));
List<String> coll = Arrays.asList(configuration.get(MongoDBSinkConfiguration.COLLECTIONS_CONFIG).split(","));
int numGroups = Math.min(coll.size(), maxTasks);
List<List<String>> dbsGrouped = ConnectorUtils.groupPartitions(coll, numGroups);
List<String> topics = Arrays.asList(this.topics.split(","));
List<String> topics = Arrays.asList(configuration.get(MongoDBSinkConfiguration.TOPICS_CONFIG).split(","));
List<List<String>> topicsGrouped = ConnectorUtils.groupPartitions(topics, numGroups);

for (int i = 0; i < numGroups; i++) {
IntStream.range(0, numGroups).forEach(i -> {
Map<String, String> config = new HashMap<>();
config.put(PORT, port);
config.put(BULK_SIZE, bulkSize);
config.put(HOST, host);
config.put(DATABASE, database);
config.put(COLLECTIONS, StringUtils.join(dbsGrouped.get(i), ","));
config.put(TOPICS, StringUtils.join(topicsGrouped.get(i), ","));
config.put(MongoDBSinkConfiguration.BULK_SIZE_CONFIG, configuration.get(MongoDBSinkConfiguration.BULK_SIZE_CONFIG));
config.put(MongoDBSinkConfiguration.HOST_URLS_CONFIG, configuration.get(MongoDBSinkConfiguration.HOST_URLS_CONFIG));
config.put(MongoDBSinkConfiguration.DATABASE_CONFIG, configuration.get(MongoDBSinkConfiguration.DATABASE_CONFIG));
config.put(MongoDBSinkConfiguration.COLLECTIONS_CONFIG, StringUtils.join(dbsGrouped.get(i), ","));
config.put(MongoDBSinkConfiguration.TOPICS_CONFIG, StringUtils.join(topicsGrouped.get(i), ","));
configs.add(config);
}
});
return configs;
}

Expand All @@ -122,6 +105,12 @@ public List<Map<String, String>> taskConfigs(int maxTasks) {
*/
@Override
public void stop() {
// Nothing to release: the connector holds only configuration strings,
// so there are no connections or threads to shut down here.
}

@Override
public ConfigDef config() {
// Declarative definition of the connector's settings; the Connect framework
// uses it to validate user-supplied configuration before start() is invoked.
return MongoDBSinkConfiguration.config;
}

}
Loading