Integrate Kafka into your Ktor application with this Kafka plugin. It sets up the Kafka clients for you, so you can configure and manage Kafka Admin, Producer, and Consumer instances directly in your Ktor server.
- Streamlined Configuration: Install Kafka client configurations either through application configuration files or directly in code.
- Admin Client: Easy setup and topic creation for your Kafka admin client.
- Producer Client: Initialize Kafka producer instances effortlessly.
- Consumer Client: Configure and manage Kafka consumer instances, including polling logic and record handling.
- Built-in Avro4k support: Avro schemas are supported by default, so there is no need to define key/value serializers. Schemas can be registered automatically, and conversion methods to and from Avro records are provided.
The plugin provides a DSL for comprehensive Kafka configuration. The DSL follows the classes and properties defined in org.apache.kafka.common.config and offers a fluent, programmatic way to set up your Kafka settings right within your Ktor application.
install(Kafka) {
schemaRegistryUrl = "my.schemaRegistryUrl"
topic(named("my-topic")) {
partitions = 1
replicas = 1
configs {
messageTimestampType = CreateTime
}
}
common { // <-- Define common configs
bootstrapServers = listOf("my-kafka")
retries = 1
clientId = "my-client-id"
}
admin { } // <-- Creates an admin client
producer { // <-- Creates a producer
clientId = "my-client-id"
}
consumer { // <-- Creates a consumer
groupId = "my-group-id"
clientId = "my-client-id-override" //<-- Override common configurations
}
consumerConfig {
consumerRecordHandler(named("my-topic")) { record ->
myService.save(record)
}
}
registerSchemas {
using { // <-- optionally provide a client, by default CIO is used
HttpClient()
}
MyRecord::class at named("my-topic") // <-- Will register schema upon startup
}
}
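The comments in the DSL above indicate that values set in common apply to every client and that a client block can override them. The plugin's actual merging logic is not shown here, so the following is only a minimal sketch of that override behaviour, using plain maps keyed by the standard Kafka property names. The mergeConfigs function is hypothetical and not part of the plugin.

```kotlin
// Hypothetical illustration: plain maps stand in for the plugin's config objects.
// On duplicate keys, the client-specific value replaces the common one.
fun mergeConfigs(
    common: Map<String, Any>,
    clientSpecific: Map<String, Any>,
): Map<String, Any> = common + clientSpecific

fun main() {
    // Mirrors the `common { }` block above.
    val common = mapOf(
        "bootstrap.servers" to listOf("my-kafka"),
        "retries" to 1,
        "client.id" to "my-client-id",
    )
    // Mirrors the `consumer { }` block above, including the client.id override.
    val consumer = mapOf(
        "group.id" to "my-group-id",
        "client.id" to "my-client-id-override",
    )

    val effective = mergeConfigs(common, consumer)
    println(effective["client.id"])         // my-client-id-override
    println(effective["bootstrap.servers"]) // [my-kafka]
}
```

Under this reading, the consumer keeps the shared bootstrap servers and retries while using its own client ID.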
Alternatively, you can install the Kafka plugin using an application configuration file:
install(KafkaFromFileConfig.Kafka) {
consumerConfig {
consumerRecordHandler("my-topic") { record ->
myService.save(record)
}
}
registerSchemas {
MyRecord::class at named("my-topic") // <-- Will register schema upon startup
}
}
The above will look for the config in ktor.kafka by default.
You can also specify a different path if needed:
install(KafkaFromFileConfig.Kafka("ktor.my.kafka")){
...
}
Example file configuration:
ktor {
kafka {
schema.registry.url = ["SCHEMA_REGISTRY_URL"]
common {
"bootstrap.servers" = ["BOOTSTRAP_SERVERS"]
# Additional configuration
}
admin {
# Additional configuration
}
consumer {
"group.id" = "my-group-id"
# Additional configuration
}
producer {
"client.id" = "my-client-id"
# Additional configuration
}
topics = [
{
name = my-topic
partitions = 1
replicas = 1
configs {
"message.timestamp.type" = CreateTime
# Additional configuration
}
}
]
}
}
After installation, you can easily access the initialized Kafka clients throughout your Ktor application:
val adminClient = application.kafkaAdminClient
val producer = application.kafkaProducer
val consumer = application.kafkaConsumer
- Make sure you define a consumer configuration when you initialize a consumer client, or the consumer job will not start automatically.
- Ensure that the pollFrequency for consumers is set appropriately, depending on your use case.
- Always verify topic creation and monitor client status for optimal Kafka integration.
- This plugin works asynchronously, so it's advised to monitor the logs for setup completion and error notifications.
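To make the role of pollFrequency more concrete, here is a rough sketch of a polling loop. It assumes pollFrequency is the pause between consecutive polls, which this README does not spell out. FakeConsumer and pollLoop are hypothetical stand-ins written for illustration. They are not the plugin's consumer job, which may be implemented quite differently.

```kotlin
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Hypothetical stand-in for a Kafka consumer: each poll() returns the next
// prepared batch, or an empty list once all batches are consumed.
class FakeConsumer(batches: List<List<String>>) {
    private val remaining = ArrayDeque(batches)
    fun poll(): List<String> = remaining.removeFirstOrNull() ?: emptyList()
}

// Polls `rounds` times, passes every record to `handler`, and sleeps for
// `pollFrequency` after each poll (assumed meaning of the setting).
fun pollLoop(
    consumer: FakeConsumer,
    pollFrequency: Duration,
    rounds: Int,
    handler: (String) -> Unit,
) {
    repeat(rounds) {
        consumer.poll().forEach(handler)
        Thread.sleep(pollFrequency.inWholeMilliseconds)
    }
}

fun main() {
    val consumer = FakeConsumer(listOf(listOf("a", "b"), emptyList(), listOf("c")))
    val handled = mutableListOf<String>()
    pollLoop(consumer, pollFrequency = 10.milliseconds, rounds = 3) { handled += it }
    println(handled) // [a, b, c]
}
```

In a loop like this, a shorter pollFrequency lowers the delay before a new record is handled but spends more time on empty polls, while a longer one does the opposite.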
This project uses code from gAmUssA/ktor-kafka, which is available under the MIT License. This project expands on concepts that were introduced there in a few ways:
- The Topic DSL idea was expanded to allow for configuration of all components in a similar manner
- The consumer behaviour can be configured in the plugin setup
- The producer and consumer are created automatically and exposed via the server attributes
- The consumer can return a flow of records