Skip to content

Commit

Permalink
#793 Further improvements, add doc links
Browse files Browse the repository at this point in the history
  • Loading branch information
timo-schmid committed May 29, 2019
1 parent 10d4a4d commit 1e71269
Show file tree
Hide file tree
Showing 2 changed files with 99 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,92 +29,176 @@ final class Sink[F[+ _], +A0, -A, +B] private[stream] (private[stream] val under
import ZSink.Step
import Stream.{ liftF, liftZIO }

/**
* See [[scalaz.zio.stream.ZSink#initial]]
*/
def initial(implicit R: Runtime[Any], E: Effect[F]): F[Step[State, Nothing]] =
liftF[F, Any, Step[State, Nothing]](underlying.initial)

/**
* See [[scalaz.zio.stream.ZSink#step]]
*/
def step(state: State, a: A)(implicit R: Runtime[Any], E: Effect[F]): F[Step[State, A0]] =
liftF(underlying.step(state, a))

/**
* See [[scalaz.zio.stream.ZSink#extract]]
*/
def extract(state: State)(implicit R: Runtime[Any], E: Effect[F]): F[B] =
liftF(underlying.extract(state))

/**
* See [[scalaz.zio.stream.ZSink#stepChunk]]
*/
def stepChunk[A1 <: A](state: State, as: Chunk[A1])(implicit R: Runtime[Any], E: Effect[F]): F[Step[State, A0]] =
liftF(underlying.stepChunk(state, as))

/**
* See [[scalaz.zio.stream.ZSink#update]]
*/
def update(state: Step[State, Nothing]): Sink[F, A0, A, B] =
new Sink(underlying.update(state))

/**
* See [[scalaz.zio.stream.ZSink#chunked]]
*/
def chunked[A1 >: A0, A2 <: A]: Sink[F, A1, Chunk[A2], B] =
new Sink(underlying.chunked)

/**
* See [[scalaz.zio.stream.ZSink#mapM]]
*/
def mapM[C](f: B => F[C])(implicit E: Effect[F]): Sink[F, A0, A, C] =
new Sink(underlying.mapM(f.andThen(liftZIO(_))))

/**
* See [[scalaz.zio.stream.ZSink#map]]
*/
def map[C](f: B => C): Sink[F, A0, A, C] =
new Sink(underlying.map(f))

/**
* See [[scalaz.zio.stream.ZSink#filter]]
*/
def filter[A1 <: A](f: A1 => Boolean): Sink[F, A0, A1, B] =
new Sink(underlying.filter(f))

/**
* See [[scalaz.zio.stream.ZSink#filterM]]
*/
def filterM[A1 <: A](f: A1 => F[Boolean])(implicit E: Effect[F]): Sink[F, A0, A1, B] =
new Sink(underlying.filterM(f.andThen(liftZIO(_))))

/**
* See [[scalaz.zio.stream.ZSink#filterNot]]
*/
def filterNot[A1 <: A](f: A1 => Boolean): Sink[F, A0, A1, B] =
new Sink(underlying.filterNot(f))

/**
* See [[scalaz.zio.stream.ZSink#filterNotM]]
*/
def filterNotM[A1 <: A](f: A1 => F[Boolean])(implicit E: Effect[F]): Sink[F, A0, A1, B] =
new Sink(underlying.filterNotM(f.andThen(liftZIO(_))))

/**
* See [[scalaz.zio.stream.ZSink#contramap]]
*/
def contramap[C](f: C => A): Sink[F, A0, C, B] =
new Sink(underlying.contramap(f))

/**
* See [[scalaz.zio.stream.ZSink#contramapM]]
*/
def contramapM[C](f: C => F[A])(implicit E: Effect[F]): Sink[F, A0, C, B] =
new Sink(underlying.contramapM(f.andThen(liftZIO(_))))

/**
* See [[scalaz.zio.stream.ZSink#dimap]]
*/
def dimap[C, D](f: C => A)(g: B => D): Sink[F, A0, C, D] =
new Sink(underlying.dimap(f)(g))

/**
* See [[scalaz.zio.stream.ZSink#mapError]]
*/
def mapError[E1 <: Throwable](f: Throwable => E1): Sink[F, A0, A, B] =
new Sink(underlying.mapError(f))

/**
* See [[scalaz.zio.stream.ZSink#mapRemainder]]
*/
def mapRemainder[A1](f: A0 => A1): Sink[F, A1, A, B] =
new Sink(underlying.mapRemainder(f))

/**
* See [[scalaz.zio.stream.ZSink#provideSome]]
*/
// TODO Not sure this is useful for cats interop, probably should be deleted
def provideSome(f: Any => Any): Sink[F, A0, A, B] =
new Sink(underlying.provideSome(f))

/**
* See [[scalaz.zio.stream.ZSink#const]]
*/
def const[C](c: => C): Sink[F, A0, A, C] =
new Sink(underlying.const(c))

/**
* See [[scalaz.zio.stream.ZSink#unit]]
*/
def unit: Sink[F, A0, A, Unit] =
new Sink(underlying.unit)

/**
* See [[scalaz.zio.stream.ZSink#untilOutput]]
*/
def untilOutput(f: B => Boolean): Sink[F, A0, A, B] =
new Sink(underlying.untilOutput(f))

/**
* See [[scalaz.zio.stream.ZSink#?]]
*/
def ? : Sink[F, A0, A, Option[B]] =
new Sink(underlying.?)

/**
* See [[scalaz.zio.stream.ZSink#optional]]
*/
def optional: Sink[F, A0, A, Option[B]] = ?

/**
* See [[scalaz.zio.stream.ZSink#race]]
*/
def race[A2 >: A0, A1 <: A, B1 >: B](
that: Sink[F, A2, A1, B1]
): Sink[F, A2, A1, B1] =
new Sink(underlying.race(that.underlying))

/**
* See [[scalaz.zio.stream.ZSink#|]]
*/
def |[A2 >: A0, A1 <: A, B1 >: B](
that: Sink[F, A2, A1, B1]
): Sink[F, A2, A1, B1] = race(that)

/**
* See [[scalaz.zio.stream.ZSink#raceBoth]]
*/
def raceBoth[A2 >: A0, A1 <: A, C](
that: Sink[F, A2, A1, C]
): Sink[F, A2, A1, Either[B, C]] =
new Sink(underlying.raceBoth(that.underlying))

/**
* See [[scalaz.zio.stream.ZSink#takeWhile]]
*/
def takeWhile[A1 <: A](pred: A1 => Boolean): Sink[F, A0, A1, B] =
new Sink(underlying.takeWhile(pred))

/**
* See [[scalaz.zio.stream.ZSink#dropWhile]]
*/
def dropWhile[A1 <: A](pred: A1 => Boolean): Sink[F, A0, A1, B] =
new Sink(underlying.dropWhile(pred))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package scalaz.zio.interop.stream
import cats.effect.{ Effect, Sync }
import cats.implicits._
import scalaz.zio.clock.Clock
import scalaz.zio.stream.Stream.ConformsR
import scalaz.zio._
import scalaz.zio.stream.{ Take, ZStream }

Expand Down Expand Up @@ -175,9 +174,8 @@ final class Stream[F[+ _], +A] private[stream] (private[stream] val underlying:
* - Managed
* See [[scalaz.zio.stream.ZStream#peel]]
*/
def peel[A1 >: A, B](sink: Sink[F, A1, A1, B]): ZManaged[Any, Throwable, (B, Stream[F, A1])] =
???
// underlying.peel(sink.underlying)
def peel[A1 >: A, B](sink: Sink[F, A1, A1, B]): ZManaged[Any, Throwable, (B, ZStream[Any, Throwable, A1])] =
underlying.peel(sink.underlying)

/**
* TODO
Expand Down Expand Up @@ -233,7 +231,8 @@ final class Stream[F[+ _], +A] private[stream] (private[stream] val underlying:
* - ZManaged
* See [[scalaz.zio.stream.ZStream#toQueue]]
*/
def toQueue[A1 >: A](capacity: Int = 1): ZManaged[Any, Nothing, Queue[Take[Throwable, A1]]] = ???
def toQueue[A1 >: A](capacity: Int = 1): ZManaged[Any, Nothing, Queue[Take[Throwable, A1]]] =
underlying.toQueue(capacity)

/**
* See [[scalaz.zio.stream.ZStream#transduce]]
Expand Down Expand Up @@ -306,16 +305,19 @@ object Stream {
)
)

final def managed[F[+ _], R: ConformsR, E, A, B](
m: ZManaged[R, E, A]
)(read: A => ZIO[R, E, Option[B]]): Stream[F, B] =
???
final def managed[F[+ _], R, E, A, B](
m: ZManaged[Any, Throwable, A]
)(read: A => ZIO[Any, Throwable, Option[B]]): Stream[F, B] =
new Stream(ZStream.managed[Any, Throwable, A, B](m)(read))

final def fromQueue[F[+ _], A](queue: Queue[A]): Stream[F, A] =
new Stream(ZStream.fromQueue(queue))
// TODO Adding both of those leads to "have same type after erasure"
// Should we add one of them, or both? Is it possible?

final def fromQueue[RB: ConformsR, EB, B](queue: ZQueue[_, _, RB, EB, _, B]): ZStream[RB, EB, B] =
???
//final def fromQueue[F[+ _], A](queue: Queue[A]): Stream[F, A] =
// new Stream(ZStream.fromQueue(queue))

//final def fromQueue[F[+ _], B](queue: ZQueue[_, _, Any, Throwable, _, B]): Stream[F, B] =
// new Stream(ZStream.fromQueue(queue))

final def unfold[F[+ _], S, A](s: S)(f0: S => Option[(A, S)]): Stream[F, A] =
new Stream[F, A](ZStream.unfold(s)(f0))
Expand Down

0 comments on commit 1e71269

Please sign in to comment.