Skip to content
This repository has been archived by the owner on Mar 22, 2022. It is now read-only.

Commit

Permalink
Added throttling support
Browse files Browse the repository at this point in the history
  • Loading branch information
Luca Barzè committed Feb 15, 2017
1 parent eea0617 commit 7a0a978
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 33 deletions.
7 changes: 7 additions & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,15 @@ version := "0.1.2-SNAPSHOT"

scalaVersion := "2.11.8"

enablePlugins(GatlingPlugin)

val gatlingVersion = "2.2.3"

libraryDependencies ++= Seq(
"io.gatling" % "gatling-core" % "2.2.3" % "provided",
"io.gatling" % "gatling-test-framework" % gatlingVersion % "test",
"io.gatling.highcharts" % "gatling-charts-highcharts" % gatlingVersion % "test",

("org.apache.kafka" % "kafka-clients" % "0.10.1.1")
// Gatling contains slf4j-api
.exclude("org.slf4j", "slf4j-api")
Expand Down
4 changes: 3 additions & 1 deletion project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
logLevel := Level.Warn
logLevel := Level.Info

addSbtPlugin("org.xerial.sbt" % "sbt-sonatype" % "1.0")

addSbtPlugin("com.jsuereth" % "sbt-pgp" % "1.0.0")

addSbtPlugin("io.gatling" % "gatling-sbt" % "2.2.0")
Original file line number Diff line number Diff line change
Expand Up @@ -13,44 +13,32 @@ import io.gatling.core.stats.message.ResponseTimings
import org.apache.kafka.clients.producer._



class KafkaRequestAction[K,V]( val producer: KafkaProducer[K,V],
val kafkaAttributes: KafkaAttributes[K,V],
val coreComponents: CoreComponents,
val kafkaProtocol: KafkaProtocol,
val throttled: Boolean,
val next: Action )
extends ExitableAction with NameGen {

val statsEngine = coreComponents.statsEngine

override val name = genName("kafkaRequest")

override def execute(session: Session): Unit = recover(session) {

kafkaAttributes.requestName(session).flatMap { resolvedRequestName =>
val payload = kafkaAttributes.payload

val outcome = kafkaAttributes.key match {
case Some(k) => k(session).flatMap { resolvedKey =>
sendRequest(
resolvedRequestName,
producer,
Some(resolvedKey),
payload,
session )
}
case None =>
sendRequest(
resolvedRequestName,
producer,
None,
payload,
session )
}
kafkaAttributes requestName session flatMap { requestName =>

val outcome =
sendRequest(
requestName,
producer,
kafkaAttributes,
throttled,
session)

outcome.onFailure(
errorMessage =>
statsEngine.reportUnbuildableRequest(session, resolvedRequestName, errorMessage)
statsEngine.reportUnbuildableRequest(session, requestName, errorMessage)
)

outcome
Expand All @@ -61,14 +49,17 @@ class KafkaRequestAction[K,V]( val producer: KafkaProducer[K,V],

private def sendRequest( requestName: String,
producer: Producer[K,V],
key: Option[K],
payload: Expression[V],
kafkaAttributes: KafkaAttributes[K,V],
throttled: Boolean,
session: Session ): Validation[Unit] = {

payload(session).map { resolvedPayload =>
val record = key match {
case Some(k) => new ProducerRecord[K,V](kafkaProtocol.topic, k, resolvedPayload)
case None => new ProducerRecord[K,V](kafkaProtocol.topic, resolvedPayload)
kafkaAttributes payload session map { payload =>

val record = kafkaAttributes.key match {
case Some(k) =>
new ProducerRecord[K, V](kafkaProtocol.topic, k(session).get, payload)
case None =>
new ProducerRecord[K, V](kafkaProtocol.topic, payload)
}

val requestStartDate = nowMillis
Expand All @@ -87,13 +78,17 @@ class KafkaRequestAction[K,V]( val producer: KafkaProducer[K,V],
if (e == null) None else Some(e.getMessage)
)

next ! session
if (throttled) {
coreComponents.throttler.throttle(session.scenario, () => next ! session)
} else {
next ! session
}

}

})

}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import scala.collection.JavaConverters._
class KafkaRequestActionBuilder[K,V](kafkaAttributes: KafkaAttributes[K,V]) extends ActionBuilder {

override def build( ctx: ScenarioContext, next: Action ): Action = {
import ctx.{protocolComponentsRegistry, system, coreComponents}
import ctx.{protocolComponentsRegistry, system, coreComponents, throttled}

val kafkaComponents: KafkaComponents = protocolComponentsRegistry.components(KafkaProtocol.KafkaProtocolKey)

Expand All @@ -26,6 +26,7 @@ class KafkaRequestActionBuilder[K,V](kafkaAttributes: KafkaAttributes[K,V]) exte
kafkaAttributes,
coreComponents,
kafkaComponents.kafkaProtocol,
throttled,
next
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package com.github.mnogu.gatling.kafka.test

import io.gatling.core.Predef._
import org.apache.kafka.clients.producer.ProducerConfig
import scala.concurrent.duration._

import com.github.mnogu.gatling.kafka.Predef._

class ThrottledSimulation extends Simulation {
val kafkaConf = kafka
// Kafka topic name
.topic("test")
// Kafka producer configs
.properties(
Map(
ProducerConfig.ACKS_CONFIG -> "1",
// list of Kafka broker hostname and port pairs
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",

// in most cases, StringSerializer or ByteArraySerializer
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG ->
"org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG ->
"org.apache.kafka.common.serialization.StringSerializer"))

val scn = scenario("Kafka Test")
.forever(
exec(
kafka("request")
// message to send
.send[String]("foo"))
)

setUp(
scn.inject(atOnceUsers(10)))
.throttle(jumpToRps(10), holdFor(30 seconds))
.protocols(kafkaConf)
}

0 comments on commit 7a0a978

Please sign in to comment.