See blog post: https://rmoff.net/2020/01/23/notes-on-getting-data-into-influxdb-from-kafka-with-kafka-connect/ for details.
Load test data:
docker exec -i kafkacat kafkacat -b kafka-1:39092 -P -t json_01 <<EOF
{ "schema": { "type": "struct", "fields": [ { "field": "tags" , "type": "map", "keys": { "type": "string", "optional": false }, "values": { "type": "string", "optional": false }, "optional": false}, { "field": "stock", "type": "double", "optional": true } ], "optional": false, "version": 1 }, "payload": { "tags": { "host": "FOO", "product": "wibble" }, "stock": 500.0 } }
EOF
Check data is there:
docker exec kafkacat kafkacat -b kafka-1:39092 -C -u -t json_01
Create the connector:
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink_influx_json_01/config \
-d '{
"connector.class" : "io.confluent.influxdb.InfluxDBSinkConnector",
"value.converter" : "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "true",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"topics" : "json_01",
"influxdb.url" : "http://influxdb:8086",
"influxdb.db" : "my_db",
"measurement.name.format" : "${topic}"
}'
Make sure that the connector’s running
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
sink | sink_influx_json_01 | RUNNING | RUNNING | io.confluent.influxdb.InfluxDBSinkConnector
Check that the data’s made it to InfluxDB:
$ docker exec -it influxdb influx -execute 'show measurements on "my_db"'
name: measurements
name
----
json_01
$ docker exec -it influxdb influx -execute 'show tag keys on "my_db"'
name: json_01
tagKey
------
host
product
$ docker exec -it influxdb influx -execute 'SELECT * FROM json_01 GROUP BY host, product;' -database "my_db"
name: json_01
tags: host=FOO, product=wibble
time stock
---- -----
1579779810974000000 500
1579779820028000000 500
1579779833143000000 500
Load test data:
docker exec -i schema-registry /usr/bin/kafka-avro-console-producer --broker-list kafka-1:39092 --topic avro_01 --property value.schema='{ "type": "record", "name": "myrecord", "fields": [ { "name": "tags", "type": { "type": "map", "values": "string" } }, { "name": "stock", "type": "double" } ] }' <<EOF
{ "tags": { "host": "FOO", "product": "wibble" }, "stock": 500.0 }
EOF
Check the data’s there (I’m using kafkacat just to be contrary; you use use kafka-avro-console-consumer
too):
$ docker exec -i kafkacat kafkacat -b kafka-1:39092 -C -t avro_01 -r http://schema-registry:8081 -s avro
{"tags": {"host": "FOO", "product": "wibble"}, "stock": 500.0}
Create the connector:
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink_influx_avro_01/config \
-d '{
"connector.class" : "io.confluent.influxdb.InfluxDBSinkConnector",
"value.converter" : "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter" : "org.apache.kafka.connect.storage.StringConverter",
"topics" : "avro_01",
"influxdb.url" : "http://influxdb:8086",
"influxdb.db" : "my_db",
"measurement.name.format" : "${topic}"
}'
Make sure that the connector’s running
$ curl -s "http://localhost:8083/connectors?expand=info&expand=status" | \
jq '. | to_entries[] | [ .value.info.type, .key, .value.status.connector.state,.value.status.tasks[].state,.value.info.config."connector.class"]|join(":|:")' | \
column -s : -t| sed 's/\"//g'| sort
sink | sink_influx_avro_01 | RUNNING | RUNNING | io.confluent.influxdb.InfluxDBSinkConnector
Check that the data’s made it to InfluxDB
$ docker exec -it influxdb influx -execute 'SELECT * FROM avro_01 GROUP BY host, product;' -database "my_db"
name: avro_01
tags: host=FOO, product=wibble
time stock
---- -----
1579781680622000000 500