Skip to content
This repository has been archived by the owner on Sep 15, 2023. It is now read-only.

Latest commit

 

History

History
185 lines (132 loc) · 5.49 KB

README.adoc

File metadata and controls

185 lines (132 loc) · 5.49 KB

Kafka-CDI - A simple CDI extension for Apache Kafka

Build Status (Travis CI) License Maven Central Javadocs

Making it easy to use Apache Kafka’s Java client API in a CDI managed environment!

Getting started

See the library in action here!

The section below gives an overview on how to use the Kafka CDI library!

Maven config

In a Maven managed project simply the following to your pom.xml:

...
  <dependency>
    <groupId>org.aerogear.kafka</groupId>
    <artifactId>kafka-cdi-extension</artifactId>
    <version>0.0.10</version>
  </dependency>
...

Injecting a Kafka Producer

The @Producer annotation is used to configure and inject an instance of the SimpleKafkaProducer class, which is a simple extension of the original KafkaProducer class:

...
public class MyPublisherService {

  private Logger logger = LoggerFactory.getLogger(MyPublisherService.class);

  @Producer
  SimpleKafkaProducer<Integer, String> producer;

  /**
   * A simple service method, that sends payload over the wire
   */
  public void hello() {
    producer.send("myTopic", "My Message");
  }
}

Annotating a bean method to declare it as a Kafka Consumer

The @Consumer annotation is used to configure and declare an annotated method as a callback for the internal DelegationKafkaConsumer, which internally uses the vanilla KafkaConsumer:

public class MyListenerService {

  private Logger logger = LoggerFactory.getLogger(MyListenerService.class);

  /**
   * Simple listener that receives messages from the Kafka broker
    */
  @Consumer(topics = "myTopic", groupId = "myGroupID")
  public void receiver(final String message) {
    logger.info("That's what I got: " + message);
  }
}

Receiving the key and the value is also possible:

public class MyListenerService {

  private Logger logger = LoggerFactory.getLogger(MyListenerService.class);

  /**
   * Simple listener that receives messages from the Kafka broker
    */
  @Consumer(topics = "myTopic", groupId = "myGroupID")
  public void receiver(final String key, final String value) {
    logger.info("That's what I got: (key: " + key + " , value:" +  value + ")");
  }
}

Initial KStream and KTable support for Stream Processing:

With the @KafkaStream annotation the libary supports the Kafka Streams API:

@KafkaStream(input = "push_messages_metrics", output = "successMessagesPerJob2")
public KTable successMessagesPerJobTransformer(final KStream<String, String> source) {
    final KTable<String, Long> successCountsPerJob = source.filter((key, value) -> value.equals("Success"))
        .groupByKey()
        .count("successMessagesPerJob");
    return successCountsPerJob;
    }

The method accepts a KStream instance from the input topic, inside the method body some (simple) stream processing can be done, and as return value we support either KStream or KTable. The entire setup is handled by the library itself.

Global Configuration of the Kafka cluster

A minimal of configuration is currently needed. For that there is a @KafkaConfig annotation. The first occurrence is used:

@KafkaConfig(bootstrapServers = "#{KAFKA_SERVICE_HOST}:#{KAFKA_SERVICE_PORT}")
public class MyService {
   ...
}

JsonObject Serialization

Apache Kafka uses a binary message format, and comes with a handful of handy Serializers and Deserializers, available through the Serdes class. The Kafka CDI extension adds a Serde for the JsonObject:

JsonObject Serializer

To send serialize a JsonObject, simply specify the type, like:

...
@Producer
SimpleKafkaProducer<Integer, JsonObject> producer;
...

producer.send("myTopic", myJsonObj);

JsonObject Deserializer

For deserialization the argument on the annotation @Consumer method is used to setup the actual Deserializer

@Consumer(topic = "myTopic", groupId = "myGroupID", keyType = Integer.class)
public void receiveJsonObject(JsonObject message) {
  logger.info("That's what I got: " + message);
}

Running Apache Kafka

To setup Apache Kafka there are different ways to get started. This section quickly discusses pure Docker and Openshift.

Running via Docker images

Starting a Zookeeper cluster:

docker run -d --name zookeeper jplock/zookeeper:3.4.6

Next, we need to start Kafka and link the Zookeeper Linux container to it:

docker run -d --name kafka --link zookeeper:zookeeper ches/kafka

Now, that the broker is running, we need to figure out the IP address of it:

docker inspect --format '{{ .NetworkSettings.IPAddress }}' kafka

We use this IP address when inside our @KafkaConfig annotation that our Producers and Consumers can speak to Apache Kafka.

Running on Openshift

For Apache Kafka on Openshift please check this repository: