Support Avro classes in ReqRep #102

Open
neoscaler opened this issue Nov 3, 2022 · 1 comment
Comments

@neoscaler

With the default Kafka.send method it is possible to use Avro Java classes (generated by avro-maven-plugin) as the payload in the key or value. The payload then gets serialized as a valid Avro byte stream.

But this does not seem possible with the send method of the new ReqRepBases (used in the Request/Reply mechanism). We are seeing an error like [No implicits found for parameter evidence].

Is Avro not supported here? Or are we simply using it wrong?

@3alster

3alster commented Feb 6, 2023

It is possible, but not straight out of the box.
All the credit goes to @daylikon

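import com.sksamuel.avro4s._
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.{KafkaAvroDeserializer, KafkaAvroSerializer}
import io.gatling.core.Predef._
import io.gatling.core.structure.ScenarioBuilder
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}
import ru.tinkoff.gatling.kafka.Predef._
import ru.tinkoff.gatling.kafka.protocol.KafkaProtocol
import path.AvroClass // here goes the path to the generated avro class

import scala.concurrent.duration._      // for 5.seconds
import scala.jdk.CollectionConverters._ // for asJava (Scala 2.13; on 2.12 use scala.collection.JavaConverters._)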

// define Serde for Avro-class
val ser =
  new KafkaAvroSerializer(
    new CachedSchemaRegistryClient("url".split(',').toList.asJava, 16),
  )

val de =
  new KafkaAvroDeserializer(
    new CachedSchemaRegistryClient("url".split(',').toList.asJava, 16),
  )

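// KafkaAvroSerializer and KafkaAvroDeserializer are typed on Object, hence the unchecked casts.
// Note: unless it is configured with specific.avro.reader=true, KafkaAvroDeserializer
// returns a GenericRecord rather than an AvroClass instance.
implicit val serdeClass: Serde[AvroClass] = new Serde[AvroClass] {
  override def serializer(): Serializer[AvroClass] = ser.asInstanceOf[Serializer[AvroClass]]

  override def deserializer(): Deserializer[AvroClass] = de.asInstanceOf[Deserializer[AvroClass]]
}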

// Protocol description
val kafkaScn1Protocol: KafkaProtocol = kafka
  .topic("myTopic")
  .properties(
    Map(
      ProducerConfig.ACKS_CONFIG                   -> "1",
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG      -> "localhost:9092",
      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG   -> "org.apache.kafka.common.serialization.StringSerializer",
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
      "schema.registry.url"                        -> "url",
    ),
  )

val kafkaScn2Protocol: KafkaProtocol = kafka.requestReply
  .producerSettings(
    Map(
      ProducerConfig.ACKS_CONFIG                   -> "1",
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG      -> "localhost:9092",
      ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG   -> "org.apache.kafka.common.serialization.StringSerializer",
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> "io.confluent.kafka.serializers.KafkaAvroSerializer",
      "schema.registry.url"                        -> "url",
    ),
  )
  .consumeSettings(
    Map(
      // Same key ("bootstrap.servers") as ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG
      ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
      "schema.registry.url"                   -> "url",
    ),
  )
  .timeout(5.seconds)

// Sending Avro-serialised messages scenario
val scn: ScenarioBuilder = scenario("Simple request")
  .exec(
    kafka("Simple Request")
      .send[AvroClass](new AvroClass("someParam1", "someParam2")),
  )

val scn2: ScenarioBuilder = scenario("RequestReply")
  .exec(
    kafka("RequestReply").requestReply
      .requestTopic("request.t")
      .replyTopic("reply.t")
      .send[String, AvroClass]("key", new AvroClass("someParam1", "someParam2")),
  )
