Skip to content

Commit

Permalink
update the streaming KafkaOutput createQueryName function and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
marclamy committed Apr 10, 2024
1 parent 8134a06 commit 00db964
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,11 @@ case class KafkaOutput(
*/
private[streaming] def createQueryName(): String = {

outputName match {
case Some(name) => s"QN_${name}_${topic}_${java.util.UUID.randomUUID}"
case _ => s"QN_${topic}_${java.util.UUID.randomUUID}"
(outputName, topic) match {
case (Some(name), Some(t)) => s"QN_${name}_${t}_${java.util.UUID.randomUUID}"
case (Some(name), None) => s"QN_${name}_${java.util.UUID.randomUUID}"
case (None, Some(t)) => s"QN_KafkaOutput_${t}_${java.util.UUID.randomUUID}"
case _ => s"QN_KafkaOutput_${java.util.UUID.randomUUID}"
}

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {

kafkaStreamOutput.outputName shouldEqual Some("my-test-kafka-output")
kafkaStreamOutput.brokers shouldEqual "bktv001:9000, bktv002.amadeus.net:8000"
kafkaStreamOutput.topic shouldEqual "test.topic"
kafkaStreamOutput.topic shouldEqual Some("test.topic")
}

"be initialized with all optional properties" in {
Expand Down Expand Up @@ -56,7 +56,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {

kafkaStreamOutput.outputName shouldEqual None
kafkaStreamOutput.brokers shouldEqual "bktv001:9000, bktv002.amadeus.net:8000"
kafkaStreamOutput.topic shouldEqual "test.topic"
kafkaStreamOutput.topic shouldEqual Some("test.topic")
kafkaStreamOutput.options shouldEqual Map(
"failOnDataLoss" -> "false",
"maxOffsetsPerTrigger" -> "20000000",
Expand All @@ -65,60 +65,6 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
)
}

"throw an exception given missing topic but pattern" in {

val config = ConfigFactory.parseMap(
Map(
"Output" -> Map(
"Type" -> "com.amadeus.dataio.output.streaming.KafkaOutput",
"Name" -> "my-test-kafka",
"Brokers" -> "bktv001:9000, bktv002.amadeus.net:8000",
"Pattern" -> "test.pattern",
"Mode" -> "append",
"Duration" -> "60 seconds",
"Timeout" -> "24",
"Options" -> Map(
"failOnDataLoss" -> "false",
"maxOffsetsPerTrigger" -> "20000000",
"\"kafka.security.protocol\"" -> "SASL_PLAINTEXT",
"\"kafka.sasl.kerberos.service.name\"" -> "kafka"
)
)
)
)

intercept[IllegalArgumentException] {
KafkaOutput(config.getConfig("Output"))
}
}

"throw an exception given missing topic but assign" in {

val config = ConfigFactory.parseMap(
Map(
"Output" -> Map(
"Type" -> "com.amadeus.dataio.output.streaming.KafkaOutput",
"Name" -> "my-test-kafka",
"Brokers" -> "bktv001:9000, bktv002.amadeus.net:8000",
"Assign" -> "test.assign",
"Mode" -> "append",
"Duration" -> "60 seconds",
"Timeout" -> "24",
"Options" -> Map(
"failOnDataLoss" -> "false",
"maxOffsetsPerTrigger" -> "20000000",
"\"kafka.security.protocol\"" -> "SASL_PLAINTEXT",
"\"kafka.sasl.kerberos.service.name\"" -> "kafka"
)
)
)
)

intercept[IllegalArgumentException] {
KafkaOutput(config.getConfig("Output"))
}
}

"throw an exception given missing brokers" in {

val config = ConfigFactory.parseMap(
Expand Down Expand Up @@ -155,7 +101,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
val kafkaOutput =
KafkaOutput(
brokers = "bktv001:9000, bktv002.amadeus.net:8000",
topic = "test.topic",
topic = Some("test.topic"),
processingTimeTrigger = Trigger.Once(),
timeout = 0,
mode = "append",
Expand All @@ -164,7 +110,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {

val queryName = kafkaOutput.createQueryName()

queryName should fullyMatch regex "^QN_test.topic_" + uuidPattern + "$"
queryName should fullyMatch regex "^QN_KafkaOutput_test.topic_" + uuidPattern + "$"

}

Expand All @@ -173,7 +119,7 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
val kafkaOutput =
KafkaOutput(
brokers = "bktv001:9000, bktv002.amadeus.net:8000",
topic = "test.topic",
topic = Some("test.topic"),
processingTimeTrigger = Trigger.Once(),
timeout = 0,
mode = "append",
Expand All @@ -185,5 +131,41 @@ class KafkaOutputTest extends AnyWordSpec with Matchers {
queryName should fullyMatch regex "^QN_myTestOutput_test.topic_" + uuidPattern + "$"

}

"return a query name based on output name" in {

val kafkaOutput =
KafkaOutput(
brokers = "bktv001:9000, bktv002.amadeus.net:8000",
topic = None,
processingTimeTrigger = Trigger.Once(),
timeout = 0,
mode = "append",
outputName = Some("myTestOutput")
)

val queryName = kafkaOutput.createQueryName()

queryName should fullyMatch regex "^QN_myTestOutput_" + uuidPattern + "$"

}

"return a query name based without name or topic" in {

val kafkaOutput =
KafkaOutput(
brokers = "bktv001:9000, bktv002.amadeus.net:8000",
topic = None,
processingTimeTrigger = Trigger.Once(),
timeout = 0,
mode = "append",
outputName = None
)

val queryName = kafkaOutput.createQueryName()

queryName should fullyMatch regex "^QN_KafkaOutput_" + uuidPattern + "$"

}
}
}

0 comments on commit 00db964

Please sign in to comment.