From 85ec76702a77307442abc7ad1e9f021090707a9f Mon Sep 17 00:00:00 2001 From: baineng Date: Thu, 3 Jan 2019 00:11:44 +0800 Subject: [PATCH] updated StoryTest --- .../event_bus/story/StoryTest.scala | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/core/src/test/scala/com/thenetcircle/event_bus/story/StoryTest.scala b/core/src/test/scala/com/thenetcircle/event_bus/story/StoryTest.scala index c12ce92..9f479eb 100644 --- a/core/src/test/scala/com/thenetcircle/event_bus/story/StoryTest.scala +++ b/core/src/test/scala/com/thenetcircle/event_bus/story/StoryTest.scala @@ -18,12 +18,14 @@ package com.thenetcircle.event_bus.story import akka.NotUsed +import akka.stream.Attributes import akka.stream.scaladsl.{Flow, Source} import com.thenetcircle.event_bus.TestBase import com.thenetcircle.event_bus.interfaces.EventStatus._ import com.thenetcircle.event_bus.story.Story.Payload import scala.concurrent.{Await, Future} +import scala.util.Random class StoryTest extends TestBase { @@ -33,17 +35,25 @@ class StoryTest extends TestBase { val slowTask: Flow[Payload, Payload, NotUsed] = Flow[Payload] .mapAsync(2) { pl => - Thread.sleep(1000) - Future.successful(pl) + Future { + Thread.sleep(Random.nextInt(1000)) + pl + } } - val wrappedTask: Flow[Payload, Payload, NotUsed] = Story.wrapTask(slowTask, "testTask") + + val slowTask2: Flow[Payload, Payload, NotUsed] = Flow[Payload].async + .addAttributes(Attributes.inputBuffer(1, 1)) + + val wrappedTask: Flow[Payload, Payload, NotUsed] = Story.wrapTask(slowTask2, "testTask") val testSource: Source[Payload, NotUsed] = Source( List( (NORM, createTestEvent("norm_event1")), (SKIP, createTestEvent("skip_event1")), (NORM, createTestEvent("norm_event2")), - (SKIP, createTestEvent("skip_event2")) + (SKIP, createTestEvent("skip_event2")), + (NORM, createTestEvent("norm_event3")), + (SKIP, createTestEvent("skip_event3")) ) )