Skip to content
This repository has been archived by the owner on Mar 22, 2022. It is now read-only.

Commit

Permalink
Support for gatling 2.2 and kafka 0.10
Browse files Browse the repository at this point in the history
  • Loading branch information
Luca Barzè committed Feb 10, 2017
1 parent ae3520e commit 321e3e5
Show file tree
Hide file tree
Showing 9 changed files with 169 additions and 106 deletions.
10 changes: 4 additions & 6 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ name := "gatling-kafka"

organization := "com.github.mnogu"

version := "0.1.1-SNAPSHOT"
version := "0.1.2-SNAPSHOT"

scalaVersion := "2.11.5"
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
"io.gatling" % "gatling-core" % "2.1.7" % "provided",
// Gatling 2.1.3 depends on Scala 2.11
// and Apache Kafka 0.8.1.1 doesn't support Scala 2.11
("org.apache.kafka" % "kafka-clients" % "0.8.2.0")
"io.gatling" % "gatling-core" % "2.2.3" % "provided",
("org.apache.kafka" % "kafka-clients" % "0.10.1.1")
// Gatling contains slf4j-api
.exclude("org.slf4j", "slf4j-api")
)
Expand Down
8 changes: 6 additions & 2 deletions src/main/scala/com/github/mnogu/gatling/kafka/Predef.scala
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package com.github.mnogu.gatling.kafka

import com.github.mnogu.gatling.kafka.config.KafkaProtocol
import com.github.mnogu.gatling.kafka.protocol.KafkaProtocolBuilder
import com.github.mnogu.gatling.kafka.request.builder.KafkaRequestBuilder
import io.gatling.core.config.GatlingConfiguration
import io.gatling.core.session.Expression


object Predef {
def kafka = KafkaProtocol.DefaultKafkaProtocol

def kafka(implicit configuration: GatlingConfiguration) = KafkaProtocolBuilder(configuration)

def kafka(requestName: Expression[String]) = new KafkaRequestBuilder(requestName)

}
Original file line number Diff line number Diff line change
@@ -1,36 +1,32 @@
package com.github.mnogu.gatling.kafka.action

import akka.actor.ActorRef

import com.github.mnogu.gatling.kafka.config.KafkaProtocol
import com.github.mnogu.gatling.kafka.protocol.KafkaProtocol
import com.github.mnogu.gatling.kafka.request.builder.KafkaAttributes
import io.gatling.core.action.{Failable, Interruptable}
import io.gatling.core.result.message.{KO, OK}
import io.gatling.core.result.writer.DataWriterClient
import io.gatling.core.action.{Action, ExitableAction}
import io.gatling.commons.stats.{KO, OK}
import io.gatling.core.session._
import io.gatling.core.util.TimeHelper.nowMillis
import io.gatling.core.validation.Validation
import io.gatling.commons.util.ClockSingleton._
import io.gatling.commons.validation.Validation
import io.gatling.core.CoreComponents
import io.gatling.core.util.NameGen
import io.gatling.core.stats.message.ResponseTimings
import org.apache.kafka.clients.producer._

object KafkaRequestAction extends DataWriterClient {
def reportUnbuildableRequest(
requestName: String,
session: Session,
errorMessage: String): Unit = {
val now = nowMillis
writeRequestData(
session, requestName, now, now, now, now, KO, Some(errorMessage))
}
}

