Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement takeWhile function #22

Merged
merged 2 commits into from
Oct 17, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 20 additions & 1 deletion core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package ox.channels

import ox.*

import java.util.concurrent.{ArrayBlockingQueue, ConcurrentLinkedQueue, CountDownLatch, LinkedBlockingQueue, Semaphore}
import java.util.concurrent.{CountDownLatch, Semaphore}
import scala.collection.mutable
import scala.concurrent.duration.FiniteDuration

Expand Down Expand Up @@ -141,6 +141,25 @@ trait SourceOps[+T] { this: Source[T] =>

def take(n: Int)(using Ox, StageCapacity): Source[T] = transform(_.take(n))

/** Sends elements to the returned channel until predicate `f` is satisfied (returns `true`). Note that when the predicate `f` is not
* satisfied (returns `false`), subsequent elements are dropped even if they could still satisfy it.
*
* @param f
* A predicate function.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* scoped {
* Source.empty[Int].takeWhile(_ > 3).toList // List()
* Source.fromValues(1, 2, 3).takeWhile(_ < 3).toList // List(1, 2)
* Source.fromValues(3, 2, 1).takeWhile(_ < 3).toList // List()
* }
* }}}
*/
def takeWhile(f: T => Boolean)(using Ox, StageCapacity): Source[T] = transform(_.takeWhile(f))

def filter(f: T => Boolean)(using Ox, StageCapacity): Source[T] = transform(_.filter(f))

def transform[U](f: Iterator[T] => Iterator[U])(using Ox, StageCapacity): Source[U] =
Expand Down
24 changes: 24 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsTakeWhileTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package ox.channels

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class SourceOpsTakeWhileTest extends AnyFlatSpec with Matchers {
behavior of "Source.takeWhile"

it should "not take from the empty source" in supervised {
val s = Source.empty[Int]
s.takeWhile(_ < 3).toList shouldBe List.empty
}

it should "take as long as predicate is satisfied" in supervised {
val s = Source.fromValues(1, 2, 3)
s.takeWhile(_ < 3).toList shouldBe List(1, 2)
}

it should "not take if predicate fails for first or more elements" in supervised {
val s = Source.fromValues(3, 2, 1)
s.takeWhile(_ < 3).toList shouldBe List()
}
}
Loading