Skip to content

Commit

Permalink
feat: implement takeWhile function
Browse files Browse the repository at this point in the history
Takes elements from the source as long as the predicate is satisfied.
Note that if the predicate fails then subsequent elements are not longer
taken even if they could still satisfy it. Example:

  Source.empty[Int].takeWhile(_ > 3).toList          // List()
  Source.fromValues(1, 2, 3).takeWhile(_ < 3).toList // List(1, 2)
  Source.fromValues(3, 2, 1).takeWhile(_ < 3).toList // List()
  • Loading branch information
geminicaprograms committed Oct 16, 2023
1 parent 161557a commit 379cd29
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
21 changes: 20 additions & 1 deletion core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package ox.channels

import ox.*

import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, LinkedBlockingQueue, Semaphore}
import java.util.concurrent.{CountDownLatch, Semaphore}
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration

Expand Down Expand Up @@ -141,6 +141,25 @@ trait SourceOps[+T] { this: Source[T] =>

def take(n: Int)(using Ox, StageCapacity): Source[T] = transform(_.take(n))

/** Takes elements from the source as long as predicate `f` is satisfied (returns `true`). Note that once the predicate `f` is not
* satisfied (returns `false`) the subsequent elements are dropped even if they could still satisfy it.
*
* @param f
* A predicate function.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* scoped {
* Source.empty[Int].takeWhile(_ > 3).toList // List()
* Source.fromValues(1, 2, 3).takeWhile(_ < 3).toList // List(1, 2)
* Source.fromValues(3, 2, 1).takeWhile(_ < 3).toList // List()
* }
* }}}
*/
def takeWhile(f: T => Boolean)(using Ox, StageCapacity): Source[T] = transform(_.takeWhile(f))

def filter(f: T => Boolean)(using Ox, StageCapacity): Source[T] = transform(_.filter(f))

def transform[U](f: Iterator[T] => Iterator[U])(using Ox, StageCapacity): Source[U] =
Expand Down
24 changes: 24 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsTakeWhileTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package ox.channels

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class SourceOpsTakeWhileTest extends AnyFlatSpec with Matchers {
behavior of "Source.takeWhile"

it should "not take from the empty source" in scoped {
val s = Source.empty[Int]
s.takeWhile(_ < 3).toList shouldBe List.empty
}

it should "take as long as predicate is satisfied" in scoped {
val s = Source.fromValues(1, 2, 3)
s.takeWhile(_ < 3).toList shouldBe List(1, 2)
}

it should "not take if predicate fails for first or more elements" in scoped {
val s = Source.fromValues(3, 2, 1)
s.takeWhile(_ < 3).toList shouldBe List()
}
}

0 comments on commit 379cd29

Please sign in to comment.