class KafkaRequestAction[K,V](
val producer: KafkaProducer[K,V],
val kafkaAttributes: KafkaAttributes[K,V],
val kafkaProtocol: KafkaProtocol,
val next: ActorRef)
extends Interruptable with Failable with DataWriterClient {

def executeOrFail(session: Session): Validation[Unit] = {
class KafkaRequestAction[K,V]( val producer: KafkaProducer[K,V],
val kafkaAttributes: KafkaAttributes[K,V],
val coreComponents: CoreComponents,
val kafkaProtocol: KafkaProtocol,
val next: Action )
extends ExitableAction with NameGen {

val statsEngine = coreComponents.statsEngine

override val name = genName("kafkaRequest")

override def execute(session: Session): Unit = recover(session) {

kafkaAttributes.requestName(session).flatMap { resolvedRequestName =>
val payload = kafkaAttributes.payload

Expand All @@ -41,28 +37,33 @@ class KafkaRequestAction[K,V](
producer,
Some(resolvedKey),
payload,
session)
session )
}
case None => sendRequest(
resolvedRequestName,
producer,
None,
payload,
session)
case None =>
sendRequest(
resolvedRequestName,
producer,
None,
payload,
session )
}

outcome.onFailure(
errorMessage => KafkaRequestAction.reportUnbuildableRequest(
resolvedRequestName, session, errorMessage))
errorMessage =>
statsEngine.reportUnbuildableRequest(session, resolvedRequestName, errorMessage)
)

outcome

}

}
private def sendRequest(
requestName: String,
producer: Producer[K,V],
key: Option[K],
payload: Expression[V],
session: Session): Validation[Unit] = {

private def sendRequest( requestName: String,
producer: Producer[K,V],
key: Option[K],
payload: Expression[V],
session: Session ): Validation[Unit] = {

payload(session).map { resolvedPayload =>
val record = key match {
Expand All @@ -71,34 +72,28 @@ class KafkaRequestAction[K,V](
}

val requestStartDate = nowMillis
val requestEndDate = nowMillis

// send the request
producer.send(record, new Callback() {

override def onCompletion(m: RecordMetadata, e: Exception): Unit = {
val responseStartDate = nowMillis
val responseEndDate = nowMillis

// log the outcome
writeRequestData(
val requestEndDate = nowMillis
statsEngine.logResponse(
session,
requestName,
requestStartDate,
requestEndDate,
responseStartDate,
responseEndDate,
ResponseTimings(startTimestamp = requestStartDate, endTimestamp = requestEndDate),
if (e == null) OK else KO,
if (e == null) None else Some(e.getMessage))
None,
if (e == null) None else Some(e.getMessage)
)

next ! session

}

})

// calling the next action in the chain
next ! session
}
}

override def postStop(): Unit = {
super.postStop()
producer.close()
}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,34 @@
package com.github.mnogu.gatling.kafka.action

import akka.actor.ActorDSL.actor
import akka.actor.ActorRef
import com.github.mnogu.gatling.kafka.config.KafkaProtocol
import com.github.mnogu.gatling.kafka.protocol.{KafkaComponents, KafkaProtocol}
import com.github.mnogu.gatling.kafka.request.builder.KafkaAttributes
import io.gatling.core.action.Action
import io.gatling.core.action.builder.ActionBuilder
import io.gatling.core.config.Protocols
import io.gatling.core.structure.ScenarioContext
import org.apache.kafka.clients.producer.KafkaProducer

import scala.collection.JavaConverters._

class KafkaRequestActionBuilder[K,V](kafkaAttributes: KafkaAttributes[K,V])
extends ActionBuilder {

override def registerDefaultProtocols(protocols: Protocols): Protocols =
protocols + KafkaProtocol.DefaultKafkaProtocol
class KafkaRequestActionBuilder[K,V](kafkaAttributes: KafkaAttributes[K,V]) extends ActionBuilder {

override def build( ctx: ScenarioContext, next: Action ): Action = {
import ctx.{protocolComponentsRegistry, system, coreComponents}

val kafkaComponents: KafkaComponents = protocolComponentsRegistry.components(KafkaProtocol.KafkaProtocolKey)

val producer = new KafkaProducer[K,V]( kafkaComponents.kafkaProtocol.properties.asJava )

system.registerOnTermination(producer.close())

new KafkaRequestAction(
producer,
kafkaAttributes,
coreComponents,
kafkaComponents.kafkaProtocol,
next
)

def build(next: ActorRef, protocols: Protocols): ActorRef = {
val kafkaProtocol = protocols.getProtocol[KafkaProtocol].getOrElse(
throw new UnsupportedOperationException("Kafka Protocol wasn't registered"))
val producer = new KafkaProducer[K,V](
kafkaProtocol.properties.asJava)
actor(actorName("kafkaRequest"))(new KafkaRequestAction(
producer, kafkaAttributes, kafkaProtocol, next))
}

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.github.mnogu.gatling.kafka.protocol

import io.gatling.core.protocol.ProtocolComponents
import io.gatling.core.session.Session


case class KafkaComponents(kafkaProtocol: KafkaProtocol) extends ProtocolComponents {

override def onStart: Option[(Session) => Session] = None

override def onExit: Option[(Session) => Unit] = None

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.github.mnogu.gatling.kafka.protocol

import akka.actor.ActorSystem
import io.gatling.core.CoreComponents
import io.gatling.core.config.GatlingConfiguration
import io.gatling.core.protocol.{Protocol, ProtocolKey}

object KafkaProtocol {

def apply(configuration: GatlingConfiguration): KafkaProtocol = KafkaProtocol (
topic = "",
properties = Map()
)

val KafkaProtocolKey = new ProtocolKey {

type Protocol = KafkaProtocol
type Components = KafkaComponents

def protocolClass: Class[io.gatling.core.protocol.Protocol] = classOf[KafkaProtocol].asInstanceOf[Class[io.gatling.core.protocol.Protocol]]

def defaultProtocolValue(configuration: GatlingConfiguration): KafkaProtocol = KafkaProtocol(configuration)

def newComponents(system: ActorSystem, coreComponents: CoreComponents): KafkaProtocol => KafkaComponents = {

kafkaProtocol => {
val kafkaComponents = KafkaComponents (
kafkaProtocol
)

kafkaComponents
}
}
}
}

case class KafkaProtocol(
topic: String,
properties: Map[String, Object]) extends Protocol {

def topic(topic: String): KafkaProtocol = copy(topic = topic)
def properties(properties: Map[String, Object]): KafkaProtocol = copy(properties = properties)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.github.mnogu.gatling.kafka.protocol

import io.gatling.core.config.GatlingConfiguration


object KafkaProtocolBuilder {

implicit def toKafkaProtocol(builder: KafkaProtocolBuilder): KafkaProtocol = builder.build

def apply(configuration: GatlingConfiguration) : KafkaProtocolBuilder =
KafkaProtocolBuilder(KafkaProtocol(configuration))

}


case class KafkaProtocolBuilder(kafkaProtocol: KafkaProtocol) {

def build = kafkaProtocol

}
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@ package com.github.mnogu.gatling.kafka.request.builder
import com.github.mnogu.gatling.kafka.action.KafkaRequestActionBuilder
import io.gatling.core.session._

case class KafkaAttributes[K,V](
requestName: Expression[String],
key: Option[Expression[K]],
payload: Expression[V])
case class KafkaAttributes[K,V]( requestName: Expression[String],
key: Option[Expression[K]],
payload: Expression[V] )

case class KafkaRequestBuilder(requestName: Expression[String]) {
def send[V](payload: Expression[V]): KafkaRequestActionBuilder[_,V] =
send(payload, None)
def send[K,V](key: Expression[K], payload: Expression[V]): KafkaRequestActionBuilder[K,V] =
send(payload, Some(key))

def send[V](payload: Expression[V]): KafkaRequestActionBuilder[_,V] = send(payload, None)

def send[K,V](key: Expression[K], payload: Expression[V]): KafkaRequestActionBuilder[K,V] = send(payload, Some(key))

private def send[K,V](payload: Expression[V], key: Option[Expression[K]]) =
new KafkaRequestActionBuilder(KafkaAttributes(requestName, key, payload))

}

0 comments on commit 321e3e5

Please sign in to comment.