patelliandrea edited this page Jan 28, 2016 · 12 revisions

Running the connector in Distributed Mode

Distributed mode provides scalability and automatic fault tolerance for Kafka Connect. In distributed mode, you start many worker processes with the same group.id, and they automatically coordinate to schedule connectors and tasks across all available workers. If you add a worker, shut one down, or a worker fails unexpectedly, the remaining workers detect the change and redistribute connectors and tasks across the updated set of workers.
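To make the redistribution idea concrete, here is a toy model that deals connectors/tasks out to workers round-robin and recomputes the split when a worker disappears. It is only an illustration under that simplifying assumption: Kafka Connect's own rebalance protocol is more involved, and the worker and task names below are hypothetical.

```python
from itertools import cycle


def assign_round_robin(workers, units):
    """Toy model of a rebalance: deal connectors/tasks out to workers in turn.

    Not Kafka Connect's implementation; it only shows that every live worker
    gets a share of the work and that nothing is lost when a worker leaves.
    """
    ordered_workers = sorted(workers)
    assignment = {worker: [] for worker in ordered_workers}
    # Sorting both inputs makes the result deterministic for a given membership.
    for worker, unit in zip(cycle(ordered_workers), sorted(units)):
        assignment[worker].append(unit)
    return assignment


units = ["sink-0", "sink-1", "sink-2", "sink-3"]  # hypothetical connector tasks
before = assign_round_robin(["worker-a", "worker-b", "worker-c"], units)
# worker-b fails: the surviving workers split the same units between them.
after = assign_round_robin(["worker-a", "worker-c"], units)
```

With three workers, worker-a holds sink-0 and sink-3; after worker-b leaves, its sink-1 moves to worker-c and no unit is dropped.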

Interaction with a distributed-mode cluster is via the REST API. To create a connector, start the workers and then send a REST request to any of them. Unlike many other systems, Kafka Connect workers do not have a special "leader" process that you have to interact with to use the REST API: every worker can respond to REST requests, including creating, listing, modifying, and deleting connectors.
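Because every worker serves the REST API, a client can simply try workers in turn. The sketch below lists connectors with `GET /connectors`, skipping workers that are unreachable or return an error. The worker addresses are hypothetical, and the `opener` parameter exists only so the function can be exercised without a live cluster.

```python
import json
import urllib.request


def list_connectors(worker_urls, timeout=5.0, opener=urllib.request.urlopen):
    """Return connector names from GET /connectors on the first worker that answers.

    Every worker serves the REST API, so an unreachable or failing worker is
    skipped and the next one is tried.
    """
    errors = []
    for base in worker_urls:
        request = urllib.request.Request(
            base.rstrip("/") + "/connectors",
            headers={"Accept": "application/json"},
        )
        try:
            with opener(request, timeout=timeout) as response:
                return json.loads(response.read().decode("utf-8"))
        except OSError as exc:  # URLError and HTTPError are OSError subclasses
            errors.append(f"{base}: {exc}")
    raise RuntimeError("no worker answered: " + "; ".join(errors))


# Example (hypothetical worker addresses):
# list_connectors(["http://worker-1:8083", "http://worker-2:8083"])
```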

In distributed mode, the workers need to be able to discover each other and need shared storage for connector configuration and offset data. In addition to the usual worker settings, make sure the following are configured for the cluster:

  • group.id: identifier shared by all workers in the cluster
  • config.storage.topic: Kafka topic in which connector configurations are stored
  • offset.storage.topic: Kafka topic in which connector offsets are stored
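As a sketch of what "same cluster" means in practice, the helper below renders one worker's properties file from a block of shared settings plus a per-worker REST port, and refuses to render if any of the three settings above is missing. The values are hypothetical, and `rest.port` is assumed here as the Kafka Connect worker setting of this era (newer Kafka releases configure the REST endpoint through `listeners`).

```python
# The three settings this page lists for distributed mode.
DISTRIBUTED_KEYS = ("group.id", "config.storage.topic", "offset.storage.topic")


def worker_properties(shared, rest_port):
    """Render one worker's .properties text: shared cluster settings plus its own REST port."""
    missing = [key for key in DISTRIBUTED_KEYS if not shared.get(key)]
    if missing:
        raise ValueError(f"missing distributed-mode settings: {missing}")
    props = {**shared, "rest.port": str(rest_port)}
    # Sorted keys keep the rendered file stable between runs.
    return "".join(f"{key}={value}\n" for key, value in sorted(props.items()))


# Hypothetical values: every worker gets the same shared block.
shared = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "connect-cluster",
    "config.storage.topic": "connect-configs",
    "offset.storage.topic": "connect-offsets",
}
worker_1 = worker_properties(shared, 8083)
worker_2 = worker_properties(shared, 8084)
```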

To start a distributed worker, run the distributed agent with the following options:

  • -brokers: comma-separated list of Kafka brokers to use
  • -group: group id to use
  • -cluster: name to use for the Kafka Connect cluster
  • -registry: URL of the Schema Registry
  • -offsets: name of the topic in which to store offsets
  • -configs: name of the topic in which to store connector configurations
  • -port: port of the worker's REST service
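If you launch workers from a script, a small helper can assemble these options in a consistent order. Only the flag names come from the list above; the agent executable is not named here, so `AGENT_COMMAND` in the usage comment is a placeholder, and all values are hypothetical.

```python
# Flag names as listed on this page, in the same order.
AGENT_FLAGS = ("brokers", "group", "cluster", "registry", "offsets", "configs", "port")


def agent_args(**settings):
    """Build the agent's command-line arguments from keyword settings.

    Unknown names are rejected; known flags are emitted in documented order.
    """
    unknown = sorted(set(settings) - set(AGENT_FLAGS))
    if unknown:
        raise ValueError(f"unknown agent flags: {unknown}")
    args = []
    for flag in AGENT_FLAGS:
        if flag in settings:
            args += [f"-{flag}", str(settings[flag])]
    return args


# Hypothetical values for a single worker.
args = agent_args(
    brokers="localhost:9092",
    group="connect-cluster",
    cluster="connect",
    registry="http://localhost:8081",
    offsets="connect-offsets",
    configs="connect-configs",
    port=8083,
)
# e.g. subprocess.run([AGENT_COMMAND, *args]), where AGENT_COMMAND is a placeholder
```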

Creating a new Connector

Once the agent is running, submit the connector configuration by making a POST request to /connectors on the host where the agent is running (on the port set with -port).

Example request:

POST /connectors HTTP/1.1
Host: [host where the connector is running]
Content-Type: application/json
Accept: application/json

{
    "name": "elasticsearch-schema-sink",
    "config": {
        "connector.class": "org.apache.kafka.connect.elasticsearchschema.ElasticsearchSinkConnector",
        "topics": "topic",
        "elasticsearch.cluster.name":"test-cluster",
        "elasticsearch.index.hosts":"localhost:9300",
        "elasticsearch.index.prefix":"prefix",
        "elasticsearch.document.name":"document",
        "elasticsearch.bulk.size":"100",
        "tasks.max": "1"
    }
}

Example response:

HTTP/1.1 201 Created
Content-Type: application/json

{
    "name": "elasticsearch-schema-sink",
    "config": {
        "connector.class": "org.apache.kafka.connect.elasticsearchschema.ElasticsearchSinkConnector",
        "topics": "topic",
        "elasticsearch.cluster.name":"test-cluster",
        "elasticsearch.index.hosts":"localhost:9300",
        "elasticsearch.index.prefix":"prefix",
        "elasticsearch.document.name":"document",
        "elasticsearch.bulk.size":"100",
        "tasks.max": "1"
    },
    "tasks": [
        { "connector": "elasticsearch-schema-sink", "task": 1 }
    ]
}
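The same POST can be sent from code. This sketch uses only Python's standard library; the worker URL `http://localhost:8083` is an example, so substitute your host and the port passed with -port. Note that `urlopen` raises `urllib.error.HTTPError` for error statuses, for example when a connector with the same name already exists.

```python
import json
import urllib.request


def create_connector_request(worker_url, name, config):
    """Build the POST /connectors request shown above (not yet sent)."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        worker_url.rstrip("/") + "/connectors",
        data=body,
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


def send(request, timeout=10.0):
    """Send a request; return (status, decoded JSON body or None if empty)."""
    with urllib.request.urlopen(request, timeout=timeout) as response:
        raw = response.read()
        return response.status, (json.loads(raw) if raw else None)


# Example (needs a running worker):
# status, info = send(create_connector_request(
#     "http://localhost:8083", "elasticsearch-schema-sink", {"topics": "topic", "tasks.max": "1"}))
```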

Configuration properties:

  • connector.class: the Java class to use for the connector
  • topics: comma-separated list of topics from which the connector consumes messages
  • elasticsearch.cluster.name: name of the Elasticsearch cluster to which the connector writes documents
  • elasticsearch.index.hosts: comma-separated list of Elasticsearch hosts (host:port)
  • elasticsearch.index.prefix: prefix prepended to the topic name to build the name of the index to which messages are written
  • elasticsearch.document.name: name to use for the documents
  • elasticsearch.bulk.size: maximum number of documents written to Elasticsearch in a single batch
  • tasks.max: maximum number of tasks the connector may create
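The examples on this page pass every configuration value as a JSON string, so a helper that builds the map from typed arguments should stringify numbers and join lists with commas. The function below is a hypothetical convenience wrapper around the properties listed above, not part of the connector itself.

```python
def sink_config(topics, cluster_name, hosts, index_prefix, document_name,
                bulk_size=100, tasks_max=1):
    """Build the connector's config map, with every value as a string."""
    if bulk_size < 1 or tasks_max < 1:
        raise ValueError("bulk_size and tasks_max must be positive")
    return {
        "connector.class": "org.apache.kafka.connect.elasticsearchschema.ElasticsearchSinkConnector",
        "topics": ",".join(topics),                 # comma-separated topic list
        "elasticsearch.cluster.name": cluster_name,
        "elasticsearch.index.hosts": ",".join(hosts),  # host:port entries
        "elasticsearch.index.prefix": index_prefix,
        "elasticsearch.document.name": document_name,
        "elasticsearch.bulk.size": str(bulk_size),
        "tasks.max": str(tasks_max),
    }


# Reproduces the "config" object from the POST example above.
cfg = sink_config(["topic"], "test-cluster", ["localhost:9300"], "prefix", "document")
```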

Once running, the connector creates up to tasks.max tasks and writes the messages from the configured topics into Elasticsearch, using a separate index for each topic (the topic name with elasticsearch.index.prefix prepended).

Edit a Connector

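To create a connector with the given configuration, or to update the configuration of an existing connector, make a PUT request to /connectors/[connector name]/config on the host where the agent is running. Unlike the POST request, the body is the flat configuration map rather than a "config" object. The response is 201 Created when a new connector is created and 200 OK when an existing connector is updated.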

Example request:

PUT /connectors/elasticsearch-schema-sink/config HTTP/1.1
Host: [host where the connector is running]
Content-Type: application/json
Accept: application/json

{
    "name": "elasticsearch-schema-sink",
    "connector.class": "org.apache.kafka.connect.elasticsearchschema.ElasticsearchSinkConnector",
    "topics": "topic-test2",
    "elasticsearch.cluster.name":"test-cluster",
    "elasticsearch.index.hosts":"localhost:9300",
    "elasticsearch.index.prefix":"prefix",
    "elasticsearch.document.name":"document",
    "elasticsearch.bulk.size":"100",
    "tasks.max": "1"
}

Example response:

HTTP/1.1 201 Created
Content-Type: application/json

{
    "name": "elasticsearch-schema-sink",
    "config": {
        "connector.class": "org.apache.kafka.connect.elasticsearchschema.ElasticsearchSinkConnector",
        "topics": "topic-test2",
        "elasticsearch.cluster.name":"test-cluster",
        "elasticsearch.index.hosts":"localhost:9300",
        "elasticsearch.index.prefix":"prefix",
        "elasticsearch.document.name":"document",
        "elasticsearch.bulk.size":"100",
        "tasks.max": "1"
    },
    "tasks": [
        { "connector": "elasticsearch-schema-sink", "task": 1 }
    ]
}
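A sketch of the same PUT in Python, again with an example worker URL. The connector name is URL-encoded into the path, the body is the flat config map, and `was_created` turns the status code into the create-versus-update distinction (201 for a new connector, 200 for an updated one).

```python
import json
import urllib.parse
import urllib.request


def put_config_request(worker_url, name, config):
    """Build PUT /connectors/<name>/config; the body is the flat config map."""
    url = "{}/connectors/{}/config".format(
        worker_url.rstrip("/"), urllib.parse.quote(name, safe="")
    )
    return urllib.request.Request(
        url,
        data=json.dumps(config).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="PUT",
    )


def was_created(status):
    """Interpret the PUT status: 201 = new connector, 200 = existing one updated."""
    if status not in (200, 201):
        raise ValueError(f"unexpected status: {status}")
    return status == 201


# Example (needs a running worker):
# with urllib.request.urlopen(put_config_request("http://localhost:8083",
#         "elasticsearch-schema-sink", {"topics": "topic-test2"})) as resp:
#     created = was_created(resp.status)
```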

Delete a Connector

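To delete a connector, make a DELETE request to /connectors/[connector name] on the host where the agent is running. Deleting a connector stops its tasks and removes its configuration.

Example request: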

DELETE /connectors/elasticsearch-schema-sink HTTP/1.1
Host: [host where the connector is running]

Example response:

HTTP/1.1 204 No Content
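In Python, the DELETE needs no body; the sketch below treats anything other than 204 No Content as a failure. The worker URL is an example, and error statuses surface as `urllib.error.HTTPError` from `urlopen`.

```python
import urllib.parse
import urllib.request


def delete_connector_request(worker_url, name):
    """Build DELETE /connectors/<name>; no request body is needed."""
    url = "{}/connectors/{}".format(worker_url.rstrip("/"), urllib.parse.quote(name, safe=""))
    return urllib.request.Request(url, method="DELETE")


def delete_connector(worker_url, name, timeout=10.0):
    """Delete a connector and fail loudly unless the worker answers 204 No Content."""
    request = delete_connector_request(worker_url, name)
    with urllib.request.urlopen(request, timeout=timeout) as response:
        if response.status != 204:
            raise RuntimeError(f"unexpected status {response.status} deleting {name}")


# Example (needs a running worker):
# delete_connector("http://localhost:8083", "elasticsearch-schema-sink")
```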
