diff --git a/util/src/main/scala/io/catbird/util/package.scala b/util/src/main/scala/io/catbird/util/package.scala index aceb06bc..6639c1a4 100644 --- a/util/src/main/scala/io/catbird/util/package.scala +++ b/util/src/main/scala/io/catbird/util/package.scala @@ -1,3 +1,8 @@ package io.catbird -package object util extends FutureInstances with TryInstances with VarInstances with AsyncStreamInstances +package object util + extends AsyncStreamInstances + with FutureInstances + with ReaderInstances + with TryInstances + with VarInstances diff --git a/util/src/main/scala/io/catbird/util/reader.scala b/util/src/main/scala/io/catbird/util/reader.scala new file mode 100644 index 00000000..f07b77d4 --- /dev/null +++ b/util/src/main/scala/io/catbird/util/reader.scala @@ -0,0 +1,70 @@ +package io.catbird +package util + +import cats.{ Alternative, Defer, Eq, Monad, Monoid, MonoidK, StackSafeMonad } +import cats.instances.list._ +import com.twitter.io.Reader +import com.twitter.util.{ Await, Duration } +import scala.collection.immutable.List + +/** Typeclass instances for [[com.twitter.io.Reader]] + * + * Note that while (to the best of my knowledge) these instances are lawful, Reader itself is inherently + * unsafe: it is based on mutable state and also requires manual cleanup to ensure resource safety. To use + * it in pure functional code, side-effecting operations should be wrapped in an effect type such as + * [[Rerunnable]], and cleanup should be ensured using something like [[cats.effect.Resource]]. + * + * TODO: add facilities to make that easier to do. + */ +trait ReaderInstances { + implicit final val readerInstance: Alternative[Reader] with Defer[Reader] with Monad[Reader] = + new Alternative[Reader] with ReaderDefer with ReaderMonad with ReaderMonoidK + + implicit final def readerMonoid[A](implicit A: Monoid[A]) = new ReaderMonoid[A] + + /** + * Obtain a [[cats.Eq]] instance for [[com.twitter.io.Reader]]. + * + * These instances use [[com.twitter.util.Await]] so should be + * [[https://finagle.github.io/blog/2016/09/01/block-party/ avoided in production code]]. Likely use cases + * include tests, scrips, REPLs etc. + */ + final def readerEq[A](atMost: Duration)(implicit A: Eq[A]): Eq[Reader[A]] = new Eq[Reader[A]] { + final override def eqv(x: Reader[A], y: Reader[A]) = + Await.result( + Reader.readAllItems(x).joinWith(Reader.readAllItems(y))((xs, ys) => Eq[List[A]].eqv(xs.toList, ys.toList)), + atMost + ) + } +} + +/** Monad instance for twitter-util's Reader streaming abstraction + * + * Also entrant for "most confusing class name of the year" + */ +private[util] trait ReaderMonad extends StackSafeMonad[Reader] { + final override def map[A, B](fa: Reader[A])(f: A => B): Reader[B] = fa.map(f) + final override def pure[A](a: A): Reader[A] = Reader.value(a) + final override def flatMap[A, B](fa: Reader[A])(f: A => Reader[B]): Reader[B] = fa.flatMap(f) + final override def flatten[A](ffa: Reader[Reader[A]]): Reader[A] = ffa.flatten +} + +private[util] trait ReaderDefer extends Defer[Reader] { + /** Defer creation of a Reader + * + * There are a few ways to achieve this, such as using fromIterator on a lazy iterator, but this is the + * simplest I've come up with. Might be worth benchmarking alternatives. + */ + def defer[A](fa: => Reader[A]): Reader[A] = Reader.flatten(Reader.value(() => fa).map(_())) +} + +private[util] trait ReaderMonoidK extends MonoidK[Reader] { + final override def empty[A]: Reader[A] = Reader.empty + final override def combineK[A](x: Reader[A], y: Reader[A]): Reader[A] = Reader.concat(List(x, y)) +} + +private[util] final class ReaderMonoid[A](implicit A: Monoid[A]) extends Monoid[Reader[A]] { + final override def empty = Reader.value(A.empty) + final override def combine(xs: Reader[A], ys: Reader[A]) = + for(x <- xs; y <- ys) yield A.combine(x, y) +} diff --git a/util/src/test/scala/io/catbird/util/arbitrary.scala b/util/src/test/scala/io/catbird/util/arbitrary.scala index 3f00c401..3e517614 100644 --- a/util/src/test/scala/io/catbird/util/arbitrary.scala +++ b/util/src/test/scala/io/catbird/util/arbitrary.scala @@ -2,8 +2,9 @@ package io.catbird.util import com.twitter.concurrent.AsyncStream import com.twitter.conversions.DurationOps._ +import com.twitter.io.Reader import com.twitter.util.{ Future, Return, Try, Var } -import org.scalacheck.{ Arbitrary, Cogen } +import org.scalacheck.{ Arbitrary, Gen, Cogen } trait ArbitraryInstances { implicit def futureArbitrary[A](implicit A: Arbitrary[A]): Arbitrary[Future[A]] = @@ -18,6 +19,22 @@ trait ArbitraryInstances { implicit def asyncStreamArbitrary[A](implicit A: Arbitrary[A]): Arbitrary[AsyncStream[A]] = Arbitrary(A.arbitrary.map(AsyncStream.of)) + // Note that this doesn't cover BufReader or InputStreamReader currently + private def readerGen[A: Arbitrary](depth: Int = 0): Gen[Reader[A]] = { + val options = List( + 1 -> Gen.const(Reader.empty[A]), // empty reader defined inline + 3 -> Arbitrary.arbitrary[A].flatMap(Reader.value(_)), // FutureReader (used for both fromFuture and value) + 1 -> Gen.const(Reader.exception(new Exception)), // also FutureReader but making sure we exercise the failed case + 3 -> Gen.listOf(Arbitrary.arbitrary[A]).flatMap(Reader.fromSeq(_)), // IteratorReader + 2 -> Arbitrary.arbitrary[AsyncStream[A]].flatMap(Reader.fromAsyncStream(_)) // uses Pipe + ) + lazy val flatten = 2 -> readerGen[A](depth + 1).map(Reader.value(_).flatten) // flatten any of the other types + Gen.frequency((if (depth < 4) flatten :: options else options): _*) + } + + implicit def readerArbitrary[A: Arbitrary]: Arbitrary[Reader[A]] = + Arbitrary(readerGen[A]()) + implicit def rerunnableArbitrary[A](implicit A: Arbitrary[A]): Arbitrary[Rerunnable[A]] = Arbitrary(futureArbitrary[A].arbitrary.map(Rerunnable.fromFuture[A](_))) diff --git a/util/src/test/scala/io/catbird/util/reader.scala b/util/src/test/scala/io/catbird/util/reader.scala new file mode 100644 index 00000000..94b35543 --- /dev/null +++ b/util/src/test/scala/io/catbird/util/reader.scala @@ -0,0 +1,21 @@ +package io.catbird +package util + +import cats.Eq +import cats.instances.boolean._ +import cats.instances.int._ +import cats.instances.tuple._ +import cats.kernel.laws.discipline.MonoidTests +import cats.laws.discipline.{ AlternativeTests, DeferTests, MonadTests } +import com.twitter.io.Reader +import com.twitter.conversions.DurationOps._ + +class ReaderSuite extends CatbirdSuite with ReaderInstances with ArbitraryInstances with EqInstances { + implicit private def eqReader[A: Eq]: Eq[Reader[A]] = readerEq[A](1.second) + + checkAll("Reader[Int]", AlternativeTests[Reader].alternative[Int, Int, Int]) + checkAll("Reader[Int]", DeferTests[Reader].defer[Int]) + checkAll("Reader[Int]", MonadTests[Reader].monad[Int, Int, Int]) + + checkAll("Reader[Int]", MonoidTests[Reader[Int]].monoid) +}