diff --git a/core/src/main/scala/ox/channels/SourceOps.scala b/core/src/main/scala/ox/channels/SourceOps.scala index fd205d2b..99deffd0 100644 --- a/core/src/main/scala/ox/channels/SourceOps.scala +++ b/core/src/main/scala/ox/channels/SourceOps.scala @@ -793,6 +793,7 @@ trait SourceOps[+T] { outer: Source[T] => * }}} */ def groupedWeighted(minWeight: Long)(costFn: T => Long)(using Ox, StageCapacity): Source[Seq[T]] = + require(minWeight > 0, "minWeight must be > 0") val c2 = StageCapacity.newChannel[Seq[T]] fork { var buffer = Vector.empty[T] @@ -899,6 +900,8 @@ trait SourceOps[+T] { outer: Source[T] => * }}} */ def groupedWeightedWithin(minWeight: Long, duration: FiniteDuration)(costFn: T => Long)(using Ox, StageCapacity): Source[Seq[T]] = + require(minWeight > 0, "minWeight must be > 0") + require(duration > 0.seconds, "duration must be > 0") val c2 = StageCapacity.newChannel[Seq[T]] val timerChannel = StageCapacity.newChannel[GroupingTimeout.type] fork { @@ -976,8 +979,8 @@ trait SourceOps[+T] { outer: Source[T] => * }}} */ def sliding(n: Int, step: Int = 1)(using Ox, StageCapacity): Source[Seq[T]] = - if n <= 0 then throw IllegalArgumentException("n must be > 0") - if step <= 0 then throw IllegalArgumentException("step must be > 0") + require(n > 0, "n must be > 0") + require(step > 0, "step must be > 0") val c = StageCapacity.newChannel[Seq[T]] fork { var buffer = Vector.empty[T]