diff --git a/src/main/scala/com/github/mnogu/gatling/kafka/action/KafkaRequestAction.scala b/src/main/scala/com/github/mnogu/gatling/kafka/action/KafkaRequestAction.scala index 9e91894..64aa2ae 100644 --- a/src/main/scala/com/github/mnogu/gatling/kafka/action/KafkaRequestAction.scala +++ b/src/main/scala/com/github/mnogu/gatling/kafka/action/KafkaRequestAction.scala @@ -55,11 +55,16 @@ class KafkaRequestAction[K,V]( val producer: KafkaProducer[K,V], kafkaAttributes payload session map { payload => + val topic = kafkaAttributes.topic match { + case Some(k) => k(session).get + case None => kafkaProtocol.topic + } + val record = kafkaAttributes.key match { case Some(k) => - new ProducerRecord[K, V](kafkaProtocol.topic, k(session).get, payload) + new ProducerRecord[K, V](topic, k(session).get, payload) case None => - new ProducerRecord[K, V](kafkaProtocol.topic, payload) + new ProducerRecord[K, V](topic, payload) } val requestStartDate = nowMillis diff --git a/src/main/scala/com/github/mnogu/gatling/kafka/request/builder/KafkaRequestBuilder.scala b/src/main/scala/com/github/mnogu/gatling/kafka/request/builder/KafkaRequestBuilder.scala index d216265..68e7557 100644 --- a/src/main/scala/com/github/mnogu/gatling/kafka/request/builder/KafkaRequestBuilder.scala +++ b/src/main/scala/com/github/mnogu/gatling/kafka/request/builder/KafkaRequestBuilder.scala @@ -5,15 +5,19 @@ import io.gatling.core.session._ case class KafkaAttributes[K,V]( requestName: Expression[String], key: Option[Expression[K]], - payload: Expression[V] ) + payload: Expression[V], + topic: Option[Expression[String]]) case class KafkaRequestBuilder(requestName: Expression[String]) { - def send[V](payload: Expression[V]): KafkaRequestActionBuilder[_,V] = send(payload, None) + def send[V](payload: Expression[V]): KafkaRequestActionBuilder[_,V] = send(payload, None, None) - def send[K,V](key: Expression[K], payload: Expression[V]): KafkaRequestActionBuilder[K,V] = send(payload, Some(key)) + def send[K,V](key: Expression[K], payload: Expression[V]): KafkaRequestActionBuilder[K,V] = send(payload, Some(key), None) - private def send[K,V](payload: Expression[V], key: Option[Expression[K]]) = - new KafkaRequestActionBuilder(KafkaAttributes(requestName, key, payload)) + def send[K,V](topic: Expression[String], key: Expression[K], payload: Expression[V]): KafkaRequestActionBuilder[K,V] = + send(payload, Some(key), Some(topic)) + + private def send[K,V](payload: Expression[V], key: Option[Expression[K]], topic: Option[Expression[String]]) = + new KafkaRequestActionBuilder(KafkaAttributes(requestName, key, payload, topic)) }