benqua edited this page Apr 11, 2012 · 6 revisions

Handling data streams reactively

Enumerators

If an iteratee represents the consumer, or sink, of input, an Enumerator is the source that pushes input into a given iteratee. As the name suggests, it enumerates some input into the iteratee and eventually returns the new state of that iteratee. This is easy to see from the Enumerator’s signature:

trait Enumerator[E] {

  /**
   * Apply this Enumerator to an Iteratee
   */
  def apply[A](i: Iteratee[E, A]): Promise[Iteratee[E, A]]

}

An Enumerator[E] takes an Iteratee[E,A], that is, any iteratee that consumes Input[E], and returns a Promise[Iteratee[E,A]], which eventually yields the new state of the iteratee.

We can implement Enumerator instances manually by repeatedly calling the iteratee’s fold method, or use one of the provided Enumerator creation methods. For instance, we can create an Enumerator[String] that pushes a list of strings into an iteratee:

val enumerateUsers: Enumerator[String] = {
  Enumerator("Guillaume", "Sadek", "Peter", "Erwan")
}

Now we can apply it to the consume iteratee we created before:

val consume = Iteratee.consume[String]()
val newIteratee: Promise[Iteratee[String,String]] = enumerateUsers(consume) 

To terminate the iteratee and extract the computed result, we pass it Input.EOF. An Iteratee has a run method that does just this: it pushes an Input.EOF and returns a Promise[A], ignoring any leftover input.

// We use flatMap since newIteratee is a promise,
// and run itself returns a promise
val eventuallyResult: Promise[String] = newIteratee.flatMap(i => i.run)

//Eventually print the result
eventuallyResult.onRedeem(s => println(s))

// Prints "GuillaumeSadekPeterErwan"
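To make the mechanics of applying an Enumerator and then running the iteratee more concrete, here is a minimal, self-contained sketch of a toy model. It is not the library's implementation: `Input`, `Iteratee`, `Done`, `Cont`, `enumerate`, `consume` and `run` are simplified, hypothetical stand-ins defined in the snippet itself, and the standard `scala.concurrent.Future` plays the role of `Promise`.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object EnumeratorSketch {
  // Hypothetical, simplified stand-ins for the library's Input and Iteratee types.
  sealed trait Input[+E]
  final case class El[E](e: E) extends Input[E]
  case object EOF extends Input[Nothing]

  sealed trait Iteratee[E, A]
  final case class Done[E, A](a: A) extends Iteratee[E, A]
  final case class Cont[E, A](k: Input[E] => Iteratee[E, A]) extends Iteratee[E, A]

  // Same shape as the signature shown earlier, with Future standing in for Promise.
  trait Enumerator[E] {
    def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]]
  }

  // Hand-written enumerator: feed each element while the iteratee still accepts input.
  def enumerate[E](es: E*): Enumerator[E] = new Enumerator[E] {
    def apply[A](i: Iteratee[E, A]): Future[Iteratee[E, A]] = Future {
      es.foldLeft(i) {
        case (Cont(k), e) => k(El(e)) // still consuming: push the next element
        case (done, _)    => done     // already done: ignore the remaining input
      }
    }
  }

  // Toy counterpart of Iteratee.consume: concatenates every string it receives.
  def consume(acc: String = ""): Iteratee[String, String] =
    Cont[String, String] {
      case El(s) => consume(acc + s)
      case EOF   => Done(acc)
    }

  // Toy counterpart of run: push EOF and extract the result.
  def run[E, A](i: Iteratee[E, A]): A = i match {
    case Done(a) => a
    case Cont(k) => k(EOF) match {
      case Done(a) => a
      case Cont(_) => sys.error("iteratee still expects input after EOF")
    }
  }

  def demo(): String = {
    val enumerateUsers = enumerate("Guillaume", "Sadek", "Peter", "Erwan")
    val newIteratee = enumerateUsers(consume())
    Await.result(newIteratee.map(run(_)), 5.seconds)
  }
}
```

Calling `EnumeratorSketch.demo()` returns the concatenated names, mirroring the result printed in the example above. The real library is asynchronous throughout; the blocking `Await` is used here only to keep the sketch short.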

You might notice that an Iteratee eventually produces a result (fold returns a promise once it is given the appropriate callbacks), and a Promise also eventually produces a result. A Promise[Iteratee[E,A]] can therefore be viewed as an Iteratee[E,A]. This is exactly what Iteratee.flatten does. Let’s apply it to the previous example:

//Apply the enumerator and flatten then run the resulting iteratee
val newIteratee = Iteratee.flatten(enumerateUsers(consume))

val eventuallyResult: Promise[String] = newIteratee.run
   
//Eventually print the result 
eventuallyResult.onRedeem(s => println(s)) 

// Prints "GuillaumeSadekPeterErwan"

An Enumerator has some symbolic methods that can act as operators, which can be useful in some contexts for saving some parentheses. For example, the |>> method works exactly like apply:

val eventuallyResult: Promise[String] = {
  Iteratee.flatten(enumerateUsers |>> consume).run
}

Since an Enumerator pushes some input into an iteratee and eventually returns a new state of the iteratee, we can go on pushing more input into the returned iteratee using another Enumerator. This can be done either by using the flatMap function on Promises or, more simply, by combining Enumerator instances using the andThen method, as follows:

val colors = Enumerator("Red","Blue","Green")

val moreColors = Enumerator("Grey","Orange","Yellow")

val combinedEnumerator = colors.andThen(moreColors)

val eventuallyIteratee = combinedEnumerator(consume)
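The relationship between flatMap on promises and andThen can be sketched with a toy model. The names `Sink` and `Source` below are hypothetical stand-ins for Iteratee and Enumerator, `scala.concurrent.Future` stands in for Promise, and the `andThen` shown is an assumption about how such sequencing could be written, not the library's actual code.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AndThenSketch {
  // Toy stand-in for an iteratee: it only records the elements it was fed.
  final case class Sink[E](seen: Vector[E]) {
    def feed(e: E): Sink[E] = Sink(seen :+ e)
  }

  // Toy stand-in for Enumerator[E]: pushes input, eventually returns the new sink state.
  trait Source[E] { self =>
    def apply(s: Sink[E]): Future[Sink[E]]

    // Sequencing: once this source is finished, hand the resulting sink to `next`.
    def andThen(next: Source[E]): Source[E] = new Source[E] {
      def apply(s: Sink[E]): Future[Sink[E]] = self(s).flatMap(next(_))
    }
  }

  object Source {
    def apply[E](es: E*): Source[E] = new Source[E] {
      def apply(s: Sink[E]): Future[Sink[E]] = Future(es.foldLeft(s)(_ feed _))
    }
  }

  def demo(): Vector[String] = {
    val colors = Source("Red", "Blue", "Green")
    val moreColors = Source("Grey", "Orange", "Yellow")
    val combined = colors.andThen(moreColors)
    Await.result(combined(Sink[String](Vector.empty)), 5.seconds).seen
  }
}
```

In this model the second source only starts once the first has completed, so the sink sees all of the first source's elements before any of the second's.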

As with apply, there is a symbolic version of andThen, called >>>, that can be used to save some parentheses when appropriate:

val eventuallyIteratee = {
  Enumerator("Red","Blue","Green") >>>
  Enumerator("Grey","Orange","Yellow") |>>
  consume    
}

We can also create Enumerators for enumerating the contents of files:

val fileEnumerator: Enumerator[Array[Byte]] = {
  Enumerator.fromFile(new File("path/to/some/file"))
}

More generally, we can enumerate a java.io.InputStream using Enumerator.fromStream. It is important to note that input won't be read until the iteratee this Enumerator is applied to is ready to take more input.

Both methods are in fact based on the more generic Enumerator.fromCallback, which has the following signature:

def fromCallback[E](
  retriever: () => Promise[Option[E]],
  onComplete: () => Unit = () => (),
  onError: (String, Input[E]) => Unit = (_: String, _: Input[E]) => ()
): Enumerator[E] = {
  ... 
}

This method defined on the Enumerator object is one of the most important methods for creating Enumerators from imperative logic. Looking closely at the signature, this method takes a callback function retriever: () => Promise[Option[E]] that will be called each time the iteratee this Enumerator is applied to is ready to take some input.

Because the callback returns a promise, it can easily be used to create an Enumerator that represents a stream of time values, one every 100 milliseconds:

Enumerator.fromCallback { () =>
  Promise.timeout(Some(new Date), 100 milliseconds)
}

In the same manner, we can construct an Enumerator that fetches a URL at regular intervals using the WS API which, not surprisingly, returns a Promise.

Combining this callback Enumerator with an imperative Iteratee.foreach, we can println a stream of time values periodically:

val timeStream = Enumerator.fromCallback { () => 
  Promise.timeout(Some(new Date), 100 milliseconds)
}

val printlnSink = Iteratee.foreach[Date](date => println(date))

timeStream |>> printlnSink
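The pull-style loop behind a callback-based source can be sketched without the library. The helper `drain` below is hypothetical, `scala.concurrent.Future` stands in for Promise, and the sketch assumes that a `None` from the retriever signals the end of the stream. The retriever is only called again after the previous element has been handed to the sink.

```scala
import scala.collection.mutable.ListBuffer
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FromCallbackSketch {
  // Ask `retriever` for the next element, pass it to `sink`, and repeat.
  // Assumption: None means there is no more input.
  def drain[E](retriever: () => Future[Option[E]])(sink: E => Unit): Future[Unit] =
    retriever().flatMap {
      case Some(e) =>
        sink(e)
        drain(retriever)(sink)
      case None =>
        Future.successful(())
    }

  def demo(): List[Int] = {
    val remaining = Iterator(1, 2, 3)
    val out = ListBuffer.empty[Int]
    // Imperative callback: yields the next value, or None once exhausted.
    val retriever = () => Future {
      if (remaining.hasNext) Some(remaining.next()) else None
    }
    Await.result(drain(retriever)(e => out += e), 5.seconds)
    out.toList
  }
}
```

Unlike the time stream above, this demo source is finite, so the loop terminates as soon as the retriever yields `None`.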

Another, more imperative, way of creating an Enumerator is Enumerator.pushee which, once it is ready, provides a Pushee interface that defines the methods push and close:

val channel = Enumerator.pushee[String](onStart = { pushee =>
  pushee.push("Hello")
  pushee.push("World")
})

channel |>> Iteratee.foreach(println)

The onStart function will be called each time the Enumerator is applied to an Iteratee. In some applications, a chatroom for instance, it makes sense to assign the pushee to a synchronized global value (using STMs for example) that will contain a list of listeners. Enumerator.pushee accepts two other functions, onComplete and onError.

One more interesting method is interleave, or >-, which, as the name says, interleaves two Enumerators. For reactive Enumerators, input is passed on as it arrives from any of the interleaved Enumerators.

Enumerators à la carte

Now that we have several interesting ways of creating Enumerators, we can use them together with the composition methods andThen / >>> and interleave / >- to compose Enumerators on demand.

Indeed, one interesting way of organizing a stream-based application is to create primitive Enumerators and then compose a collection of them. Let’s imagine building an application for monitoring systems:

object AvailableStreams {

  val cpu: Enumerator[JsValue] = Enumerator.fromCallback(/* code here */)

  val memory: Enumerator[JsValue] = Enumerator.fromCallback(/* code here */)

  val threads: Enumerator[JsValue] = Enumerator.fromCallback(/* code here */)

  val heap: Enumerator[JsValue] = Enumerator.fromCallback(/* code here */)

}

val physicalMachine = AvailableStreams.cpu >- AvailableStreams.memory
val jvm = AvailableStreams.threads >- AvailableStreams.heap

def usersWidgetsComposition(prefs: Preferences) = {
  // do the composition dynamically
}

Now, it is time to adapt and transform Enumerators and Iteratees using ... Enumeratees!

Next: Enumeratees
