diff --git a/README.md b/README.md
index 5077172..ad224d9 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,7 @@
 # Gatling-Kafka
 
-An unofficial [Gatling](http://gatling.io/) 2.1 stress test plugin
-for [Apache Kafka](http://kafka.apache.org/) 0.8 protocol.
+An unofficial [Gatling](http://gatling.io/) 2.2 stress test plugin
+for [Apache Kafka](http://kafka.apache.org/) 0.10 protocol.
 
 This plugin supports the Kafka producer API only and doesn't support
 the Kafka consumer API.
@@ -24,7 +24,7 @@ If you want to change the version of Gatling used to create a jar file,
 change the following line in [`build.sbt`](build.sbt):
 
 ```scala
-"io.gatling" % "gatling-core" % "2.1.3" % "provided",
+"io.gatling" % "gatling-core" % "2.2.3" % "provided",
 ```
 
 and run `sbt assembly`.
@@ -33,34 +33,34 @@ If you don't want to include kafka-clients library to the jar file,
 change a line on kafka-clients in [`build.sbt`](build.sbt) from
 
 ```scala
-("org.apache.kafka" % "kafka-clients" % "0.8.2.0")
+("org.apache.kafka" % "kafka-clients" % "0.10.1.1")
 ```
 
 to
 
 ```scala
-("org.apache.kafka" % "kafka-clients" % "0.8.2.0" % "provided")
+("org.apache.kafka" % "kafka-clients" % "0.10.1.1" % "provided")
 ```
 
 and run `sbt assembly`.
 
-Note that Apache Kafka 0.8.1.1 or below doesn't contain kafka-clients library.
+Note that the kafka-clients library is only available in Apache Kafka 0.8.2.0 and later.
 
 ### Putting the jar file to lib directory
 
 Put the jar file to `lib` directory in Gatling:
 
-    $ cp target/scala-2.11/gatling-kafka-assembly-*.jar /path/to/gatling-charts-highcharts-bundle-2.1.*/lib
+    $ cp target/scala-2.11/gatling-kafka-assembly-*.jar /path/to/gatling-charts-highcharts-bundle-2.2.*/lib
 
 If you edited `build.sbt` in order not to include kafka-clients library
 to the jar file, you also need to copy kafka-clients library to `lib`
 directory:
 
-    $ cp /path/to/kafka-clients-*.jar /path/to/gatling-charts-highcharts-bundle-2.1.*/lib
+    $ cp /path/to/kafka-clients-*.jar /path/to/gatling-charts-highcharts-bundle-2.2.*/lib
 
 ### Creating a simulation file
 
-    $ cd /path/to/gatling-charts-highcharts-bundle-2.1.*
+    $ cd /path/to/gatling-charts-highcharts-bundle-2.2.*
     $ vi user-files/simulations/KafkaSimulation.scala
 
 You can find sample simulation files in the
 [test directory](src/test/scala/com/github/mnogu/gatling/kafka/test).
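For reference, a minimal simulation against the 2.2 DSL might look like the sketch below. It is adapted from the `ThrottledSimulation` added later in this diff; the class name, topic, broker address, and injection profile are illustrative assumptions, not part of the change itself.

```scala
package com.github.mnogu.gatling.kafka.test

import io.gatling.core.Predef._
import org.apache.kafka.clients.producer.ProducerConfig
import scala.concurrent.duration._

import com.github.mnogu.gatling.kafka.Predef._

// Hypothetical example; topic name, broker address and rates are assumptions.
class BasicKafkaSimulation extends Simulation {
  // Protocol: target topic plus the usual KafkaProducer properties
  val kafkaConf = kafka
    .topic("test")
    .properties(
      Map(
        ProducerConfig.ACKS_CONFIG -> "1",
        // assumed broker hostname and port
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG ->
          "org.apache.kafka.common.serialization.StringSerializer",
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG ->
          "org.apache.kafka.common.serialization.StringSerializer"))

  // One producer request per virtual user
  val scn = scenario("Kafka Test")
    .exec(
      kafka("request")
        .send[String]("foo"))

  // Assumed injection profile: 10 users/s for 30 seconds
  setUp(
    scn.inject(constantUsersPerSec(10) during (30 seconds)))
    .protocols(kafkaConf)
}
```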
diff --git a/build.sbt b/build.sbt
index 7f2d82f..a3b3f4b 100644
--- a/build.sbt
+++ b/build.sbt
@@ -2,15 +2,13 @@ name := "gatling-kafka"
 
 organization := "com.github.mnogu"
 
-version := "0.1.1-SNAPSHOT"
+version := "0.1.2-SNAPSHOT"
 
-scalaVersion := "2.11.5"
+scalaVersion := "2.11.8"
 
 libraryDependencies ++= Seq(
-  "io.gatling" % "gatling-core" % "2.1.7" % "provided",
-  // Gatling 2.1.3 depends on Scala 2.11
-  // and Apache Kafka 0.8.1.1 doesn't support Scala 2.11
-  ("org.apache.kafka" % "kafka-clients" % "0.8.2.0")
+  "io.gatling" % "gatling-core" % "2.2.3" % "provided",
+  ("org.apache.kafka" % "kafka-clients" % "0.10.1.1")
     // Gatling contains slf4j-api
     .exclude("org.slf4j", "slf4j-api")
 )
diff --git a/project/plugins.sbt b/project/plugins.sbt
index e09c845..61d44fa 100644
--- a/project/plugins.sbt
+++ b/project/plugins.sbt
@@ -2,4 +2,4 @@ logLevel := Level.Warn
 
 addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "1.0")
 
-addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
+addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")
\ No newline at end of file
diff --git a/src/main/scala/com/github/mnogu/gatling/kafka/Predef.scala b/src/main/scala/com/github/mnogu/gatling/kafka/Predef.scala
index fa6e9d8..faec7db 100644
--- a/src/main/scala/com/github/mnogu/gatling/kafka/Predef.scala
+++ b/src/main/scala/com/github/mnogu/gatling/kafka/Predef.scala
@@ -1,11 +1,15 @@
 package com.github.mnogu.gatling.kafka
 
-import com.github.mnogu.gatling.kafka.config.KafkaProtocol
+import com.github.mnogu.gatling.kafka.protocol.KafkaProtocolBuilder
 import com.github.mnogu.gatling.kafka.request.builder.KafkaRequestBuilder
+import io.gatling.core.config.GatlingConfiguration
 import io.gatling.core.session.Expression
 
+
 object Predef {
-  def kafka = KafkaProtocol.DefaultKafkaProtocol
+
+  def kafka(implicit configuration: GatlingConfiguration) = KafkaProtocolBuilder(configuration)
 
   def kafka(requestName: Expression[String]) = new KafkaRequestBuilder(requestName)
+
 }
\ No newline at end of file
diff --git a/src/main/scala/com/github/mnogu/gatling/kafka/action/KafkaRequestAction.scala b/src/main/scala/com/github/mnogu/gatling/kafka/action/KafkaRequestAction.scala
index 766340b..9e91894 100644
--- a/src/main/scala/com/github/mnogu/gatling/kafka/action/KafkaRequestAction.scala
+++ b/src/main/scala/com/github/mnogu/gatling/kafka/action/KafkaRequestAction.scala
@@ -1,104 +1,94 @@
 package com.github.mnogu.gatling.kafka.action
 
-import akka.actor.ActorRef
-
-import com.github.mnogu.gatling.kafka.config.KafkaProtocol
+import com.github.mnogu.gatling.kafka.protocol.KafkaProtocol
 import com.github.mnogu.gatling.kafka.request.builder.KafkaAttributes
-import io.gatling.core.action.{Failable, Interruptable}
-import io.gatling.core.result.message.{KO, OK}
-import io.gatling.core.result.writer.DataWriterClient
+import io.gatling.core.action.{Action, ExitableAction}
+import io.gatling.commons.stats.{KO, OK}
 import io.gatling.core.session._
-import io.gatling.core.util.TimeHelper.nowMillis
-import io.gatling.core.validation.Validation
+import io.gatling.commons.util.ClockSingleton._
+import io.gatling.commons.validation.Validation
+import io.gatling.core.CoreComponents
+import io.gatling.core.util.NameGen
+import io.gatling.core.stats.message.ResponseTimings
 import org.apache.kafka.clients.producer._
 
-object KafkaRequestAction extends DataWriterClient {
-  def reportUnbuildableRequest(
-    requestName: String,
-    session: Session,
-    errorMessage: String): Unit = {
-    val now = nowMillis
-    writeRequestData(
-      session, requestName, now, now, now, now, KO, Some(errorMessage))
-  }
-}
-
-class KafkaRequestAction[K,V](
-  val producer: KafkaProducer[K,V],
-  val kafkaAttributes: KafkaAttributes[K,V],
-  val kafkaProtocol: KafkaProtocol,
-  val next: ActorRef)
-  extends Interruptable with Failable with DataWriterClient {
-
-  def executeOrFail(session: Session): Validation[Unit] = {
-    kafkaAttributes.requestName(session).flatMap { resolvedRequestName =>
-      val payload = kafkaAttributes.payload
-
-      val outcome = kafkaAttributes.key match {
-        case Some(k) => k(session).flatMap { resolvedKey =>
-          sendRequest(
-            resolvedRequestName,
-            producer,
-            Some(resolvedKey),
-            payload,
-            session)
-        }
-        case None => sendRequest(
-          resolvedRequestName,
+class KafkaRequestAction[K,V]( val producer: KafkaProducer[K,V],
+                               val kafkaAttributes: KafkaAttributes[K,V],
+                               val coreComponents: CoreComponents,
+                               val kafkaProtocol: KafkaProtocol,
+                               val throttled: Boolean,
+                               val next: Action )
+  extends ExitableAction with NameGen {
+
+  val statsEngine = coreComponents.statsEngine
+  override val name = genName("kafkaRequest")
+
+  override def execute(session: Session): Unit = recover(session) {
+
+    kafkaAttributes requestName session flatMap { requestName =>
+
+      val outcome =
+        sendRequest(
+          requestName,
           producer,
-          None,
-          payload,
+          kafkaAttributes,
+          throttled,
           session)
-      }
 
       outcome.onFailure(
-        errorMessage => KafkaRequestAction.reportUnbuildableRequest(
-          resolvedRequestName, session, errorMessage))
+        errorMessage =>
+          statsEngine.reportUnbuildableRequest(session, requestName, errorMessage)
+      )
+
       outcome
+
     }
+
   }
 
-  private def sendRequest(
-    requestName: String,
-    producer: Producer[K,V],
-    key: Option[K],
-    payload: Expression[V],
-    session: Session): Validation[Unit] = {
-
-    payload(session).map { resolvedPayload =>
-      val record = key match {
-        case Some(k) => new ProducerRecord[K,V](kafkaProtocol.topic, k, resolvedPayload)
-        case None => new ProducerRecord[K,V](kafkaProtocol.topic, resolvedPayload)
+
+  private def sendRequest( requestName: String,
+                           producer: Producer[K,V],
+                           kafkaAttributes: KafkaAttributes[K,V],
+                           throttled: Boolean,
+                           session: Session ): Validation[Unit] = {
+
+    kafkaAttributes payload session map { payload =>
+
+      val record = kafkaAttributes.key match {
+        case Some(k) =>
+          new ProducerRecord[K, V](kafkaProtocol.topic, k(session).get, payload)
+        case None =>
+          new ProducerRecord[K, V](kafkaProtocol.topic, payload)
       }
 
       val requestStartDate = nowMillis
-      val requestEndDate = nowMillis
 
-      // send the request
       producer.send(record, new Callback() {
+
         override def onCompletion(m: RecordMetadata, e: Exception): Unit = {
-          val responseStartDate = nowMillis
-          val responseEndDate = nowMillis
-          // log the outcome
-          writeRequestData(
+
+          val requestEndDate = nowMillis
+
+          statsEngine.logResponse(
             session,
             requestName,
-            requestStartDate,
-            requestEndDate,
-            responseStartDate,
-            responseEndDate,
+            ResponseTimings(startTimestamp = requestStartDate, endTimestamp = requestEndDate),
             if (e == null) OK else KO,
-            if (e == null) None else Some(e.getMessage))
+            None,
+            if (e == null) None else Some(e.getMessage)
+          )
+
+          if (throttled) {
+            coreComponents.throttler.throttle(session.scenario, () => next ! session)
+          } else {
+            next ! session
+          }
+
         }
       })
-
-      // calling the next action in the chain
-      next ! session
     }
-  }
 
-  override def postStop(): Unit = {
-    super.postStop()
-    producer.close()
   }
+
 }
diff --git a/src/main/scala/com/github/mnogu/gatling/kafka/action/KafkaRequestActionBuilder.scala b/src/main/scala/com/github/mnogu/gatling/kafka/action/KafkaRequestActionBuilder.scala
index e090cef..5c1ae1a 100644
--- a/src/main/scala/com/github/mnogu/gatling/kafka/action/KafkaRequestActionBuilder.scala
+++ b/src/main/scala/com/github/mnogu/gatling/kafka/action/KafkaRequestActionBuilder.scala
@@ -1,27 +1,35 @@
 package com.github.mnogu.gatling.kafka.action
 
-import akka.actor.ActorDSL.actor
-import akka.actor.ActorRef
-import com.github.mnogu.gatling.kafka.config.KafkaProtocol
+import com.github.mnogu.gatling.kafka.protocol.{KafkaComponents, KafkaProtocol}
 import com.github.mnogu.gatling.kafka.request.builder.KafkaAttributes
+import io.gatling.core.action.Action
 import io.gatling.core.action.builder.ActionBuilder
-import io.gatling.core.config.Protocols
+import io.gatling.core.structure.ScenarioContext
 import org.apache.kafka.clients.producer.KafkaProducer
 
 import scala.collection.JavaConverters._
 
-class KafkaRequestActionBuilder[K,V](kafkaAttributes: KafkaAttributes[K,V])
-  extends ActionBuilder {
-  override def registerDefaultProtocols(protocols: Protocols): Protocols =
-    protocols + KafkaProtocol.DefaultKafkaProtocol
+class KafkaRequestActionBuilder[K,V](kafkaAttributes: KafkaAttributes[K,V]) extends ActionBuilder {
+
+  override def build( ctx: ScenarioContext, next: Action ): Action = {
+
+    import ctx.{protocolComponentsRegistry, system, coreComponents, throttled}
+
+    val kafkaComponents: KafkaComponents = protocolComponentsRegistry.components(KafkaProtocol.KafkaProtocolKey)
+
+    val producer = new KafkaProducer[K,V]( kafkaComponents.kafkaProtocol.properties.asJava )
+
+    system.registerOnTermination(producer.close())
+
+    new KafkaRequestAction(
+      producer,
+      kafkaAttributes,
+      coreComponents,
+      kafkaComponents.kafkaProtocol,
+      throttled,
+      next
+    )
 
-  def build(next: ActorRef, protocols: Protocols): ActorRef = {
-    val kafkaProtocol = protocols.getProtocol[KafkaProtocol].getOrElse(
-      throw new UnsupportedOperationException("Kafka Protocol wasn't registered"))
-    val producer = new KafkaProducer[K,V](
-      kafkaProtocol.properties.asJava)
-    actor(actorName("kafkaRequest"))(new KafkaRequestAction(
-      producer, kafkaAttributes, kafkaProtocol, next))
   }
+
 }
\ No newline at end of file
diff --git a/src/main/scala/com/github/mnogu/gatling/kafka/config/KafkaProtocol.scala b/src/main/scala/com/github/mnogu/gatling/kafka/config/KafkaProtocol.scala
deleted file mode 100644
index 5b97d67..0000000
--- a/src/main/scala/com/github/mnogu/gatling/kafka/config/KafkaProtocol.scala
+++ /dev/null
@@ -1,17 +0,0 @@
-package com.github.mnogu.gatling.kafka.config
-
-import io.gatling.core.config.Protocol
-
-object KafkaProtocol {
-  val DefaultKafkaProtocol = new KafkaProtocol(
-    topic = "",
-    properties = Map())
-}
-
-case class KafkaProtocol(
-  topic: String,
-  properties: Map[String, Object]) extends Protocol {
-
-  def topic(topic: String): KafkaProtocol = copy(topic = topic)
-  def properties(properties: Map[String, Object]): KafkaProtocol = copy(properties = properties)
-}
diff --git a/src/main/scala/com/github/mnogu/gatling/kafka/protocol/KafkaComponents.scala b/src/main/scala/com/github/mnogu/gatling/kafka/protocol/KafkaComponents.scala
new file mode 100644
index 0000000..bebb7b6
--- /dev/null
+++ b/src/main/scala/com/github/mnogu/gatling/kafka/protocol/KafkaComponents.scala
@@ -0,0 +1,13 @@
+package com.github.mnogu.gatling.kafka.protocol
+
+import io.gatling.core.protocol.ProtocolComponents
+import io.gatling.core.session.Session
+
+
+case class KafkaComponents(kafkaProtocol: KafkaProtocol) extends ProtocolComponents {
+
+  override def onStart: Option[(Session) => Session] = None
+
+  override def onExit: Option[(Session) => Unit] = None
+
+}
diff --git a/src/main/scala/com/github/mnogu/gatling/kafka/protocol/KafkaProtocol.scala b/src/main/scala/com/github/mnogu/gatling/kafka/protocol/KafkaProtocol.scala
new file mode 100644
index 0000000..2c3fe11
--- /dev/null
+++ b/src/main/scala/com/github/mnogu/gatling/kafka/protocol/KafkaProtocol.scala
@@ -0,0 +1,43 @@
+package com.github.mnogu.gatling.kafka.protocol
+
+import akka.actor.ActorSystem
+import io.gatling.core.CoreComponents
+import io.gatling.core.config.GatlingConfiguration
+import io.gatling.core.protocol.{Protocol, ProtocolKey}
+
+object KafkaProtocol {
+
+  def apply(configuration: GatlingConfiguration): KafkaProtocol = KafkaProtocol (
+    topic = "",
+    properties = Map()
+  )
+
+  val KafkaProtocolKey = new ProtocolKey {
+
+    type Protocol = KafkaProtocol
+    type Components = KafkaComponents
+
+    def protocolClass: Class[io.gatling.core.protocol.Protocol] = classOf[KafkaProtocol].asInstanceOf[Class[io.gatling.core.protocol.Protocol]]
+
+    def defaultProtocolValue(configuration: GatlingConfiguration): KafkaProtocol = KafkaProtocol(configuration)
+
+    def newComponents(system: ActorSystem, coreComponents: CoreComponents): KafkaProtocol => KafkaComponents = {
+
+      kafkaProtocol => {
+        val kafkaComponents = KafkaComponents (
+          kafkaProtocol
+        )
+
+        kafkaComponents
+      }
+    }
+  }
+}
+
+case class KafkaProtocol(
+  topic: String,
+  properties: Map[String, Object]) extends Protocol {
+
+  def topic(topic: String): KafkaProtocol = copy(topic = topic)
+  def properties(properties: Map[String, Object]): KafkaProtocol = copy(properties = properties)
+}
diff --git a/src/main/scala/com/github/mnogu/gatling/kafka/protocol/KafkaProtocolBuilder.scala b/src/main/scala/com/github/mnogu/gatling/kafka/protocol/KafkaProtocolBuilder.scala
new file mode 100644
index 0000000..8b08a98
--- /dev/null
+++ b/src/main/scala/com/github/mnogu/gatling/kafka/protocol/KafkaProtocolBuilder.scala
@@ -0,0 +1,20 @@
+package com.github.mnogu.gatling.kafka.protocol
+
+import io.gatling.core.config.GatlingConfiguration
+
+
+object KafkaProtocolBuilder {
+
+  implicit def toKafkaProtocol(builder: KafkaProtocolBuilder): KafkaProtocol = builder.build
+
+  def apply(configuration: GatlingConfiguration) : KafkaProtocolBuilder =
+    KafkaProtocolBuilder(KafkaProtocol(configuration))
+
+}
+
+
+case class KafkaProtocolBuilder(kafkaProtocol: KafkaProtocol) {
+
+  def build = kafkaProtocol
+
+}
\ No newline at end of file
diff --git a/src/main/scala/com/github/mnogu/gatling/kafka/request/builder/KafkaRequestBuilder.scala b/src/main/scala/com/github/mnogu/gatling/kafka/request/builder/KafkaRequestBuilder.scala
index 3b9bad3..d216265 100644
--- a/src/main/scala/com/github/mnogu/gatling/kafka/request/builder/KafkaRequestBuilder.scala
+++ b/src/main/scala/com/github/mnogu/gatling/kafka/request/builder/KafkaRequestBuilder.scala
@@ -3,17 +3,17 @@ package com.github.mnogu.gatling.kafka.request.builder
 import com.github.mnogu.gatling.kafka.action.KafkaRequestActionBuilder
 import io.gatling.core.session._
 
-case class KafkaAttributes[K,V](
-  requestName: Expression[String],
-  key: Option[Expression[K]],
-  payload: Expression[V])
+case class KafkaAttributes[K,V]( requestName: Expression[String],
+                                 key: Option[Expression[K]],
+                                 payload: Expression[V] )
 
 case class KafkaRequestBuilder(requestName: Expression[String]) {
-  def send[V](payload: Expression[V]): KafkaRequestActionBuilder[_,V] =
-    send(payload, None)
-  def send[K,V](key: Expression[K], payload: Expression[V]): KafkaRequestActionBuilder[K,V] =
-    send(payload, Some(key))
+
+  def send[V](payload: Expression[V]): KafkaRequestActionBuilder[_,V] = send(payload, None)
+
+  def send[K,V](key: Expression[K], payload: Expression[V]): KafkaRequestActionBuilder[K,V] = send(payload, Some(key))
 
   private def send[K,V](payload: Expression[V], key: Option[Expression[K]]) =
     new KafkaRequestActionBuilder(KafkaAttributes(requestName, key, payload))
+
 }
diff --git a/src/test/scala/com/github/mnogu/gatling/kafka/test/ThrottledSimulation.scala b/src/test/scala/com/github/mnogu/gatling/kafka/test/ThrottledSimulation.scala
new file mode 100644
index 0000000..9f233a1
--- /dev/null
+++ b/src/test/scala/com/github/mnogu/gatling/kafka/test/ThrottledSimulation.scala
@@ -0,0 +1,38 @@
+package com.github.mnogu.gatling.kafka.test
+
+import io.gatling.core.Predef._
+import org.apache.kafka.clients.producer.ProducerConfig
+import scala.concurrent.duration._
+
+import com.github.mnogu.gatling.kafka.Predef._
+
+class ThrottledSimulation extends Simulation {
+  val kafkaConf = kafka
+    // Kafka topic name
+    .topic("test")
+    // Kafka producer configs
+    .properties(
+      Map(
+        ProducerConfig.ACKS_CONFIG -> "1",
+        // list of Kafka broker hostname and port pairs
+        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
+
+        // in most cases, StringSerializer or ByteArraySerializer
+        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG ->
+          "org.apache.kafka.common.serialization.StringSerializer",
+        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG ->
+          "org.apache.kafka.common.serialization.StringSerializer"))
+
+  val scn = scenario("Kafka Test")
+    .forever(
+      exec(
+        kafka("request")
+          // message to send
+          .send[String]("foo"))
+    )
+
+  setUp(
+    scn.inject(atOnceUsers(10)))
+    .throttle(jumpToRps(10), holdFor(30 seconds))
+    .protocols(kafkaConf)
+}
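The new `KafkaRequestBuilder` keeps the keyed variant of `send`, which builds a `ProducerRecord(topic, key, value)` so that records with the same key land on the same partition. A sketch of a keyed simulation follows; the class name, topic, broker address, feeder file, session attribute, and injection profile are illustrative assumptions.

```scala
package com.github.mnogu.gatling.kafka.test

import io.gatling.core.Predef._
import org.apache.kafka.clients.producer.ProducerConfig
import scala.concurrent.duration._

import com.github.mnogu.gatling.kafka.Predef._

// Hypothetical keyed example; topic, broker and feeder file are assumptions.
class KeyedSimulation extends Simulation {
  val kafkaConf = kafka
    .topic("test")
    .properties(
      Map(
        ProducerConfig.ACKS_CONFIG -> "1",
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG ->
          "org.apache.kafka.common.serialization.StringSerializer",
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG ->
          "org.apache.kafka.common.serialization.StringSerializer"))

  // send[K,V](key, payload): the key is resolved per session,
  // so each virtual user can publish under its own key.
  val scn = scenario("Kafka Keyed Test")
    .feed(csv("user_ids.csv").circular) // assumed feeder with a "userId" column
    .exec(
      kafka("keyed request")
        .send[String, String]("${userId}", "some payload"))

  setUp(
    scn.inject(constantUsersPerSec(10) during (30 seconds)))
    .protocols(kafkaConf)
}
```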