Skip to content

Commit

Permalink
feat: kafka tests (#128)
Browse files Browse the repository at this point in the history
* fix: simulation examples now work

* fix: message with null may not be tracked

* feat: kafka tests

* feat: CI config

* feat: CI config

* feat: CI config

* feat: CI config

* feat: CI config

* fix: topic test.t everywhere

* feat: readme test badge

* feat: readme badge

* fix: fix dependencies and add assertion

* fix: scala fmt

* fix: separate provided and test dependencies

---------

Co-authored-by: a.ugodnikov <[email protected]>
  • Loading branch information
daylikon and a.ugodnikov authored Feb 15, 2023
1 parent 3846a49 commit 519e158
Show file tree
Hide file tree
Showing 9 changed files with 269 additions and 46 deletions.
39 changes: 39 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,39 @@ jobs:
test:
name: Test Release
runs-on: ubuntu-20.04
services:
zookeeper:
image: wurstmeister/zookeeper
env:
ZOO_MY_ID: "1"
ZOO_PORT: "2181"
ZOO_SERVERS: server.1=zoo1:2888:3888
ports:
- '2181:2181'
kafka:
image: wurstmeister/kafka:2.13-2.6.3
env:
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_ADVERTISED_HOST_NAME: kafka
KAFKA_LISTENERS: BROKER://:9092,EXTERNAL://:9093
KAFKA_ADVERTISED_LISTENERS: BROKER://kafka:9092,EXTERNAL://localhost:9093
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
KAFKA_BROKER_ID: "1"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: "1"
KAFKA_CREATE_TOPICS: "myTopic:1:1, test.t:1:1"
ports:
- '9092:9092'
- '9093:9093'
schema-registry:
image: confluentinc/cp-schema-registry:7.2.1
env:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:9092,localhost:9093'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:9094
ports:
- '9094:9094'

steps:
- name: Checkout
uses: actions/[email protected]
Expand All @@ -24,6 +57,12 @@ jobs:

- name: Test Release
run: sbt clean scalafmtCheckAll scalafmtSbtCheck compile test

- name: Tests
run: sbt coverage "Gatling / testOnly ru.tinkoff.gatling.kafka.examples.KafkaGatlingTest" coverageReport

- name: Upload coverage reports to Codecov
uses: codecov/codecov-action@v3

publish:
name: Publish Release
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Gatling Kafka Plugin

![Build](https://github.com/TinkoffCreditSystems/gatling-kafka-plugin/workflows/Build/badge.svg) [![Maven Central](https://img.shields.io/maven-central/v/ru.tinkoff/gatling-kafka-plugin_2.13.svg?color=success)](https://search.maven.org/search?q=ru.tinkoff.gatling-kafka) [![Scala Steward badge](https://img.shields.io/badge/Scala_Steward-helping-blue.svg?style=flat&logo=)](https://scala-steward.org)
[![codecov.io](https://codecov.io/github/Tinkoff/gatling-kafka-plugin/coverage.svg?branch=master)](https://codecov.io/github/Tinkoff/gatling-kafka-plugin?branch=master)

# Introduction

Expand Down
7 changes: 5 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@ import Dependencies._
val scalaV = "2.13.10"
val avroSchemas = Seq() // for example Seq(RegistrySubject("test-hello-schema", 1))

coverageEnabled := true

lazy val root = (project in file("."))
.enablePlugins(GitVersioning)
.enablePlugins(GitVersioning, GatlingPlugin)
.settings(
name := "gatling-kafka-plugin",
scalaVersion := scalaV,
libraryDependencies ++= gatling,
libraryDependencies ++= gatlingTest,
libraryDependencies ++= kafka,
libraryDependencies ++= Seq(avro4s, avroCore, avroSerdes),
libraryDependencies ++= Seq(avro4s, avroCore, avroSerdes, avroSerializers),
schemaRegistrySubjects ++= avroSchemas,
// schemaRegistryUrl := "http://test-schema-registry:8081",
resolvers ++= Seq(
Expand Down
10 changes: 8 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,13 @@ object Dependencies {
}

lazy val gatling: Seq[ModuleID] = Seq(
"io.gatling" % "gatling-core" % Versions.gatling % "provided",
"io.gatling" % "gatling-test-framework" % Versions.gatling % "provided",
"io.gatling" % "gatling-core" % Versions.gatling % "provided",
"io.gatling" % "gatling-core-java" % Versions.gatling % "provided",
)

lazy val gatlingTest: Seq[ModuleID] = Seq(
"io.gatling.highcharts" % "gatling-charts-highcharts" % Versions.gatling % "it,test",
"io.gatling" % "gatling-test-framework" % Versions.gatling % "it,test",
)

lazy val kafka: Seq[ModuleID] = Seq(
Expand All @@ -27,5 +32,6 @@ object Dependencies {
lazy val avroCore: ModuleID = "org.apache.avro" % "avro" % Versions.avro
lazy val avroSerdes: ModuleID =
("io.confluent" % "kafka-streams-avro-serde" % "7.3.0").exclude("org.apache.kafka", "kafka-streams-scala")
lazy val avroSerializers: ModuleID = "io.confluent" % "kafka-avro-serializer" % "7.3.0"

}
1 change: 1 addition & 0 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -9,5 +9,6 @@ addSbtPlugin("com.github.sbt" % "sbt-ci-release" % "1.5.11")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.5.0")
addSbtPlugin("com.github.sbt" % "sbt-avro" % "3.4.2")
addSbtPlugin("ru.tinkoff" % "sbt-schema-registry-plugin" % "0.2.1")
addSbtPlugin("org.scoverage" % "sbt-scoverage" % "2.0.6")

libraryDependencies += "org.apache.avro" % "avro-compiler" % "1.11.1"
19 changes: 14 additions & 5 deletions src/main/scala/ru/tinkoff/gatling/kafka/client/TrackersPool.scala
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,22 @@ class TrackersPool(
if (messageMatcher.responseMatch(message) == null) {
logger.error(s"no messageMatcher key for read message")
} else {
logger.info(s" --- received ${new String(k)} ${new String(v)}")
if (k == null || v == null)
logger.info(s" --- received message with null key or value")
else
logger.info(s" --- received ${new String(k)} ${new String(v)}")
val receivedTimestamp = clock.nowMillis
val replyId = messageMatcher.responseMatch(message)
logMessage(
s"Record received key=${new String(k)}",
message,
)
if (k != null)
logMessage(
s"Record received key=${new String(k)}",
message,
)
else
logMessage(
s"Record received key=null",
message,
)

actor ! MessageConsumed(
replyId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,30 +4,13 @@ import com.sksamuel.avro4s._
import io.gatling.core.Predef._
import io.gatling.core.structure.ScenarioBuilder
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.header.internals.RecordHeaders
import ru.tinkoff.gatling.kafka.Predef._
import ru.tinkoff.gatling.kafka.protocol.KafkaProtocol

import scala.concurrent.duration._

class Avro4sSimulation extends Simulation {

val kafkaConf: KafkaProtocol = kafka
// Kafka topic name
.topic("test")
// Kafka producer configs
.properties(
Map(
ProducerConfig.ACKS_CONFIG -> "1",
// list of Kafka broker hostname and port pairs
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
// in most cases, StringSerializer or ByteArraySerializer
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG ->
"org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG ->
"org.apache.kafka.common.serialization.StringSerializer",
),
)

val kafkaAclConf: KafkaProtocol = kafka
.topic("my.acl.topic")
.properties(
Expand All @@ -37,35 +20,29 @@ class Avro4sSimulation extends Simulation {
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
"value.subject.name.strategy" -> "io.confluent.kafka.serializers.subject.RecordNameStrategy",
"schema.registry.url" -> "http://schema.registry.com",
"security.protocol" -> "SASL_PLAINTEXT",
"sasl.mechanism" -> "SCRAM-SHA-512",
"sasl.jaas.config" -> s"""org.apache.kafka.common.security.scram.ScramLoginModule required username="MY-USER" password="SECRET-PASSWORD";""",
"schema.registry.url" -> "http://localhost:9094",
),
)

case class Ingredient(name: String, sugar: Double, fat: Double)

implicit lazy val ingridientToRecord: ToRecord[Ingredient] = ToRecord.apply
implicit lazy val ingridientFromRecord: FromRecord[Ingredient] = FromRecord.apply
implicit lazy val ingridientScemaFor: SchemaFor[Ingredient] = SchemaFor.apply
implicit lazy val ingridientFormat: RecordFormat[Ingredient] = RecordFormat.apply
implicit val ingridientToRecord: ToRecord[Ingredient] = ToRecord.apply
implicit val ingridientFromRecord: FromRecord[Ingredient] = FromRecord.apply
implicit val ingridientSchemaFor: SchemaFor[Ingredient] = SchemaFor.apply
implicit val ingridientFormat: RecordFormat[Ingredient] = RecordFormat.apply
implicit val ingredientHeaders: Headers = new RecordHeaders()

val scn: ScenarioBuilder = scenario("Kafka Test")
.exec(
kafka("Simple Request")
kafka("Simple Avro4s Request")
// message to send
.send[Ingredient](Ingredient("Cheese", 0d, 70d)),
)
.exec(
kafka("Simple Request with Key")
kafka("Simple Avro4s Request with Key")
// message to send
.send[String, Ingredient]("Key", Ingredient("Cheese", 0d, 70d)),
)

setUp(
scn
.inject(constantUsersPerSec(10) during (90 seconds)),
)
.protocols(kafkaConf)
setUp(scn.inject(atOnceUsers(1))).protocols(kafkaAclConf)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package ru.tinkoff.gatling.kafka.examples

import com.sksamuel.avro4s._
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroSerializer}
import io.gatling.core.Predef._
import io.gatling.core.structure.ScenarioBuilder
import org.apache.kafka.clients.producer.ProducerConfig
import ru.tinkoff.gatling.kafka.Predef._
import ru.tinkoff.gatling.kafka.protocol.KafkaProtocol
import org.apache.kafka.common.header.Headers
import org.apache.kafka.common.header.internals.RecordHeaders
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}
import ru.tinkoff.gatling.kafka.request.KafkaProtocolMessage

import scala.concurrent.duration.DurationInt

class KafkaGatlingTest extends Simulation {

case class Ingredient(name: String, sugar: Double, fat: Double)

implicit val ingridientToRecord: ToRecord[Ingredient] = ToRecord.apply
implicit val ingridientFromRecord: FromRecord[Ingredient] = FromRecord.apply
implicit val ingridientSchemaFor: SchemaFor[Ingredient] = SchemaFor.apply
implicit val ingridientFormat: RecordFormat[Ingredient] = RecordFormat.apply
implicit val ingredientHeaders: Headers = new RecordHeaders()

val kafkaConf: KafkaProtocol = kafka
.topic("test.t")
.properties(
Map(
ProducerConfig.ACKS_CONFIG -> "1",
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9093",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
),
)

val kafkaConfBytes: KafkaProtocol = kafka
.topic("test.t")
.properties(
Map(
ProducerConfig.ACKS_CONFIG -> "1",
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9093",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.ByteArraySerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.ByteArraySerializer",
),
)

val kafkaProtocolRRString: KafkaProtocol = kafka.requestReply
.producerSettings(
Map(
ProducerConfig.ACKS_CONFIG -> "1",
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9093",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
),
)
.consumeSettings(
Map(
"bootstrap.servers" -> "localhost:9093",
),
)
.withDefaultTimeout

val kafkaProtocolRRBytes: KafkaProtocol = kafka.requestReply
.producerSettings(
Map(
ProducerConfig.ACKS_CONFIG -> "1",
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9093",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.ByteArraySerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.ByteArraySerializer",
),
)
.consumeSettings(
Map(
"bootstrap.servers" -> "localhost:9093",
),
)
.timeout(5.seconds)
.matchByValue

val kafkaAvro4sConf: KafkaProtocol = kafka
.topic("test.t")
.properties(
Map(
ProducerConfig.ACKS_CONFIG -> "1",
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9093",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
"value.subject.name.strategy" -> "io.confluent.kafka.serializers.subject.RecordNameStrategy",
"schema.registry.url" -> "http://localhost:9094",
),
)

def matchByOwnVal(message: KafkaProtocolMessage): Array[Byte] = {
message.key
}

val kafkaProtocolRRAvro: KafkaProtocol = kafka.requestReply
.producerSettings(
Map(
ProducerConfig.ACKS_CONFIG -> "1",
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9093",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringSerializer",
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
"value.subject.name.strategy" -> "io.confluent.kafka.serializers.subject.RecordNameStrategy",
"schema.registry.url" -> "http://localhost:9094",
),
)
.consumeSettings(
Map(
"bootstrap.servers" -> "localhost:9093",
),
)
.timeout(5.seconds)
.matchByMessage(matchByOwnVal)

val scnRR: ScenarioBuilder = scenario("RequestReply String")
.exec(
kafka("Request Reply String").requestReply
.requestTopic("myTopic")
.replyTopic("test.t")
.send[String, String]("testCheckJson", """{ "m": "dkf" }""")
.check(jsonPath("$.m").is("dkf")),
)

val scn: ScenarioBuilder = scenario("Request String")
.exec(
kafka("Request String")
.send[String]("foo"),
)
.exec(kafka("Request String 2").send[String, String]("testCheckJson", """{ "m": "dkf" }"""))

val scn2: ScenarioBuilder = scenario("Request Byte")
.exec(
kafka("Request Byte")
.send[Array[Byte], Array[Byte]]("key".getBytes(), "tstBytes".getBytes()),
)

val scnRR2: ScenarioBuilder = scenario("RequestReply Bytes")
.exec(
kafka("Request Reply Bytes").requestReply
.requestTopic("myTopic")
.replyTopic("test.t")
.send[Array[Byte], Array[Byte]]("test".getBytes(), "tstBytes".getBytes()),
)

val scnAvro4s: ScenarioBuilder = scenario("Request Avro4s")
.exec(
kafka("Request Simple Avro4s")
.send(Ingredient("Cheese", 1d, 50d)),
)
.exec(
kafka("Request Avro4s")
.send[String, Ingredient]("key4s", Ingredient("Cheese", 0d, 70d)),
)

val scnRRwo: ScenarioBuilder = scenario("RequestReply w/o answer")
.exec(
kafka("Request Reply Bytes").requestReply
.requestTopic("myTopic")
.replyTopic("test.t")
.send[Array[Byte], Array[Byte]]("testWO".getBytes(), "tstBytesWO".getBytes()),
)

setUp(
scnRR.inject(atOnceUsers(1)).protocols(kafkaProtocolRRString),
scn.inject(nothingFor(1), atOnceUsers(1)).protocols(kafkaConf),
scnRR2.inject(atOnceUsers(1)).protocols(kafkaProtocolRRBytes),
scn2.inject(nothingFor(1), atOnceUsers(1)).protocols(kafkaConfBytes),
scnAvro4s.inject(atOnceUsers(1)).protocols(kafkaAvro4sConf),
scnRRwo.inject(atOnceUsers(1)).protocols(kafkaProtocolRRBytes),
).assertions(
global.failedRequests.percent.lt(15.0),
)

}
Loading

0 comments on commit 519e158

Please sign in to comment.