From 89ffef74b1f00fb05c34fc94d50946b78b65ed94 Mon Sep 17 00:00:00 2001 From: baineng Date: Wed, 2 Jan 2019 22:19:36 +0800 Subject: [PATCH] add StoryTest --- .../thenetcircle/event_bus/story/Story.scala | 23 +++++--- .../event_bus/tasks/kafka/KafkaSource.scala | 41 +++++++------- .../event_bus/story/StoryTest.scala | 54 +++++++++++++++++++ 3 files changed, 90 insertions(+), 28 deletions(-) create mode 100644 core/src/test/scala/com/thenetcircle/event_bus/story/StoryTest.scala diff --git a/core/src/main/scala/com/thenetcircle/event_bus/story/Story.scala b/core/src/main/scala/com/thenetcircle/event_bus/story/Story.scala index 8fbe807..c0ea3a6 100644 --- a/core/src/main/scala/com/thenetcircle/event_bus/story/Story.scala +++ b/core/src/main/scala/com/thenetcircle/event_bus/story/Story.scala @@ -41,7 +41,7 @@ class Story( ) extends Logging with MonitoringHelp { - type Payload = (EventStatus, Event) // middle result type + import com.thenetcircle.event_bus.story.Story.Payload val storyName: String = settings.name @@ -53,7 +53,7 @@ class Story( private var runningFuture: Option[Future[Done]] = None def run()(implicit runningContext: TaskRunningContext): Future[Done] = runningFuture getOrElse { try { - val sourceHandler = wrapTask(Flow[Payload], s"story:$storyName:source", skipPreCheck = true) + val sourceHandler = Story.wrapTask(Flow[Payload], s"story:$storyName:source", fallbackTask, skipPreCheck = true) var transformId = 0 val transformsHandler = @@ -63,9 +63,10 @@ class Story( transformId += 1 _chain .via( - wrapTask( + Story.wrapTask( Flow[Payload].map(_._2).via(_transform.prepare()), - s"story:$storyName:transform:$transformId" + s"story:$storyName:transform:$transformId", + fallbackTask ) ) } @@ -73,7 +74,7 @@ class Story( .getOrElse(Flow[Payload]) val sinkHandler = - wrapTask(Flow[Payload].map(_._2).via(sinkTask.prepare()), s"story:$storyName:sink") + Story.wrapTask(Flow[Payload].map(_._2).via(sinkTask.prepare()), s"story:$storyName:sink", fallbackTask) val monitorFlow = Flow[Payload] .map(payload => { @@ -121,9 +122,16 @@ class Story( throw ex } +} + +object Story extends Logging { + + type Payload = (EventStatus, Event) // middle result type + def wrapTask( taskHandler: Flow[Payload, Payload, NotUsed], taskName: String, + fallbackTask: Option[FallbackTask] = None, skipPreCheck: Boolean = false )(implicit runningContext: TaskRunningContext): Flow[Payload, Payload, NotUsed] = Flow @@ -179,7 +187,7 @@ class Story( // format: off // --------------- workflow graph start ---------------- - + // NORM goes to taskHandler >>> preCheck.out(0) ~> finalTaskHandler ~> postCheck @@ -187,7 +195,7 @@ class Story( postCheck.out(0) ~> output.in(0) // TOFB goes to fallback >>> postCheck.out(1) ~> fallback ~> output.in(1) - + // Other status will skip this task >>> preCheck.out(1) ~> output.in(2) @@ -200,4 +208,5 @@ class Story( } ) .named(taskName) + } diff --git a/core/src/main/scala/com/thenetcircle/event_bus/tasks/kafka/KafkaSource.scala b/core/src/main/scala/com/thenetcircle/event_bus/tasks/kafka/KafkaSource.scala index 638e468..7657acf 100644 --- a/core/src/main/scala/com/thenetcircle/event_bus/tasks/kafka/KafkaSource.scala +++ b/core/src/main/scala/com/thenetcircle/event_bus/tasks/kafka/KafkaSource.scala @@ -148,7 +148,7 @@ class KafkaSource(val settings: KafkaSourceSettings) extends SourceTask with Log val kafkaConsumerSettings = getConsumerSettings() val kafkaSubscription = getSubscription() - consumerLogger.info(s"going to subscribe kafka topics: $kafkaSubscription") + consumerLogger.info(s"Going to subscribe kafka topics: $kafkaSubscription") val (killSwitch, doneFuture) = Consumer @@ -175,7 +175,7 @@ class KafkaSource(val settings: KafkaSourceSettings) extends SourceTask with Log val errorMessage = s"The event ${event.uuid} missed PassThrough[CommittableOffset]" consumerLogger.error(errorMessage) - throw new IllegalArgumentException(errorMessage) + throw new IllegalStateException(errorMessage) } case (TOFB(exOp), event) => @@ -183,32 +183,31 @@ class KafkaSource(val settings: KafkaSourceSettings) extends SourceTask with Log s"The event ${event.uuid} reaches the end with TOFB status" + exOp.map(e => s" and error ${e.getMessage}").getOrElse("") ) - val offsetOption = event.getPassThrough[CommittableOffset] - if (offsetOption.isDefined) { - consumerLogger.info( - s"The event ${event.uuid} is going to be committed with offset ${offsetOption.get}" - ) - throw new CommittableException(offsetOption.get, "Non handled TOFB status") - } else { - throw new RuntimeException( - "Non handled TOFB status without CommittableOffset" - ) + event.getPassThrough[CommittableOffset] match { + case Some(co) => + consumerLogger.info( + s"The event ${event.uuid} is going to be committed with offset $co" + ) + throw new CommittableException(co, "Non handled TOFB status") + case None => + throw new RuntimeException( + "Non handled TOFB status without CommittableOffset" + ) } case (FAIL(ex), event) => consumerLogger.error(s"The event ${event.uuid} reaches the end with error $ex") // complete the stream if failure, before was using Future.successful(Done) - val offsetOption = event.getPassThrough[CommittableOffset] - if (offsetOption.isDefined) { - consumerLogger.info( - s"The event ${event.uuid} is going to be committed with offset ${offsetOption.get}" - ) - throw new CommittableException(offsetOption.get, "FAIL status event") - } else { - throw ex + event.getPassThrough[CommittableOffset] match { + case Some(co) => + consumerLogger.info( + s"The event ${event.uuid} is going to be committed with offset $co" + ) + throw new CommittableException(co, "FAIL status event") + case None => + throw ex } } - // TODO some test .recover { case ex: CommittableException => consumerLogger.info( diff --git a/core/src/test/scala/com/thenetcircle/event_bus/story/StoryTest.scala b/core/src/test/scala/com/thenetcircle/event_bus/story/StoryTest.scala new file mode 100644 index 0000000..c12ce92 --- /dev/null +++ b/core/src/test/scala/com/thenetcircle/event_bus/story/StoryTest.scala @@ -0,0 +1,54 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Contributors: + * Beineng Ma + */ + +package com.thenetcircle.event_bus.story + +import akka.NotUsed +import akka.stream.scaladsl.{Flow, Source} +import com.thenetcircle.event_bus.TestBase +import com.thenetcircle.event_bus.interfaces.EventStatus._ +import com.thenetcircle.event_bus.story.Story.Payload + +import scala.concurrent.{Await, Future} + +class StoryTest extends TestBase { + + behavior of "Story" + + it should "make sure the message order after wrapping tasks" in { + + val slowTask: Flow[Payload, Payload, NotUsed] = Flow[Payload] + .mapAsync(2) { pl => + Thread.sleep(1000) + Future.successful(pl) + } + val wrappedTask: Flow[Payload, Payload, NotUsed] = Story.wrapTask(slowTask, "testTask") + + val testSource: Source[Payload, NotUsed] = Source( + List( + (NORM, createTestEvent("norm_event1")), + (SKIP, createTestEvent("skip_event1")), + (NORM, createTestEvent("norm_event2")), + (SKIP, createTestEvent("skip_event2")) + ) + ) + + testSource.via(wrappedTask).runForeach(println) + + Thread.sleep(5000) + } +}