Skip to content

Commit

Permalink
feat: introduce intersperse function
Browse files Browse the repository at this point in the history
The intersperse function behaves as `List.mkString` e.g.:
* called with an `inject` element only it produces

  Source.fromValues("f", "b").intersperse(", ")  // ("f", ", ", "b")

* called with `start`, `inject` and `end` it produces

  Source.fromValues("f", "b").intersperse("[", ", ", "]")  // ("[", "f", ", ", "b", "]")
  • Loading branch information
geminicaprograms committed Oct 17, 2023
1 parent 161557a commit a4de618
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 1 deletion.
63 changes: 62 additions & 1 deletion core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package ox.channels

import ox.*

import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, LinkedBlockingQueue, Semaphore}
import java.util.concurrent.{CountDownLatch, Semaphore}
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration

Expand Down Expand Up @@ -34,6 +34,67 @@ trait SourceOps[+T] { this: Source[T] =>
}
c2

/** Intersperses this source with provided element and forwards it to the returned channel.
*
* @param inject
* An element to be injected between the stream elements.
* @return
* A source, onto which elements will be injected.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* scoped {
* Source.empty[String].intersperse(", ").toList // List()
* Source.fromValues("foo").intersperse(", ").toList // List(foo)
* Source.fromValues("foo", "bar").intersperse(", ").toList // List(foo, ", ", bar)
* }
* }}}
*/
def intersperse[U >: T](inject: U)(using Ox, StageCapacity): Source[U] =
intersperse(None, inject, None)

/** Intersperses this source with start, end and provided elements and forwards it to the returned channel.
*
* @param start
* An element to be prepended to the stream.
* @param inject
* An element to be injected between the stream elements.
* @param end
* An element to be appended to the end of the stream.
* @return
* A source, onto which elements will be injected.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* scoped {
* Source.empty[String].intersperse("[", ", ", "]").toList // List([, ])
* Source.fromValues("foo").intersperse("[", ", ", "]").toList // List([, foo, ])
* Source.fromValues("foo", "bar").intersperse("[", ", ", "]").toList // List([, foo, ", ", bar, ])
* }
* }}}
*/
def intersperse[U >: T](start: U, inject: U, end: U)(using Ox, StageCapacity): Source[U] =
intersperse(Some(start), inject, Some(end))

private def intersperse[U >: T](start: Option[U], inject: U, end: Option[U])(using Ox, StageCapacity): Source[U] =
val c = StageCapacity.newChannel[U]
forkDaemon {
start.foreach(c.send)
var firstEmitted = false
repeatWhile {
receive() match
case ChannelClosed.Done => end.foreach(c.send); c.done(); false
case ChannelClosed.Error(e) => c.error(e); false
case v: U @unchecked if !firstEmitted => firstEmitted = true; c.send(v); true
case v: U @unchecked => c.send(inject); c.send(v); true
}
}
c

/** Applies the given mapping function `f` to each element received from this source, and sends the results to the returned channel. At
* most `parallelism` invocations of `f` are run in parallel.
*
Expand Down
39 changes: 39 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsIntersperseTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package ox.channels

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class SourceOpsIntersperseTest extends AnyFlatSpec with Matchers {
behavior of "Source.intersperse"

it should "intersperse with inject only over an empty source" in supervised {
val s = Source.empty[String]
s.intersperse(", ").toList shouldBe List.empty
}

it should "intersperse with inject only over a source with one element" in supervised {
val s = Source.fromValues("foo")
s.intersperse(", ").toList shouldBe List("foo")
}

it should "intersperse with inject only over a source with multiple elements" in supervised {
val s = Source.fromValues("foo", "bar")
s.intersperse(", ").toList shouldBe List("foo", ", ", "bar")
}

it should "intersperse with start, inject and end over an empty source" in supervised {
val s = Source.empty[String]
s.intersperse("[", ", ", "]").toList shouldBe List("[", "]")
}

it should "intersperse with start, inject and end over a source with one element" in supervised {
val s = Source.fromValues("foo")
s.intersperse("[", ", ", "]").toList shouldBe List("[", "foo", "]")
}

it should "intersperse with start, inject and end over a source with multiple elements" in supervised {
val s = Source.fromValues("foo", "bar")
s.intersperse("[", ", ", "]").toList shouldBe List("[", "foo", ", ", "bar", "]")
}
}

0 comments on commit a4de618

Please sign in to comment.