Skip to content

Commit

Permalink
feat: implement reduce operator
Browse files Browse the repository at this point in the history
Uses the first and the following (if available) elements from this source
and applies function `f` on them. The returned value is used as the next
current value and `f` is applied again with the value received from a source.
The operation is repeated until the source is drained.
This is similar operation to `fold` but it uses the first source element as
`zero` e.g.:

  Source.empty[Int].reduce(_ + _)    // throws NoSuchElementException("cannot reduce an empty source")
  Source.fromValues(1).reduce(_ + _) // 1
  val s = Source.fromValues(1, 2)
  s.reduce(_ + _)                    // 3
  s.receive()                        // ChannelClosed.Done

Note that in case when function `f` thrown an exception the it is
propagated up to the caller.
  • Loading branch information
geminicaprograms committed Oct 30, 2023
1 parent 775f8db commit 0122b31
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 0 deletions.
32 changes: 32 additions & 0 deletions core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,38 @@ trait SourceOps[+T] { this: Source[T] =>
case t: T @unchecked => current = f(current, t); true
}
current

/** Uses the first and the following (if available) elements from this source and applies function `f` on them. The returned value is used
* as the next current value and `f` is applied again with the value received from this source. The operation is repeated until this
* source is drained. This is similar operation to [[fold]] but it uses the first source element as `zero`.
*
* @param f
* A binary function (a function that takes two arguments) that is applied to the current and next values received from this source.
* @return
* Combined value retrieved from running function `f` on all source elements in a cumulative manner where result of the previous call
* is used as an input value to the next.
* @throws NoSuchElementException
* When this source is empty.
* @throws ChannelClosedException.Error
* When receiving an element from this source fails.
* @throws exception
* When function `f` throws an `exception` then it is propagated up to the caller.
* @example
* {{{
* import ox.*
* import ox.channels.Source
*
* supervised {
* Source.empty[Int].reduce(_ + _) // throws NoSuchElementException("cannot reduce an empty source")
* Source.fromValues(1).reduce(_ + _) // 1
* val s = Source.fromValues(1, 2)
* s.reduce(_ + _) // 3
* s.receive() // ChannelClosed.Done
* }
* }}}
*/
def reduce[U >: T](f: (U, U) => U): U =
fold(headOption().getOrElse(throw new NoSuchElementException("cannot reduce an empty source")))(f)
}

trait SourceCompanionOps:
Expand Down
53 changes: 53 additions & 0 deletions core/src/test/scala/ox/channels/SourceOpsReduceTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package ox.channels

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*

class SourceOpsReduceTest extends AnyFlatSpec with Matchers {
behavior of "SourceOps.reduce"

it should "throw NoSuchElementException for reduce over the empty source" in supervised {
the[NoSuchElementException] thrownBy {
Source.empty[Int].reduce(_ + _)
} should have message "cannot reduce an empty source"
}

it should "throw ChannelClosedException.Error with exception and message that was thrown during retrieval" in supervised {
the[ChannelClosedException.Error] thrownBy {
Source
.failed[Int](new RuntimeException("source is broken"))
.reduce(_ + _)
} should have message "java.lang.RuntimeException: source is broken"
}

it should "throw ChannelClosedException.Error for source failed without exception" in supervised {
the[ChannelClosedException.Error] thrownBy {
Source
.failedWithoutReason[Int]()
.reduce(_ + _)
}
}

it should "throw exception thrown in `f` when `f` throws" in supervised {
the[RuntimeException] thrownBy {
Source
.fromValues(1, 2)
.reduce((_, _) => throw new RuntimeException("Function `f` is broken"))
} should have message "Function `f` is broken"
}

it should "return first element from reduce over the single element source" in supervised {
Source.fromValues(1).reduce(_ + _) shouldBe 1
}

it should "run reduce over on non-empty source" in supervised {
Source.fromValues(1, 2).reduce(_ + _) shouldBe 3
}

it should "drain the source" in supervised {
val s = Source.fromValues(1)
s.reduce(_ + _) shouldBe 1
s.receive() shouldBe ChannelClosed.Done
}
}

0 comments on commit 0122b31

Please sign in to comment.