From 56c824a39b1d73e67259bc73f6ac7fb5cc1997f6 Mon Sep 17 00:00:00 2001 From: Jacek Kunicki Date: Fri, 6 Oct 2023 15:21:17 +0200 Subject: [PATCH] Extract some Source test cases to separate classes --- .../SourceOpsFactoryMethodsTest.scala | 40 ++++ .../ox/channels/SourceOpsForeachTest.scala | 48 +++++ .../scala/ox/channels/SourceOpsMapTest.scala | 69 +++++++ .../scala/ox/channels/SourceOpsTest.scala | 185 ------------------ .../ox/channels/SourceOpsTransformTest.scala | 69 +++++++ 5 files changed, 226 insertions(+), 185 deletions(-) create mode 100644 core/src/test/scala/ox/channels/SourceOpsFactoryMethodsTest.scala create mode 100644 core/src/test/scala/ox/channels/SourceOpsForeachTest.scala create mode 100644 core/src/test/scala/ox/channels/SourceOpsMapTest.scala create mode 100644 core/src/test/scala/ox/channels/SourceOpsTransformTest.scala diff --git a/core/src/test/scala/ox/channels/SourceOpsFactoryMethodsTest.scala b/core/src/test/scala/ox/channels/SourceOpsFactoryMethodsTest.scala new file mode 100644 index 00000000..ceae0ec1 --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsFactoryMethodsTest.scala @@ -0,0 +1,40 @@ +package ox.channels + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class SourceOpsFactoryMethodsTest extends AnyFlatSpec with Matchers { + + behavior of "Source factory methods" + + it should "create a source from a fork" in { + scoped { + val f = fork(1) + val c = Source.fromFork(f) + c.toList shouldBe List(1) + } + } + + it should "create an iterating source" in { + scoped { + val c = Source.iterate(1)(_ + 1) + c.take(3).toList shouldBe List(1, 2, 3) + } + } + + it should "unfold a function" in { + scoped { + val c = Source.unfold(0)(i => if i < 3 then Some((i, i + 1)) else None) + c.toList shouldBe List(0, 1, 2) + } + } + + it should "produce a range" in { + scoped { + Source.range(1, 5, 1).toList shouldBe List(1, 2, 3, 4, 5) + Source.range(1, 5, 2).toList shouldBe List(1, 3, 5) + Source.range(1, 11, 3).toList shouldBe List(1, 4, 7, 10) + } + } +} diff --git a/core/src/test/scala/ox/channels/SourceOpsForeachTest.scala b/core/src/test/scala/ox/channels/SourceOpsForeachTest.scala new file mode 100644 index 00000000..0d7d8c73 --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsForeachTest.scala @@ -0,0 +1,48 @@ +package ox.channels + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class SourceOpsForeachTest extends AnyFlatSpec with Matchers { + + behavior of "Source.foreach" + + it should "iterate over a source" in { + val c = Channel[Int](10) + c.send(1) + c.send(2) + c.send(3) + c.done() + + var r: List[Int] = Nil + c.foreach(v => r = v :: r) + + r shouldBe List(3, 2, 1) + } + + it should "iterate over a source using for-syntax" in { + val c = Channel[Int](10) + c.send(1) + c.send(2) + c.send(3) + c.done() + + var r: List[Int] = Nil + for { + v <- c + } r = v :: r + + r shouldBe List(3, 2, 1) + } + + it should "convert source to a list" in { + val c = Channel[Int](10) + c.send(1) + c.send(2) + c.send(3) + c.done() + + c.toList shouldBe List(1, 2, 3) + } +} diff --git a/core/src/test/scala/ox/channels/SourceOpsMapTest.scala b/core/src/test/scala/ox/channels/SourceOpsMapTest.scala new file mode 100644 index 00000000..3797f62b --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsMapTest.scala @@ -0,0 +1,69 @@ +package ox.channels + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class SourceOpsMapTest extends AnyFlatSpec with Matchers { + + behavior of "Source.map" + + it should "map over a source" in { + scoped { + val c = Channel[Int]() + fork { + c.send(1) + c.send(2) + c.send(3) + c.done() + } + + val s = c.map(_ * 10) + + s.receive() shouldBe 10 + s.receive() shouldBe 20 + s.receive() shouldBe 30 + s.receive() shouldBe ChannelClosed.Done + } + } + + it should "map over a source (stress test)" in { + // this demonstrated a race condition where a cell was added by select to the waiting list by T1, completed by T2, + // which then subsequently completed the stream; only then T1 wakes up, and checks if no new elements have been added + for (_ <- 1 to 100000) { + scoped { + val c = Channel[Int]() + fork { + c.send(1) + c.done() + } + + val s = c.map(_ * 10) + + s.receive() shouldBe 10 + s.receive() shouldBe ChannelClosed.Done + } + } + } + + it should "map over a source using for-syntax" in { + scoped { + val c = Channel[Int]() + fork { + c.send(1) + c.send(2) + c.send(3) + c.done() + } + + val s = for { + v <- c + } yield v * 2 + + s.receive() shouldBe 2 + s.receive() shouldBe 4 + s.receive() shouldBe 6 + s.receive() shouldBe ChannelClosed.Done + } + } +} diff --git a/core/src/test/scala/ox/channels/SourceOpsTest.scala b/core/src/test/scala/ox/channels/SourceOpsTest.scala index e48d1b6f..1513c31e 100644 --- a/core/src/test/scala/ox/channels/SourceOpsTest.scala +++ b/core/src/test/scala/ox/channels/SourceOpsTest.scala @@ -11,161 +11,6 @@ import scala.concurrent.duration.DurationInt import scala.jdk.CollectionConverters.* class SourceOpsTest extends AnyFlatSpec with Matchers with Eventually { - it should "map over a source" in { - scoped { - val c = Channel[Int]() - fork { - c.send(1) - c.send(2) - c.send(3) - c.done() - } - - val s = c.map(_ * 10) - - s.receive() shouldBe 10 - s.receive() shouldBe 20 - s.receive() shouldBe 30 - s.receive() shouldBe ChannelClosed.Done - } - } - - it should "map over a source (stress test)" in { - // this demonstrated a race condition where a cell was added by select to the waiting list by T1, completed by T2, - // which then subsequently completed the stream; only then T1 wakes up, and checks if no new elements have been added - for (_ <- 1 to 100000) { - scoped { - val c = Channel[Int]() - fork { - c.send(1) - c.done() - } - - val s = c.map(_ * 10) - - s.receive() shouldBe 10 - s.receive() shouldBe ChannelClosed.Done - } - } - } - - it should "map over a source using for-syntax" in { - scoped { - val c = Channel[Int]() - fork { - c.send(1) - c.send(2) - c.send(3) - c.done() - } - - val s = for { - v <- c - } yield v * 2 - - s.receive() shouldBe 2 - s.receive() shouldBe 4 - s.receive() shouldBe 6 - s.receive() shouldBe ChannelClosed.Done - } - } - - it should "iterate over a source" in { - val c = Channel[Int](10) - c.send(1) - c.send(2) - c.send(3) - c.done() - - var r: List[Int] = Nil - c.foreach(v => r = v :: r) - - r shouldBe List(3, 2, 1) - } - - it should "iterate over a source using for-syntax" in { - val c = Channel[Int](10) - c.send(1) - c.send(2) - c.send(3) - c.done() - - var r: List[Int] = Nil - for { - v <- c - } r = v :: r - - r shouldBe List(3, 2, 1) - } - - it should "convert source to a list" in { - val c = Channel[Int](10) - c.send(1) - c.send(2) - c.send(3) - c.done() - - c.toList shouldBe List(1, 2, 3) - } - - it should "transform a source using a simple map" in { - val c = Channel[Int](10) - c.send(1) - c.send(2) - c.send(3) - c.done() - - scoped { - c.transform(_.map(_ * 2)).toList shouldBe List(2, 4, 6) - } - } - - it should "transform a source using a complex chain of operations" in { - val c = Channel[Int](10) - c.send(1) - c.send(2) - c.send(3) - c.send(4) - c.done() - - scoped { - c.transform(_.drop(2).flatMap(i => List(i, i + 1, i + 2)).filter(_ % 2 == 0)).toList shouldBe List(4, 4, 6) - } - } - - it should "transform an infinite source" in { - val c = Channel[Int]() - scoped { - fork { - var i = 0 - while true do - c.send(i) - i += 1 - } - - val s = c.transform(_.filter(_ % 2 == 0).flatMap(i => List(i, i + 1))) - s.receive() shouldBe 0 - s.receive() shouldBe 1 - s.receive() shouldBe 2 - } - } - - it should "transform an infinite source (stress test)" in { - for (_ <- 1 to 1000) { // this nicely demonstrated two race conditions - val c = Channel[Int]() - scoped { - fork { - var i = 0 - while true do - c.send(i) - i += 1 - } - - val s = c.transform(x => x) - s.receive() shouldBe 0 - } - } - } it should "tick regularly" in { scoped { @@ -231,28 +76,6 @@ class SourceOpsTest extends AnyFlatSpec with Matchers with Eventually { } } - it should "create a source from a fork" in { - scoped { - val f = fork(1) - val c = Source.fromFork(f) - c.toList shouldBe List(1) - } - } - - it should "create an iterating source" in { - scoped { - val c = Source.iterate(1)(_ + 1) - c.take(3).toList shouldBe List(1, 2, 3) - } - } - - it should "unfold a function" in { - scoped { - val c = Source.unfold(0)(i => if i < 3 then Some((i, i + 1)) else None) - c.toList shouldBe List(0, 1, 2) - } - } - it should "concatenate sources" in { scoped { val s1 = Source.fromValues("a", "b", "c") @@ -264,12 +87,4 @@ class SourceOpsTest extends AnyFlatSpec with Matchers with Eventually { s.toList shouldBe List("a", "b", "c", "d", "e", "f", "g", "h", "i") } } - - it should "produce a range" in { - scoped { - Source.range(1, 5, 1).toList shouldBe List(1, 2, 3, 4, 5) - Source.range(1, 5, 2).toList shouldBe List(1, 3, 5) - Source.range(1, 11, 3).toList shouldBe List(1, 4, 7, 10) - } - } } diff --git a/core/src/test/scala/ox/channels/SourceOpsTransformTest.scala b/core/src/test/scala/ox/channels/SourceOpsTransformTest.scala new file mode 100644 index 00000000..080ea89b --- /dev/null +++ b/core/src/test/scala/ox/channels/SourceOpsTransformTest.scala @@ -0,0 +1,69 @@ +package ox.channels + +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import ox.* + +class SourceOpsTransformTest extends AnyFlatSpec with Matchers { + + behavior of "Source.transform" + + it should "transform a source using a simple map" in { + val c = Channel[Int](10) + c.send(1) + c.send(2) + c.send(3) + c.done() + + scoped { + c.transform(_.map(_ * 2)).toList shouldBe List(2, 4, 6) + } + } + + it should "transform a source using a complex chain of operations" in { + val c = Channel[Int](10) + c.send(1) + c.send(2) + c.send(3) + c.send(4) + c.done() + + scoped { + c.transform(_.drop(2).flatMap(i => List(i, i + 1, i + 2)).filter(_ % 2 == 0)).toList shouldBe List(4, 4, 6) + } + } + + it should "transform an infinite source" in { + val c = Channel[Int]() + scoped { + fork { + var i = 0 + while true do + c.send(i) + i += 1 + } + + val s = c.transform(_.filter(_ % 2 == 0).flatMap(i => List(i, i + 1))) + s.receive() shouldBe 0 + s.receive() shouldBe 1 + s.receive() shouldBe 2 + } + } + + it should "transform an infinite source (stress test)" in { + for (_ <- 1 to 1000) { // this nicely demonstrated two race conditions + val c = Channel[Int]() + scoped { + fork { + var i = 0 + while true do + c.send(i) + i += 1 + } + + val s = c.transform(x => x) + s.receive() shouldBe 0 + } + } + } +}