Skip to content

Commit

Permalink
Add parameter validation to grouped* operators (#165)
Browse files Browse the repository at this point in the history
  • Loading branch information
micossow authored Jun 27, 2024
1 parent 6e5cea1 commit a6c115a
Showing 1 changed file with 5 additions and 2 deletions.
7 changes: 5 additions & 2 deletions core/src/main/scala/ox/channels/SourceOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -793,6 +793,7 @@ trait SourceOps[+T] { outer: Source[T] =>
* }}}
*/
def groupedWeighted(minWeight: Long)(costFn: T => Long)(using Ox, StageCapacity): Source[Seq[T]] =
require(minWeight > 0, "minWeight must be > 0")
val c2 = StageCapacity.newChannel[Seq[T]]
fork {
var buffer = Vector.empty[T]
Expand Down Expand Up @@ -899,6 +900,8 @@ trait SourceOps[+T] { outer: Source[T] =>
* }}}
*/
def groupedWeightedWithin(minWeight: Long, duration: FiniteDuration)(costFn: T => Long)(using Ox, StageCapacity): Source[Seq[T]] =
require(minWeight > 0, "minWeight must be > 0")
require(duration > 0.seconds, "duration must be > 0")
val c2 = StageCapacity.newChannel[Seq[T]]
val timerChannel = StageCapacity.newChannel[GroupingTimeout.type]
fork {
Expand Down Expand Up @@ -976,8 +979,8 @@ trait SourceOps[+T] { outer: Source[T] =>
* }}}
*/
def sliding(n: Int, step: Int = 1)(using Ox, StageCapacity): Source[Seq[T]] =
if n <= 0 then throw IllegalArgumentException("n must be > 0")
if step <= 0 then throw IllegalArgumentException("step must be > 0")
require(n > 0, "n must be > 0")
require(step > 0, "step must be > 0")
val c = StageCapacity.newChannel[Seq[T]]
fork {
var buffer = Vector.empty[T]
Expand Down

0 comments on commit a6c115a

Please sign in to comment.