Skip to content

Commit

Permalink
add StoryTest
Browse files Browse the repository at this point in the history
  • Loading branch information
bennfocus committed Jan 2, 2019
1 parent bd67a75 commit 89ffef7
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 28 deletions.
23 changes: 16 additions & 7 deletions core/src/main/scala/com/thenetcircle/event_bus/story/Story.scala
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class Story(
) extends Logging
with MonitoringHelp {

type Payload = (EventStatus, Event) // middle result type
import com.thenetcircle.event_bus.story.Story.Payload

val storyName: String = settings.name

Expand All @@ -53,7 +53,7 @@ class Story(
private var runningFuture: Option[Future[Done]] = None
def run()(implicit runningContext: TaskRunningContext): Future[Done] = runningFuture getOrElse {
try {
val sourceHandler = wrapTask(Flow[Payload], s"story:$storyName:source", skipPreCheck = true)
val sourceHandler = Story.wrapTask(Flow[Payload], s"story:$storyName:source", fallbackTask, skipPreCheck = true)

var transformId = 0
val transformsHandler =
Expand All @@ -63,17 +63,18 @@ class Story(
transformId += 1
_chain
.via(
wrapTask(
Story.wrapTask(
Flow[Payload].map(_._2).via(_transform.prepare()),
s"story:$storyName:transform:$transformId"
s"story:$storyName:transform:$transformId",
fallbackTask
)
)
}
})
.getOrElse(Flow[Payload])

val sinkHandler =
wrapTask(Flow[Payload].map(_._2).via(sinkTask.prepare()), s"story:$storyName:sink")
Story.wrapTask(Flow[Payload].map(_._2).via(sinkTask.prepare()), s"story:$storyName:sink", fallbackTask)

val monitorFlow = Flow[Payload]
.map(payload => {
Expand Down Expand Up @@ -121,9 +122,16 @@ class Story(
throw ex
}

}

object Story extends Logging {

type Payload = (EventStatus, Event) // middle result type

def wrapTask(
taskHandler: Flow[Payload, Payload, NotUsed],
taskName: String,
fallbackTask: Option[FallbackTask] = None,
skipPreCheck: Boolean = false
)(implicit runningContext: TaskRunningContext): Flow[Payload, Payload, NotUsed] =
Flow
Expand Down Expand Up @@ -179,15 +187,15 @@ class Story(

// format: off
// --------------- workflow graph start ----------------


// NORM goes to taskHandler >>>
preCheck.out(0) ~> finalTaskHandler ~> postCheck
// non-TOFB goes to next task
postCheck.out(0) ~> output.in(0)
// TOFB goes to fallback >>>
postCheck.out(1) ~> fallback ~> output.in(1)

// Other status will skip this task >>>
preCheck.out(1) ~> output.in(2)

Expand All @@ -200,4 +208,5 @@ class Story(
}
)
.named(taskName)

}
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ class KafkaSource(val settings: KafkaSourceSettings) extends SourceTask with Log

val kafkaConsumerSettings = getConsumerSettings()
val kafkaSubscription = getSubscription()
consumerLogger.info(s"going to subscribe kafka topics: $kafkaSubscription")
consumerLogger.info(s"Going to subscribe kafka topics: $kafkaSubscription")

val (killSwitch, doneFuture) =
Consumer
Expand All @@ -175,40 +175,39 @@ class KafkaSource(val settings: KafkaSourceSettings) extends SourceTask with Log
val errorMessage =
s"The event ${event.uuid} missed PassThrough[CommittableOffset]"
consumerLogger.error(errorMessage)
throw new IllegalArgumentException(errorMessage)
throw new IllegalStateException(errorMessage)
}

case (TOFB(exOp), event) =>
consumerLogger.error(
s"The event ${event.uuid} reaches the end with TOFB status" +
exOp.map(e => s" and error ${e.getMessage}").getOrElse("")
)
val offsetOption = event.getPassThrough[CommittableOffset]
if (offsetOption.isDefined) {
consumerLogger.info(
s"The event ${event.uuid} is going to be committed with offset ${offsetOption.get}"
)
throw new CommittableException(offsetOption.get, "Non handled TOFB status")
} else {
throw new RuntimeException(
"Non handled TOFB status without CommittableOffset"
)
event.getPassThrough[CommittableOffset] match {
case Some(co) =>
consumerLogger.info(
s"The event ${event.uuid} is going to be committed with offset $co"
)
throw new CommittableException(co, "Non handled TOFB status")
case None =>
throw new RuntimeException(
"Non handled TOFB status without CommittableOffset"
)
}

case (FAIL(ex), event) =>
consumerLogger.error(s"The event ${event.uuid} reaches the end with error $ex")
// complete the stream if failure, before was using Future.successful(Done)
val offsetOption = event.getPassThrough[CommittableOffset]
if (offsetOption.isDefined) {
consumerLogger.info(
s"The event ${event.uuid} is going to be committed with offset ${offsetOption.get}"
)
throw new CommittableException(offsetOption.get, "FAIL status event")
} else {
throw ex
event.getPassThrough[CommittableOffset] match {
case Some(co) =>
consumerLogger.info(
s"The event ${event.uuid} is going to be committed with offset $co"
)
throw new CommittableException(co, "FAIL status event")
case None =>
throw ex
}
}
// TODO some test
.recover {
case ex: CommittableException =>
consumerLogger.info(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Contributors:
* Beineng Ma <[email protected]>
*/

package com.thenetcircle.event_bus.story

import akka.NotUsed
import akka.stream.scaladsl.{Flow, Source}
import com.thenetcircle.event_bus.TestBase
import com.thenetcircle.event_bus.interfaces.EventStatus._
import com.thenetcircle.event_bus.story.Story.Payload

import scala.concurrent.{Await, Future}

class StoryTest extends TestBase {

behavior of "Story"

it should "make sure the message order after wrapping tasks" in {

val slowTask: Flow[Payload, Payload, NotUsed] = Flow[Payload]
.mapAsync(2) { pl =>
Thread.sleep(1000)
Future.successful(pl)
}
val wrappedTask: Flow[Payload, Payload, NotUsed] = Story.wrapTask(slowTask, "testTask")

val testSource: Source[Payload, NotUsed] = Source(
List(
(NORM, createTestEvent("norm_event1")),
(SKIP, createTestEvent("skip_event1")),
(NORM, createTestEvent("norm_event2")),
(SKIP, createTestEvent("skip_event2"))
)
)

testSource.via(wrappedTask).runForeach(println)

Thread.sleep(5000)
}
}

0 comments on commit 89ffef7

Please sign in to comment.