Testing
To test whether the Elasticsearch sink connector works as expected, we copied different Kafka topics to Elasticsearch and checked that the number of documents written to each Elasticsearch index matched the number of messages in the Kafka topic from which the connector read data.
We started from a situation in which the log generator had been running for a couple of days, so the topics the Elasticsearch connector read from were already full of messages and the schemas were already registered in the schema registry.
We only needed to create a configuration to start the Elasticsearch connector. This configuration was submitted to a running distributed worker, as explained in the Setup part of the wiki.
{
  "name": "elasticsearch-schema-sink",
  "config": {
    "connector.class": "org.apache.kafka.connect.elasticsearchschema.ElasticsearchSinkConnector",
    "topics": "topic-test1,topic-test2",
    "elasticsearch.cluster.name": "test-cluster",
    "elasticsearch.index.hosts": "localhost:9300",
    "elasticsearch.index.prefix": "log",
    "elasticsearch.document.name": "log",
    "elasticsearch.bulk.size": "100",
    "tasks.max": "1"
  }
}
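Submitting the configuration to the distributed worker is a plain HTTP POST against the Kafka Connect REST API. A minimal sketch, assuming the worker listens on the default REST port 8083 and the JSON above is saved as `elasticsearch-sink.json` (both the port and the file name are assumptions):

```shell
# Write the connector configuration shown above to a file.
cat > elasticsearch-sink.json <<'EOF'
{
  "name": "elasticsearch-schema-sink",
  "config": {
    "connector.class": "org.apache.kafka.connect.elasticsearchschema.ElasticsearchSinkConnector",
    "topics": "topic-test1,topic-test2",
    "elasticsearch.cluster.name": "test-cluster",
    "elasticsearch.index.hosts": "localhost:9300",
    "elasticsearch.index.prefix": "log",
    "elasticsearch.document.name": "log",
    "elasticsearch.bulk.size": "100",
    "tasks.max": "1"
  }
}
EOF

# Sanity-check that the file is valid JSON before submitting it.
python3 -m json.tool elasticsearch-sink.json > /dev/null && echo "config OK"

# Submit it to the running distributed worker (run once the worker is up):
# curl -X POST -H "Content-Type: application/json" \
#      --data @elasticsearch-sink.json http://localhost:8083/connectors
```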
We saw that the connector process started, and we verified that it was actually writing documents to Elasticsearch using the Elasticsearch REST API:
GET /log_topic-test1/log/_count HTTP/1.1
Host: localhost:9200
Accept: application/json
The response confirms that documents are being written to Elasticsearch:
HTTP/1.1 200 OK
Content-Type: application/json
{
"count" : 7675,
"_shards" : {
"total" : 5,
"successful" : 5,
"failed" : 0
}
}
Once the connector had finished writing data to Elasticsearch, we checked that the number of documents in every index matched the number of lines in the files used to populate the topics.
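This per-index check can be scripted. A sketch of the comparison, with the check factored into a small function so it can be exercised without live services; the source file name and the `jq` filter in the commented lines are assumptions, and the example values reuse the count returned above:

```shell
# compare_counts EXPECTED ACTUAL: report whether the source line count
# matches the Elasticsearch document count.
compare_counts() {
  if [ "$1" -eq "$2" ]; then
    echo "counts match ($2 documents)"
  else
    echo "MISMATCH (expected $1, got $2)"
  fi
}

# In a live setup the inputs would come from the source file and the
# _count API (file name and jq filter are assumptions):
# expected=$(wc -l < topic-test1.log)
# actual=$(curl -s http://localhost:9200/log_topic-test1/log/_count | jq -r '.count')
# compare_counts "$expected" "$actual"

# Example with the count reported above:
compare_counts 7675 7675
# → counts match (7675 documents)
```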
Now, keeping all the services running, we started a log generator and the file connector. In this way, the file connector resumed writing messages to Kafka while the Elasticsearch connector continued to save documents to Elasticsearch.
We generated records, then stopped and restarted both the file connector and the Elasticsearch connector to verify that the number of entries in the Elasticsearch indexes continued to match the number of lines in the source files.
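The stop/restart cycle for the sink can be driven through the Kafka Connect REST API: deleting the connector stops it, and re-submitting the same configuration starts it again from the offsets Kafka Connect has committed. A sketch, where the worker URL, connector name, and config file are assumptions; `DRY_RUN=1` prints the commands instead of executing them:

```shell
WORKER_URL="http://localhost:8083"
CONNECTOR="elasticsearch-schema-sink"
DRY_RUN=1

# Print the command when dry-running, execute it otherwise.
run() {
  if [ "$DRY_RUN" -eq 1 ]; then
    echo "would run: $*"
  else
    "$@"
  fi
}

# Stop: remove the connector from the worker.
run curl -X DELETE "$WORKER_URL/connectors/$CONNECTOR"

# Start again: re-submit the same configuration; the sink resumes from
# the committed offsets, so the document counts should still match.
run curl -X POST -H "Content-Type: application/json" \
    --data @elasticsearch-sink.json "$WORKER_URL/connectors"
```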