diff --git a/src/main/scala/com/github/mnogu/gatling/kafka/action/KafkaRequestAction.scala b/src/main/scala/com/github/mnogu/gatling/kafka/action/KafkaRequestAction.scala
index 1431156..b71c72a 100644
--- a/src/main/scala/com/github/mnogu/gatling/kafka/action/KafkaRequestAction.scala
+++ b/src/main/scala/com/github/mnogu/gatling/kafka/action/KafkaRequestAction.scala
@@ -2,14 +2,13 @@ package com.github.mnogu.gatling.kafka.action
 
 import com.github.mnogu.gatling.kafka.protocol.KafkaProtocol
 import com.github.mnogu.gatling.kafka.request.builder.KafkaAttributes
-import io.gatling.core.action.{Action, ExitableAction}
 import io.gatling.commons.stats.{KO, OK}
-import io.gatling.core.session._
 import io.gatling.commons.util.DefaultClock
 import io.gatling.commons.validation.Validation
 import io.gatling.core.CoreComponents
+import io.gatling.core.action.{Action, ExitableAction}
+import io.gatling.core.session._
 import io.gatling.core.util.NameGen
-import io.gatling.core.stats.message.ResponseTimings
 
 import org.apache.kafka.clients.producer._
 
@@ -54,11 +53,11 @@ class KafkaRequestAction[K,V](
     val producer: KafkaProducer[K,V],
     throttled: Boolean,
     session: Session ): Validation[Unit] = {
 
-    kafkaAttributes payload session map { payload =>
+    kafkaAttributes payload session map { payload =>
       val record = kafkaAttributes.key match {
         case Some(k) =>
-          new ProducerRecord[K, V](kafkaProtocol.topic, k(session).toOption.get, payload)
+          new ProducerRecord[K, V](kafkaProtocol.topic, null, k(session).toOption.get, payload, kafkaAttributes.headers)
         case None =>
           new ProducerRecord[K, V](kafkaProtocol.topic, payload)
       }
diff --git a/src/main/scala/com/github/mnogu/gatling/kafka/request/builder/KafkaRequestBuilder.scala b/src/main/scala/com/github/mnogu/gatling/kafka/request/builder/KafkaRequestBuilder.scala
index d216265..3a6c2d1 100644
--- a/src/main/scala/com/github/mnogu/gatling/kafka/request/builder/KafkaRequestBuilder.scala
+++ b/src/main/scala/com/github/mnogu/gatling/kafka/request/builder/KafkaRequestBuilder.scala
@@ -1,19 +1,26 @@
 package com.github.mnogu.gatling.kafka.request.builder
 
+import java.util
+
 import com.github.mnogu.gatling.kafka.action.KafkaRequestActionBuilder
-import io.gatling.core.session._
+import io.gatling.core.session.Expression
+import org.apache.kafka.common.header.Header
 
 case class KafkaAttributes[K,V](
   requestName: Expression[String],
   key: Option[Expression[K]],
-  payload: Expression[V] )
+  payload: Expression[V],
+  headers: util.List[Header]
+)
 
 case class KafkaRequestBuilder(requestName: Expression[String]) {
 
-  def send[V](payload: Expression[V]): KafkaRequestActionBuilder[_,V] = send(payload, None)
+  def send[V](payload: Expression[V]): KafkaRequestActionBuilder[_,V] = send(payload, None, null)
+
+  def send[K,V](key: Expression[K], payload: Expression[V]): KafkaRequestActionBuilder[K,V] = send(payload, Some(key), null)
 
-  def send[K,V](key: Expression[K], payload: Expression[V]): KafkaRequestActionBuilder[K,V] = send(payload, Some(key))
+  def send[K,V](key: Expression[K], payload: Expression[V], headers: util.List[Header]): KafkaRequestActionBuilder[K,V] = send(payload, Some(key), headers)
 
-  private def send[K,V](payload: Expression[V], key: Option[Expression[K]]) =
-    new KafkaRequestActionBuilder(KafkaAttributes(requestName, key, payload))
+  private def send[K,V](payload: Expression[V], key: Option[Expression[K]], headers: util.List[Header]) =
+    new KafkaRequestActionBuilder(KafkaAttributes(requestName, key, payload, headers))
 }
diff --git a/src/test/scala/com/github/mnogu/gatling/kafka/test/KeyValueHeadersSimulation.scala b/src/test/scala/com/github/mnogu/gatling/kafka/test/KeyValueHeadersSimulation.scala
new file mode 100644
index 0000000..bd2ab35
--- /dev/null
+++ b/src/test/scala/com/github/mnogu/gatling/kafka/test/KeyValueHeadersSimulation.scala
@@ -0,0 +1,46 @@
+package com.github.mnogu.gatling.kafka.test
+
+import java.util
+
+import com.github.mnogu.gatling.kafka.Predef._
+import io.gatling.core.Predef._
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.header.Header
+import org.apache.kafka.common.header.internals.RecordHeader
+
+import scala.concurrent.duration._
+
+class KeyValueHeadersSimulation extends Simulation {
+  val kafkaConf = kafka
+    // Kafka topic name
+    .topic("test")
+    // Kafka producer configs
+    .properties(
+      Map(
+        ProducerConfig.ACKS_CONFIG -> "1",
+        // list of Kafka broker hostname and port pairs
+        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
+
+        // in most cases, StringSerializer or ByteArraySerializer
+        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG ->
+          "org.apache.kafka.common.serialization.ByteArraySerializer",
+        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG ->
+          "org.apache.kafka.common.serialization.ByteArraySerializer"))
+
+  // Record headers to attach to every message
+  val header: Header = new RecordHeader("key", "valueAsBytes".getBytes())
+  val headers: util.List[Header] = new util.ArrayList
+  headers.add(header)
+
+  // Push to Kafka with key, value and headers
+  val scn = scenario("Kafka Test")
+    .exec(
+      kafka("request")
+        // message to send
+        .send("key".getBytes(), "value".getBytes(), headers))
+
+  setUp(
+    scn
+      .inject(constantUsersPerSec(10) during(90 seconds)))
+    .protocols(kafkaConf)
+}