feat: java and kotlin support (#125)
* feat: java kafka protocol

---------

Co-authored-by: a.ugodnikov <[email protected]>
daylikon and a.ugodnikov authored Feb 2, 2023
1 parent f809eac commit 3349aaf
Showing 25 changed files with 840 additions and 7 deletions.
36 changes: 32 additions & 4 deletions README.md
@@ -4,23 +4,51 @@

# Introduction

Plugin to support Kafka in Gatling(3.7.x)
Plugin to support Kafka in Gatling (3.9.x)

# Usage

### Getting Started

Plugin is currently available for Scala 2.13.
The plugin is currently available for Scala 2.13, Java 17, and Kotlin.

You may include the plugin as a dependency in a project with your tests:

### Scala

```scala
libraryDependencies += "ru.tinkoff" %% "gatling-kafka-plugin" % <version> % Test
```

### Java

Write this to your dependencies block in build.gradle:

```java
gatling "ru.tinkoff:gatling-kafka-plugin_2.13:<version>"
```

### Kotlin

Write this to your dependencies block in build.gradle:

```kotlin
gatling("ru.tinkoff:gatling-kafka-plugin_2.13:<version>")
```

## Example Scenarios

Examples [here](https://github.com/TinkoffCreditSystems/gatling-kafka-plugin/tree/master/src/test/scala/ru/tinkoff/gatling/kafka/examples)
### Scala

Examples [here](src/test/scala/ru/tinkoff/gatling/kafka/examples)

### Java

Examples [here](src/test/java/ru/tinkoff/gatling/kafka/javaapi/examples)

### Kotlin

Examples [here](src/test/kotlin/ru/tinkoff/gatling/kafka/javaapi/examples)
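
Below is a minimal, hedged sketch of configuring the plain Kafka protocol with the Java DSL added in this change; the topic name, broker address, and properties are illustrative placeholders, not part of the shipped examples.

```java
import static ru.tinkoff.gatling.kafka.javaapi.KafkaDsl.kafka;

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;
import ru.tinkoff.gatling.kafka.javaapi.protocol.KafkaProtocolBuilder;

public class KafkaProtocolExample {

    // plain producer protocol: a target topic plus Kafka producer properties
    // (topic name and bootstrap servers are placeholders)
    KafkaProtocolBuilder kafkaProtocol = kafka()
            .topic("test.topic")
            .properties(Map.<String, Object>of(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"));
}
```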

## Download and create Avro schema

2 changes: 1 addition & 1 deletion build.sbt
@@ -16,8 +16,8 @@ lazy val root = (project in file("."))
// schemaRegistryUrl := "http://test-schema-registry:8081",
resolvers ++= Seq(
"Confluent" at "https://packages.confluent.io/maven/",
Resolver.sonatypeRepo("public"),
),
resolvers ++= Resolver.sonatypeOssRepos("public"),
scalacOptions ++= Seq(
"-encoding",
"UTF-8", // Option and arguments on same line
5 changes: 3 additions & 2 deletions project/Dependencies.scala
@@ -2,7 +2,8 @@ import sbt._

object Dependencies {
private object Versions {
val kafka = "7.3.1-ce"
// TODO: 7.3.1-ce (kafka 3.3) throws an exception on test startup (https://issues.apache.org/jira/browse/KAFKA-14270); will be fixed in kafka 3.4
val kafka = "7.3.0-ce"
val gatling = "3.9.0"
val avro4s = "4.1.0"
val avro = "1.11.1"
@@ -25,6 +26,6 @@ object Dependencies {
lazy val avroCompiler: ModuleID = "org.apache.avro" % "avro-compiler" % Versions.avro
lazy val avroCore: ModuleID = "org.apache.avro" % "avro" % Versions.avro
lazy val avroSerdes: ModuleID =
("io.confluent" % "kafka-streams-avro-serde" % "7.3.1").exclude("org.apache.kafka", "kafka-streams-scala")
("io.confluent" % "kafka-streams-avro-serde" % "7.3.0").exclude("org.apache.kafka", "kafka-streams-scala")

}
31 changes: 31 additions & 0 deletions src/main/java/ru/tinkoff/gatling/kafka/javaapi/KafkaDsl.java
@@ -0,0 +1,31 @@
package ru.tinkoff.gatling.kafka.javaapi;

import static io.gatling.javaapi.core.internal.Expressions.*;

import io.gatling.core.check.CheckBuilder;
import org.apache.avro.generic.GenericRecord;
import ru.tinkoff.gatling.kafka.javaapi.checks.KafkaChecks;
import ru.tinkoff.gatling.kafka.javaapi.protocol.*;
import ru.tinkoff.gatling.kafka.javaapi.request.builder.*;
import ru.tinkoff.gatling.kafka.request.KafkaProtocolMessage;
import scala.Function1;

public final class KafkaDsl {

public static KafkaProtocolBuilderBase kafka() {
return new KafkaProtocolBuilderBase();
}

public static KafkaRequestBuilderBase kafka(String requestName) {
return new KafkaRequestBuilderBase(ru.tinkoff.gatling.kafka.Predef.kafka(toStringExpression(requestName)), requestName);
}

public static KafkaChecks.KafkaCheckTypeWrapper simpleCheck(Function1<KafkaProtocolMessage, Boolean> f) {
return new KafkaChecks.KafkaCheckTypeWrapper(new KafkaChecks.SimpleChecksScala().simpleCheck(f.andThen(Boolean::valueOf)));
}

public static CheckBuilder.Find<Object, KafkaProtocolMessage, GenericRecord> avroBody() {
return new KafkaChecks.SimpleChecksScala().avroBody(ru.tinkoff.gatling.kafka.javaapi.checks.KafkaChecks.avroSerde());
}

}
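
A hedged usage sketch for these entry points; the predicate passed to simpleCheck is a placeholder, and Java 17 `var` keeps the Scala-side generic types out of the way.

```java
import static ru.tinkoff.gatling.kafka.javaapi.KafkaDsl.avroBody;
import static ru.tinkoff.gatling.kafka.javaapi.KafkaDsl.simpleCheck;

public class KafkaCheckExample {

    void buildChecks() {
        // scala.Function1 is a SAM interface in Scala 2.13, so a plain Java lambda works;
        // the predicate itself is just a placeholder
        var nonNullMessage = simpleCheck(msg -> msg != null);

        // avroBody() builds a check that reads the consumed message as an Avro GenericRecord
        var avroRecord = avroBody();
    }
}
```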
7 changes: 7 additions & 0 deletions src/main/java/ru/tinkoff/gatling/kafka/javaapi/checks/KafkaCheckType.java
@@ -0,0 +1,7 @@
package ru.tinkoff.gatling.kafka.javaapi.checks;

import io.gatling.javaapi.core.CheckBuilder.CheckType;

public enum KafkaCheckType implements CheckType {
ResponseCode
}
77 changes: 77 additions & 0 deletions src/main/scala/ru/tinkoff/gatling/kafka/javaapi/checks/KafkaChecks.scala
@@ -0,0 +1,77 @@
package ru.tinkoff.gatling.kafka.javaapi.checks

import io.gatling.core.check.Check
import io.gatling.core.check._
import io.gatling.core.check.bytes.BodyBytesCheckType
import io.gatling.core.check.jmespath.JmesPathCheckType
import io.gatling.core.check.jsonpath.JsonPathCheckType
import io.gatling.core.check.string.BodyStringCheckType
import io.gatling.core.check.substring.SubstringCheckType
import io.gatling.core.check.xpath.XPathCheckType
import io.gatling.javaapi.core.internal.CoreCheckType
import ru.tinkoff.gatling.kafka.checks.{KafkaCheckMaterializer, KafkaCheckSupport}
import ru.tinkoff.gatling.kafka.request.KafkaProtocolMessage
import ru.tinkoff.gatling.kafka.{KafkaCheck, checks}
import net.sf.saxon.s9api.XdmNode
import com.fasterxml.jackson.databind.JsonNode
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
import org.apache.avro.generic.GenericRecord
import org.apache.kafka.common.serialization.Serde

import java.{util => ju}
import scala.jdk.CollectionConverters._

object KafkaChecks {
case class KafkaCheckTypeWrapper(value: Check[KafkaProtocolMessage])
class SimpleChecksScala extends KafkaCheckSupport {}

val avroSerde: Serde[GenericRecord] = new GenericAvroSerde()

private def toScalaCheck(javaCheck: Object): KafkaCheck = {
javaCheck match {
case _: io.gatling.javaapi.core.CheckBuilder =>
val checkBuilder = javaCheck.asInstanceOf[io.gatling.javaapi.core.CheckBuilder]
val scalaCheck = checkBuilder.asScala
checkBuilder.`type` match {
case CoreCheckType.BodyBytes =>
scalaCheck
.asInstanceOf[CheckBuilder[BodyBytesCheckType, Array[Byte]]]
.build(KafkaCheckMaterializer.bodyBytes)
case CoreCheckType.BodyString =>
scalaCheck
.asInstanceOf[CheckBuilder[BodyStringCheckType, String]]
.build(KafkaCheckMaterializer.bodyString(io.gatling.core.Predef.configuration))
case CoreCheckType.Substring =>
scalaCheck
.asInstanceOf[CheckBuilder[SubstringCheckType, String]]
.build(KafkaCheckMaterializer.substring(io.gatling.core.Predef.configuration))
case CoreCheckType.XPath =>
scalaCheck
.asInstanceOf[CheckBuilder[XPathCheckType, XdmNode]]
.build(KafkaCheckMaterializer.xpath(io.gatling.core.Predef.configuration))
case CoreCheckType.JsonPath =>
scalaCheck
.asInstanceOf[CheckBuilder[JsonPathCheckType, JsonNode]]
.build(
KafkaCheckMaterializer.jsonPath(io.gatling.core.Predef.defaultJsonParsers, io.gatling.core.Predef.configuration),
)
case CoreCheckType.JmesPath =>
scalaCheck
.asInstanceOf[CheckBuilder[JmesPathCheckType, JsonNode]]
.build(
KafkaCheckMaterializer.jmesPath(io.gatling.core.Predef.defaultJsonParsers, io.gatling.core.Predef.configuration),
)
case KafkaCheckType.ResponseCode =>
scalaCheck
.asInstanceOf[CheckBuilder[checks.KafkaCheckMaterializer.KafkaMessageCheckType, KafkaProtocolMessage]]
.build(
KafkaCheckMaterializer.kafkaStatusCheck,
)
case unknown => throw new IllegalArgumentException(s"Kafka DSL doesn't support $unknown")
}
}
}

def toScalaChecks(javaChecks: ju.List[Object]): Seq[KafkaCheck] =
javaChecks.asScala.map(toScalaCheck).toSeq
}
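
A hedged sketch of what this conversion enables on the Java side; the check values are placeholders, and the supported core check types are exactly the ones matched above.

```java
import io.gatling.javaapi.core.CoreDsl;
import ru.tinkoff.gatling.kafka.javaapi.checks.KafkaChecks;

import java.util.List;

public class CheckConversionExample {

    void convert() {
        // two standard Gatling core checks built with the Java DSL (values are placeholders)
        List<Object> javaChecks = List.<Object>of(
                CoreDsl.jsonPath("$.status").is("OK"),
                CoreDsl.substring("OK"));

        // toScalaChecks dispatches on each builder's type (BodyBytes, BodyString, Substring,
        // XPath, JsonPath, JmesPath or ResponseCode) and yields the plugin's Scala checks
        var scalaChecks = KafkaChecks.toScalaChecks(javaChecks);
    }
}
```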
30 changes: 30 additions & 0 deletions src/main/java/ru/tinkoff/gatling/kafka/javaapi/protocol/KPConsumeSettingsStep.java
@@ -0,0 +1,30 @@
package ru.tinkoff.gatling.kafka.javaapi.protocol;

import java.time.Duration;
import java.util.Map;

import static scala.jdk.javaapi.CollectionConverters.asScala;
import scala.jdk.javaapi.DurationConverters;

public class KPConsumeSettingsStep {

private final Map<String, Object> producerSettings;
private final Map<String, Object> consumeSettings;

public KPConsumeSettingsStep(Map<String, Object> producerSettings, Map<String, Object> consumeSettings) {
this.producerSettings = producerSettings;
this.consumeSettings = consumeSettings;
}

public KafkaProtocolBuilderNew timeout(Duration timeout) {
scala.collection.immutable.Map<String, Object> ps = scala.collection.immutable.Map.from(asScala(this.producerSettings));
scala.collection.immutable.Map<String, Object> cs = scala.collection.immutable.Map.from(asScala(this.consumeSettings));
return new KafkaProtocolBuilderNew(ru.tinkoff.gatling.kafka.protocol.KafkaProtocolBuilderNew.apply(ps, cs, DurationConverters.toScala(timeout), ru.tinkoff.gatling.kafka.protocol.KafkaProtocol.KafkaKeyMatcher$.MODULE$));
}

public KafkaProtocolBuilderNew withDefaultTimeout() {
scala.collection.immutable.Map<String, Object> ps = scala.collection.immutable.Map.from(asScala(this.producerSettings));
scala.collection.immutable.Map<String, Object> cs = scala.collection.immutable.Map.from(asScala(this.consumeSettings));
return new KafkaProtocolBuilderNew(ru.tinkoff.gatling.kafka.protocol.KafkaProtocolBuilderNew.apply(ps, cs, DurationConverters.toScala(Duration.ofSeconds(60)), ru.tinkoff.gatling.kafka.protocol.KafkaProtocol.KafkaKeyMatcher$.MODULE$));
}
}
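
For context, a hedged sketch of the fluent chain this step completes; broker addresses, group id, and timeout are placeholders.

```java
import static ru.tinkoff.gatling.kafka.javaapi.KafkaDsl.kafka;

import java.time.Duration;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import ru.tinkoff.gatling.kafka.javaapi.protocol.KafkaProtocolBuilderNew;

public class RequestReplyProtocolExample {

    // request-reply protocol: producer settings -> consume settings -> timeout
    // (all broker and consumer values below are placeholders)
    KafkaProtocolBuilderNew kafkaProtocol = kafka()
            .requestReply()
            .producerSettings(Map.<String, Object>of(
                    ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"))
            .consumeSettings(Map.<String, Object>of(
                    ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092",
                    ConsumerConfig.GROUP_ID_CONFIG, "gatling-test-group"))
            .timeout(Duration.ofSeconds(10)); // or .withDefaultTimeout() for 60 seconds
}
```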
16 changes: 16 additions & 0 deletions src/main/java/ru/tinkoff/gatling/kafka/javaapi/protocol/KPProducerSettingsStep.java
@@ -0,0 +1,16 @@
package ru.tinkoff.gatling.kafka.javaapi.protocol;

import java.util.Map;

public class KPProducerSettingsStep {

private final Map<String, Object> ps;

public KPProducerSettingsStep(Map<String, Object> ps) {
this.ps = ps;
}

public KPConsumeSettingsStep consumeSettings(Map<String, Object> cs) {
return new KPConsumeSettingsStep(ps, cs);
}
}
19 changes: 19 additions & 0 deletions src/main/java/ru/tinkoff/gatling/kafka/javaapi/protocol/KafkaProtocolBuilder.java
@@ -0,0 +1,19 @@
package ru.tinkoff.gatling.kafka.javaapi.protocol;

import io.gatling.core.protocol.Protocol;
import io.gatling.javaapi.core.ProtocolBuilder;

public class KafkaProtocolBuilder implements ProtocolBuilder {

private final ru.tinkoff.gatling.kafka.protocol.KafkaProtocolBuilder wrapped;

public KafkaProtocolBuilder(ru.tinkoff.gatling.kafka.protocol.KafkaProtocolBuilder wrapped) {
this.wrapped = wrapped;
}

@Override
public Protocol protocol() {
return wrapped.build();
}

}
15 changes: 15 additions & 0 deletions src/main/java/ru/tinkoff/gatling/kafka/javaapi/protocol/KafkaProtocolBuilderBase.java
@@ -0,0 +1,15 @@
package ru.tinkoff.gatling.kafka.javaapi.protocol;

import java.util.Collections;

public class KafkaProtocolBuilderBase {

public KafkaProtocolBuilderPropertiesStep topic(String name) {
return new KafkaProtocolBuilderPropertiesStep(name, Collections.emptyMap());
}

public KafkaProtocolBuilderNewBase requestReply() {
return new KafkaProtocolBuilderNewBase();
}

}
33 changes: 33 additions & 0 deletions src/main/java/ru/tinkoff/gatling/kafka/javaapi/protocol/KafkaProtocolBuilderNew.java
@@ -0,0 +1,33 @@
package ru.tinkoff.gatling.kafka.javaapi.protocol;

import io.gatling.core.protocol.Protocol;
import io.gatling.javaapi.core.ProtocolBuilder;
import ru.tinkoff.gatling.kafka.request.KafkaProtocolMessage;
import scala.Function1;

public class KafkaProtocolBuilderNew implements ProtocolBuilder {

private ru.tinkoff.gatling.kafka.protocol.KafkaProtocolBuilderNew wrapped;

public KafkaProtocolBuilderNew(ru.tinkoff.gatling.kafka.protocol.KafkaProtocolBuilderNew wrapped) {
this.wrapped = wrapped;
}

public KafkaProtocolBuilderNew matchByValue() {
this.wrapped = wrapped.matchByValue();
return this;
}

public KafkaProtocolBuilderNew matchByMessage(Function1<KafkaProtocolMessage, byte[]> keyExtractor) {
this.wrapped = wrapped.matchByMessage(keyExtractor);
return this;
}

@Override
public Protocol protocol() {
return wrapped.build();
}

}
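
Replies are matched by message key by default (KafkaKeyMatcher, see KPConsumeSettingsStep above); here is a hedged sketch of switching the matching strategy, with a deliberately trivial placeholder extractor.

```java
import static ru.tinkoff.gatling.kafka.javaapi.KafkaDsl.kafka;

import java.util.Map;
import ru.tinkoff.gatling.kafka.javaapi.protocol.KafkaProtocolBuilderNew;

public class MatchingStrategyExample {

    // matchByValue() correlates request and reply by the message value instead of the key;
    // matchByMessage(...) takes a scala.Function1 returning the correlation bytes
    // (the extractor below is a placeholder, as are the settings maps)
    KafkaProtocolBuilderNew protocol = kafka()
            .requestReply()
            .producerSettings(Map.<String, Object>of("bootstrap.servers", "localhost:9092"))
            .consumeSettings(Map.<String, Object>of("bootstrap.servers", "localhost:9092"))
            .withDefaultTimeout()
            .matchByMessage(msg -> new byte[0]);
}
```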


12 changes: 12 additions & 0 deletions src/main/java/ru/tinkoff/gatling/kafka/javaapi/protocol/KafkaProtocolBuilderNewBase.java
@@ -0,0 +1,12 @@
package ru.tinkoff.gatling.kafka.javaapi.protocol;

import java.util.Map;

public class KafkaProtocolBuilderNewBase {

public KPProducerSettingsStep producerSettings(Map<String, Object> ps) {
return new KPProducerSettingsStep(ps);
}

}

26 changes: 26 additions & 0 deletions src/main/java/ru/tinkoff/gatling/kafka/javaapi/protocol/KafkaProtocolBuilderPropertiesStep.java
@@ -0,0 +1,26 @@
package ru.tinkoff.gatling.kafka.javaapi.protocol;

import java.util.Map;
import static scala.jdk.javaapi.CollectionConverters.asScala;

public class KafkaProtocolBuilderPropertiesStep {

private final String topic;
private Map<String, Object> props;

public KafkaProtocolBuilderPropertiesStep(String topic, Map<String, Object> props){
this.topic = topic;
this.props = props;
}

public KafkaProtocolBuilder properties(Map<String, Object> props) {
this.props = props;
scala.collection.immutable.Map<String, Object> scalaMap = scala.collection.immutable.Map.from(asScala(this.props));
return new KafkaProtocolBuilder(
ru.tinkoff.gatling.kafka.protocol.KafkaProtocolBuilderPropertiesStep.apply(this.topic, scalaMap)
.properties(scalaMap)
);
}


}