Setup Guide
Distributed mode provides scalability and automatic fault tolerance for Kafka Connect. In distributed mode, you start many worker processes using the same group.id
and they automatically coordinate to schedule execution of connectors and tasks across all available workers. If you add a worker, shut down a worker, or a worker fails unexpectedly, the remaining workers detect this and automatically coordinate to redistribute connectors and tasks across the updated set of available workers.
Interaction with a distributed-mode cluster is via the REST API. To create a connector, you start the workers and then make a REST request to create a connector. Unlike many other systems, Kafka Connect workers do not have a special "leader" process that you have to interact with to use the REST API; all nodes can respond to REST requests, including creating, listing, modifying, and destroying connectors.
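For example, once the workers are up, you can query any of them. In the sketch below, the host and port are placeholders for wherever a worker is running (the port is whatever you pass via the -port flag when starting the agent):

```shell
# List the connectors currently running in the cluster
# (any worker in the cluster can answer this).
curl -s http://localhost:8083/connectors

# Inspect a single connector by name.
curl -s http://localhost:8083/connectors/elasticsearch-schema-sink
```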
In distributed mode, the workers need to be able to discover each other and have shared storage for connector configuration and offset data. In addition to the usual worker settings, you'll want to ensure you've configured the following for this cluster:
group.id
config.storage.topic
offset.storage.topic
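A minimal worker configuration covering these settings might look like the fragment below; the group ID and topic names are placeholder values, and every worker that should join the same cluster must use the same values:

```properties
# All workers that should form one cluster share the same group.id.
group.id=connect-cluster
# Topic where connector configurations are stored.
config.storage.topic=connect-configs
# Topic where source-connector offsets are stored.
offset.storage.topic=connect-offsets
```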
To start a distributed worker, run the distributed agent, setting the following properties:
- -brokers comma-separated list of Kafka brokers to use
- -group group ID to use
- -cluster name to use for the Kafka Connect cluster
- -registry URL of the schema registry
- -offsets name of the topic in which to save the offsets
- -configs name of the topic in which to save the connector configuration
- -port port of the REST service for the worker
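Putting the flags together, a start command might look like the sketch below. The agent binary name is a placeholder (check the repository for the actual script), and the broker, registry, and topic values are example assumptions:

```shell
# Hypothetical invocation: substitute the actual agent binary/script name.
./connect-distributed-agent \
  -brokers broker1:9092,broker2:9092 \
  -group connect-cluster \
  -cluster my-connect-cluster \
  -registry http://localhost:8081 \
  -offsets connect-offsets \
  -configs connect-configs \
  -port 8083
```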
After the agent is running, you can submit the connector configuration by making a POST request to the host where the agent is running.
Example request:
POST /connectors HTTP/1.1
Host: [host where the connector is running]
Content-Type: application/json
Accept: application/json
{
"name": "elasticsearch-schema-sink",
"config": {
"connector.class": "org.apache.kafka.connect.elasticsearchschema.ElasticsearchSinkConnector",
"topics": "topic",
"elasticsearch.cluster.name":"test-cluster",
"elasticsearch.index.hosts":"localhost:9300",
"elasticsearch.index.prefix":"prefix",
"elasticsearch.document.name":"document",
"elasticsearch.bulk.size":"100"
"tasks.max": "1"
}
}
Example response:
HTTP/1.1 200 OK
Content-Type: application/json
{
"name": "elasticsearch-schema-sink",
"config": {
"connector.class": "org.apache.kafka.connect.elasticsearchschema.ElasticsearchSinkConnector",
"topics": "topic",
"elasticsearch.cluster.name":"test-cluster",
"elasticsearch.index.hosts":"localhost:9300",
"elasticsearch.index.prefix":"prefix",
"elasticsearch.document.name":"document",
"elasticsearch.bulk.size":"100"
"tasks.max": "1"
},
"tasks": [
{ "connector": "elasticsearch-schema-sink", "task": 1 }
]
}
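The same request can be issued with curl; the worker host and port below are placeholders:

```shell
# Submit the connector configuration to any worker in the cluster.
curl -s -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d '{
    "name": "elasticsearch-schema-sink",
    "config": {
      "connector.class": "org.apache.kafka.connect.elasticsearchschema.ElasticsearchSinkConnector",
      "topics": "topic",
      "elasticsearch.cluster.name": "test-cluster",
      "elasticsearch.index.hosts": "localhost:9300",
      "elasticsearch.index.prefix": "prefix",
      "elasticsearch.document.name": "document",
      "elasticsearch.bulk.size": "100",
      "tasks.max": "1"
    }
  }'
```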
Configuration properties:
- connector.class: the Java class to use for the connector
- topic: the prefix to use for the topic to which the documents read from Couchbase are published
- couchbase.nodes: comma-separated list of addresses of the Couchbase nodes from which to read documents
- couchbase.bucket: the Couchbase bucket from which to load documents
- dcp.maximum.drainrate: maximum number of documents to read from Couchbase per second
- couchbase.dcpConnectionBufferSize: size of the DCP connection buffer, default 20971520 (20 MB)
When running, the connector creates a task and publishes all mutation messages to a Kafka topic with the same name as the bucket, prefixed by the value of the topic property submitted at connector creation time.
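As a concrete illustration of the naming rule above (note that the separator between prefix and bucket name is an assumption here; check the connector source for the exact scheme):

```shell
# Sketch of the destination-topic naming rule described above.
prefix="couchbase-kafka-connector"   # value of the "topic" property
bucket="PMCONCURRENCY"               # name of the Couchbase bucket
topic_name="${prefix}-${bucket}"     # assumed separator: "-"
echo "${topic_name}"
```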
To create a new connector with the given configuration, or to update the configuration of an existing connector, make a PUT request to the host where the agent is running.
Example request:
PUT /connectors/couchbase-kafka-connector/config HTTP/1.1
Host: [host where the connector is running]
Content-Type: application/json
Accept: application/json
{
"connector.class": "org.apache.kafka.connect.couchbase.CouchbaseSourceConnector",
"topic": "couchbase-kafka-connector",
"schema.name": "couchbaseschema",
"couchbase.nodes": "localhost",
"couchbase.buckets": "PMCONCURRENCY",
"dcp.maximum.drainrate": "1000",
"couchbase.dcpConnectionBufferSize": "0",
"tasks.max": "1"
}
Example response:
HTTP/1.1 201 Created
Content-Type:application/json
{
"name": "couchbase-kafka-connector",
"config": {
"connector.class": "org.apache.kafka.connect.couchbase.CouchbaseSourceConnector",
"topic": "couchbase-kafka-connector",
"schema.name": "couchbaseschema",
"couchbase.nodes": "localhost",
"couchbase.buckets": "PMCONCURRENCY",
"dcp.maximum.drainrate": "1000",
"couchbase.dcpConnectionBufferSize": "0",
"tasks.max": "1"
},
"tasks": [
{ "connector": "couchbase-kafka-connector", "task": 1 }
]
}
Example request:
DELETE /connectors/couchbase-kafka-connector HTTP/1.1
Host: [host where the connector is running]
Example response:
HTTP/1.1 204 No Content
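As with the other operations, the deletion can be performed with curl against any worker in the cluster (host and port are placeholders):

```shell
# Remove the connector and stop its tasks across the cluster.
curl -s -X DELETE http://localhost:8083/connectors/couchbase-kafka-connector
```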