diff --git a/core/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala b/core/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala index b10be45..71cfb5c 100644 --- a/core/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala +++ b/core/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala @@ -22,7 +22,7 @@ import scala.util.Try */ case class KafkaOutput( brokers: String, - topic: String, + topic: Option[String], processingTimeTrigger: Trigger, timeout: Long, mode: String, @@ -46,14 +46,15 @@ case class KafkaOutput( val streamWriter = data.writeStream .queryName(queryName) .format("kafka") - .option("kafka.bootstrap.servers", brokers) - .option("topic", topic) .options(options) + .option("kafka.bootstrap.servers", brokers) .outputMode(mode) - - val streamingQuery = streamWriter .trigger(processingTimeTrigger) - .start() + + val streamingQuery = topic match { + case Some(t) => streamWriter.option("topic", t).start() + case _ => streamWriter.start() + } streamingQuery.awaitTermination(timeout) streamingQuery.stop() @@ -87,10 +88,7 @@ object KafkaOutput { */ def apply(implicit config: Config): KafkaOutput = { val brokers = getBroker - val topic = getTopic match { - case Some(topicToUse) if topicToUse.nonEmpty => topicToUse - case _ => throw new IllegalArgumentException("No topic specified for Kafka source") - } + val topic = getTopic val duration = Duration(config.getString("Duration")) val processingTimeTrigger = Trigger.ProcessingTime(duration)