Skip to content

Commit

Permalink
make the topic field optional in the streaming KafkaOutput
Browse files Browse the repository at this point in the history
  • Loading branch information
marclamy committed Apr 10, 2024
1 parent e820302 commit 8134a06
Showing 1 changed file with 8 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.util.Try
*/
case class KafkaOutput(
brokers: String,
topic: String,
topic: Option[String],
processingTimeTrigger: Trigger,
timeout: Long,
mode: String,
Expand All @@ -46,14 +46,15 @@ case class KafkaOutput(
val streamWriter = data.writeStream
.queryName(queryName)
.format("kafka")
.option("kafka.bootstrap.servers", brokers)
.option("topic", topic)
.options(options)
.option("kafka.bootstrap.servers", brokers)
.outputMode(mode)

val streamingQuery = streamWriter
.trigger(processingTimeTrigger)
.start()

val streamingQuery = topic match {
case Some(t) => streamWriter.option("topic", t).start()
case _ => streamWriter.start()
}

streamingQuery.awaitTermination(timeout)
streamingQuery.stop()
Expand Down Expand Up @@ -87,10 +88,7 @@ object KafkaOutput {
*/
def apply(implicit config: Config): KafkaOutput = {
val brokers = getBroker
val topic = getTopic match {
case Some(topicToUse) if topicToUse.nonEmpty => topicToUse
case _ => throw new IllegalArgumentException("No topic specified for Kafka source")
}
val topic = getTopic

val duration = Duration(config.getString("Duration"))
val processingTimeTrigger = Trigger.ProcessingTime(duration)
Expand Down

0 comments on commit 8134a06

Please sign in to comment.