Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion util/src/main/scala/io/catbird/util/package.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
package io.catbird

package object util extends FutureInstances with TryInstances with VarInstances with AsyncStreamInstances
package object util
extends AsyncStreamInstances
with FutureInstances
with ReaderInstances
with TryInstances
with VarInstances
70 changes: 70 additions & 0 deletions util/src/main/scala/io/catbird/util/reader.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package io.catbird
package util

import cats.{ Alternative, Defer, Eq, Monad, Monoid, MonoidK, StackSafeMonad }
import cats.instances.list._
import com.twitter.io.Reader
import com.twitter.util.{ Await, Duration }
import scala.collection.immutable.List

/** Typeclass instances for [[com.twitter.io.Reader]]
*
* Note that while (to the best of my knowledge) these instances are lawful, Reader itself is inherently
* unsafe: it is based on mutable state and also requires manual cleanup to ensure resource safety. To use
* it in pure functional code, side-effecting operations should be wrapped in an effect type such as
* [[Rerunnable]], and cleanup should be ensured using something like [[cats.effect.Resource]].
*
* TODO: add facilities to make that easier to do.
*/
trait ReaderInstances {
implicit final val readerInstance: Alternative[Reader] with Defer[Reader] with Monad[Reader] =
new Alternative[Reader] with ReaderDefer with ReaderMonad with ReaderMonoidK

implicit final def readerMonoid[A](implicit A: Monoid[A]) = new ReaderMonoid[A]

/**
* Obtain a [[cats.Eq]] instance for [[com.twitter.io.Reader]].
*
* These instances use [[com.twitter.util.Await]] so should be
* [[https://finagle.github.io/blog/2016/09/01/block-party/ avoided in production code]]. Likely use cases
* include tests, scrips, REPLs etc.
*/
final def readerEq[A](atMost: Duration)(implicit A: Eq[A]): Eq[Reader[A]] = new Eq[Reader[A]] {
final override def eqv(x: Reader[A], y: Reader[A]) =
Await.result(
Reader.readAllItems(x).joinWith(Reader.readAllItems(y))((xs, ys) => Eq[List[A]].eqv(xs.toList, ys.toList)),
atMost
)
}
}

/** Monad instance for twitter-util's Reader streaming abstraction
*
* Also entrant for "most confusing class name of the year"
*/
private[util] trait ReaderMonad extends StackSafeMonad[Reader] {
final override def map[A, B](fa: Reader[A])(f: A => B): Reader[B] = fa.map(f)
final override def pure[A](a: A): Reader[A] = Reader.value(a)
final override def flatMap[A, B](fa: Reader[A])(f: A => Reader[B]): Reader[B] = fa.flatMap(f)
final override def flatten[A](ffa: Reader[Reader[A]]): Reader[A] = ffa.flatten
}

private[util] trait ReaderDefer extends Defer[Reader] {
/** Defer creation of a Reader
*
* There are a few ways to achieve this, such as using fromIterator on a lazy iterator, but this is the
* simplest I've come up with. Might be worth benchmarking alternatives.
*/
def defer[A](fa: => Reader[A]): Reader[A] = Reader.flatten(Reader.value(() => fa).map(_()))
}

private[util] trait ReaderMonoidK extends MonoidK[Reader] {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to confirm, this and the two above are separate traits just for the sake of code organisation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mainly - I have a vague idea about trying to implement some of the cats-effect typeclasses for Reader (not actually sure if it's possible), and if so I'd want to be able to mix Defer and Monad in without MonoidK.

final override def empty[A]: Reader[A] = Reader.empty
final override def combineK[A](x: Reader[A], y: Reader[A]): Reader[A] = Reader.concat(List(x, y))
}

private[util] final class ReaderMonoid[A](implicit A: Monoid[A]) extends Monoid[Reader[A]] {
final override def empty = Reader.value(A.empty)
final override def combine(xs: Reader[A], ys: Reader[A]) =
for(x <- xs; y <- ys) yield A.combine(x, y)
}
19 changes: 18 additions & 1 deletion util/src/test/scala/io/catbird/util/arbitrary.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package io.catbird.util

import com.twitter.concurrent.AsyncStream
import com.twitter.conversions.DurationOps._
import com.twitter.io.Reader
import com.twitter.util.{ Future, Return, Try, Var }
import org.scalacheck.{ Arbitrary, Cogen }
import org.scalacheck.{ Arbitrary, Gen, Cogen }

trait ArbitraryInstances {
implicit def futureArbitrary[A](implicit A: Arbitrary[A]): Arbitrary[Future[A]] =
Expand All @@ -18,6 +19,22 @@ trait ArbitraryInstances {
implicit def asyncStreamArbitrary[A](implicit A: Arbitrary[A]): Arbitrary[AsyncStream[A]] =
Arbitrary(A.arbitrary.map(AsyncStream.of))

// Note that this doesn't cover BufReader or InputStreamReader currently
private def readerGen[A: Arbitrary](depth: Int = 0): Gen[Reader[A]] = {
val options = List(
1 -> Gen.const(Reader.empty[A]), // empty reader defined inline
3 -> Arbitrary.arbitrary[A].flatMap(Reader.value(_)), // FutureReader (used for both fromFuture and value)
1 -> Gen.const(Reader.exception(new Exception)), // also FutureReader but making sure we exercise the failed case
3 -> Gen.listOf(Arbitrary.arbitrary[A]).flatMap(Reader.fromSeq(_)), // IteratorReader
2 -> Arbitrary.arbitrary[AsyncStream[A]].flatMap(Reader.fromAsyncStream(_)) // uses Pipe
)
lazy val flatten = 2 -> readerGen[A](depth + 1).map(Reader.value(_).flatten) // flatten any of the other types
Gen.frequency((if (depth < 4) flatten :: options else options): _*)
}

implicit def readerArbitrary[A: Arbitrary]: Arbitrary[Reader[A]] =
Arbitrary(readerGen[A]())

implicit def rerunnableArbitrary[A](implicit A: Arbitrary[A]): Arbitrary[Rerunnable[A]] =
Arbitrary(futureArbitrary[A].arbitrary.map(Rerunnable.fromFuture[A](_)))

Expand Down
21 changes: 21 additions & 0 deletions util/src/test/scala/io/catbird/util/reader.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.catbird
package util

import cats.Eq
import cats.instances.boolean._
import cats.instances.int._
import cats.instances.tuple._
import cats.kernel.laws.discipline.MonoidTests
import cats.laws.discipline.{ AlternativeTests, DeferTests, MonadTests }
import com.twitter.io.Reader
import com.twitter.conversions.DurationOps._

class ReaderSuite extends CatbirdSuite with ReaderInstances with ArbitraryInstances with EqInstances {
implicit private def eqReader[A: Eq]: Eq[Reader[A]] = readerEq[A](1.second)

checkAll("Reader[Int]", AlternativeTests[Reader].alternative[Int, Int, Int])
checkAll("Reader[Int]", DeferTests[Reader].defer[Int])
checkAll("Reader[Int]", MonadTests[Reader].monad[Int, Int, Int])

checkAll("Reader[Int]", MonoidTests[Reader[Int]].monoid)
}