Warning
|
Be warned this text is a WIP. If you already have suggestions or maybe want to encourage me to write more, please write me an email to [email protected]. I would be glad to hear if someone finds this interesting; the problem with internet materials is that no one reads them. |
This is my attempt to introduce IO monad programming to those who have never heard of it. Unlike the material I found when I first met the concept, this text does not try to “sell” it from the beginning as a purely functional, referentially transparent solution, but rather as a smart way to organize computation, to get concurrent execution of subprograms virtually for free and without magic, and to avoid stack overflows.
The language for presenting ideas here is Scala 3, one of the JVM languages and IMO the best (non-native) language today. I intentionally avoid advanced Scala features and syntax sugar (like for-comprehensions or by-name parameters), so that even a reader without knowledge of Scala should be able to follow.
Note
|
I suppose most people will read this at https://github.com/marekscholle/gradus-io, that’s why I don’t use advanced Asciidoctor features in this text. If you know how to get e.g. math displayed on Github, please let me know. |
It is supposed that a reader already knows basics of generic Java-like OOP typed programming (classes, instances, functions, generics) and the term interface for enforcing functionality which must be provided by type extending the interface. Example:
trait Animal:
  def name: String // abstract - to be implemented by concrete class
  def makeSound: String // abstract - to be implemented by concrete class
  def introduceSelf(): Unit = // implemented with help of abstract methods
    println(s"Hi! I'm $name and I do '$makeSound'!")
Note
|
Traits are similar to Java interfaces.
Returning Unit is Scala’s way of saying “return nothing”, like void in C++ or Java.
Unit is the 0-tuple type, and its only instance has a special notation: ().
In some sense it is an equivalent of Python’s None: if you specify that a function returns Unit, you don’t need to return () explicitly.
|
Any (concrete) type implementing the Animal trait must implement the methods name and makeSound, and the method introduceSelf is (by default) implemented in terms of these two methods.
One way to get an instance of Animal is to define an anonymous implementation:
val bob = new Animal:
  def name: String = "Bob"
  def makeSound: String = "Woof"
If we run bob.introduceSelf(), we will get Hi! I’m Bob and I do 'Woof'! in the console.
As this runs on the JVM, we mostly don’t care about object lifecycles; as long as we can reach an object (through a chain of references), it lives on the heap. A better viewpoint is that objects live forever and that the JVM runtime takes care that we do not run into memory issues.
In the text we will use one convenience of Scala language: sealed traits. (It is not critical for our purposes, but definitely it makes the code more readable.) Marking an interface as sealed tells the compiler there will be no other implementations than those provided in the same file:
/** Skeleton of *naive* implementation of generic `Option` type with two
  * variants: empty (`None`) and present (`Some`).
  * No one outside the file can extend `Option[A]`.
  */
sealed trait Option[A]:
  def get: A
  def map[B](f: A => B): Option[B]
object Option:
  case class None[A]() extends Option[A]:
    def get: A = throw new NoSuchElementException("None.get")
    def map[B](f: A => B): None[B] = None()
  case class Some[A](value: A) extends Option[A]:
    def get: A = value
    def map[B](f: A => B): Some[B] = Some(f(value))
The benefit is that if we have an instance opt of type Option[A], the compiler knows all possible variants and provides us a way to do complete pattern matching on the instance:
opt match
  case Some(a) => // handle the "present" case, `a` is the wrapped value
  case None() => // handle the "empty" case
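A minimal sketch of what exhaustive matching on a sealed trait looks like in practice. The `Shape` type and the `area` function are hypothetical and serve only as illustration; they are not part of the rest of this text.

```scala
// Hypothetical sealed hierarchy: only the two variants below can ever exist.
sealed trait Shape
object Shape:
  case class Circle(radius: Double) extends Shape
  case class Rectangle(width: Double, height: Double) extends Shape

import Shape.*

/** Total function on `Shape`: every variant has its own case.
  * If one of the cases were deleted, the compiler would warn
  * that the match may not be exhaustive.
  */
def area(shape: Shape): Double =
  shape match
    case Circle(r)       => math.Pi * r * r
    case Rectangle(w, h) => w * h
```

Since the trait is sealed, the compiler can check the match against the complete list of variants, which is what we will rely on when pattern matching on programs later.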
If you don’t know what tail recursion is, let’s start with a simple recursive function which sums integers from 1 to n:
def sum(n: Int): Long = // `n` is supposed to be >= 0
  if (n == 0) 0L
  else sum(n - 1) + n
If we run sum(1_000_000) we get a stack overflow: when executing sum for an n > 0, the runtime must first push a new stack frame to compute sum(n - 1), and only then can it add n and return.
The important thing here is that there is work to do after the recursive call.
In contrast, the program
def helloWorldForever: Nothing =
  println("Hello, world")
  helloWorldForever
will (in Scala) not cause a stack overflow: the compiler is smart enough to see that it can reuse the stack frame for the recursive call of helloWorldForever.
Note
|
Nothing is a special type which (formally) inherits
from all other types and does not have any instances
(you obviously can’t have one instance of all types).
If we use it as return type, it is guaranteed the function
will never return normally – once called, the function will
run forever (until we terminate the program the hard way)
or fail with exception.
|
Scala has a special annotation for checking that a recursive function boils down to a tail-recursive one: @tailrec (imported from scala.annotation).
If we apply this annotation to our sum example, we get a compilation error:
@tailrec
def sum(n: Int): Long =
  if (n == 0) 0
  else sum(n - 1) + n
       ^^^^^^^^^^
Cannot rewrite recursive call: it is not in tail position
Tail recursion is good: it is cheaper than ordinary recursion and we don’t risk stack overflows. It is a viable replacement for imperative loops:
def sum(n: Int): Long =
  var acc = 0L
  for { i <- 1 to n } acc += i
  acc
is better written (without any mutation) as
def sum(n: Int): Long =
  @tailrec
  def loop(n: Int, acc: Long): Long =
    if (n == 0) acc
    else loop(n - 1, n + acc)
  loop(n, acc = 0L)
The trick of turning an imperative loop into a @tailrec recursion by making a local mutable variable a parameter of a helper @tailrec function is not uncommon, but requires some practice.
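To practise the trick once more, here is a sketch applying it to a different loop. The factorial functions below are hypothetical examples introduced only for illustration: the local `var acc` of the imperative version becomes the `acc` parameter of the tail-recursive helper.

```scala
import scala.annotation.tailrec

/** Imperative version: a local `var` mutated in a loop. */
def factorialLoop(n: Int): BigInt =
  var acc = BigInt(1)
  for { i <- 1 to n } acc = acc * i
  acc

/** Same computation without mutation: the `var` became a parameter. */
def factorial(n: Int): BigInt =
  @tailrec
  def loop(i: Int, acc: BigInt): BigInt =
    if (i == 0) acc
    else loop(i - 1, acc * i) // the recursive call is the last thing we do
  loop(n, acc = BigInt(1))
```

Both versions compute the same value; the second one compiles with `@tailrec` because nothing remains to be done after the recursive call.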
We start with the question of how we can represent a program in our code. This may raise questions:
- Why would we do that?
- What’s wrong with writing programs starting from the main function and calling other functions?
- What’s the difference between a function (like the main one) and your “program”?
Of course, there is nothing wrong with code starting from the main function and calling other functions. But as we will see, formulating what is a “program” will provide us some non-trivial benefits; it will provide us a way to implement some concepts (like yield or resources) that usually must be provided by language itself (and often are not).
Our first attempt to represent an executable piece of code may look like this:
trait Program1:
  def execute(): Unit
Program1 is a minimal version of a program; it does not take any arguments and returns nothing.
It is the same as Java’s Runnable, and people coming from a C background would maybe call this a callback type, as this is a natural candidate for callbacks fired when something happens.
As it returns nothing, it is not very useful. The only thing we can do is a primitive chain operation:
trait Program1:
  def execute(): Unit
  /** Program which first executes `this` program and then the `that` program. */
  def andThen(that: Program1): Program1 =
    val self = this // capture `this` into the returned program as `self`
    new Program1: // return anonymous implementation of `Program1`
      def execute(): Unit =
        self.execute() // execute the "outer" program
        that.execute() // execute the program passed as argument
So, given two programs p and q, we can create a program which sequentially executes both: p.andThen(q).
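A small usage sketch of `andThen`. The `say` helper and the shared `log` buffer are hypothetical, introduced only so that the order of execution can be observed; `Program1` is repeated to keep the example self-contained.

```scala
import scala.collection.mutable.ListBuffer

trait Program1:
  def execute(): Unit
  def andThen(that: Program1): Program1 =
    val self = this
    new Program1:
      def execute(): Unit =
        self.execute()
        that.execute()

// Hypothetical helper: a program which appends `msg` to a shared log.
val log = ListBuffer.empty[String]
def say(msg: String): Program1 =
  new Program1:
    def execute(): Unit =
      log += msg

// Chaining only builds a bigger program; nothing is appended to `log` yet.
val greet: Program1 = say("p").andThen(say("q")).andThen(say("r"))
```

Only when `greet.execute()` is called do the three messages land in `log`, in the order in which the programs were chained.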
We can pass an instance of Program1 as an argument to some function which may (and may not) execute the piece of code suspended in its execute method.
(This is what you do in Java when you pass a Runnable instance to the Thread constructor.)
It is worth noting that the requirement that Program1 does not accept any arguments is in fact not limiting: we can capture (“bake”) whatever we want into an instance of Program1 like this:
// local variables captured by program
val a = 1
val b = 2
// anonymous instance capturing local variables
new Program1:
  def execute(): Unit =
    // the references to `a` and `b` are captured into this anonymous `Program1`'s
    // state, so we can refer to them as if with `this.a` and `this.b`
    println(s"$a + $b = ${a + b}")
It is generally useful to ask what the minimal implementations of the interfaces we meet are: for a set, it would be the empty set; for Program1, it is a program which captures nothing, does nothing and returns nothing:
val dummy =
  new Program1:
    def execute(): Unit = ()
The (obvious) problem is that we execute a Program1 just for its side effect(s): sure, we can print its result to the console, to a file, save it to a database or to some agreed-upon piece of memory, but it is clumsy.
A natural thing to do is to make the program generic in its return type:
trait Program2[A]:
  def execute(): A
This is much better.
As before, we don’t need Program2 to accept arguments for the execute method, as we can capture whatever we need directly into instances of Program2.
The difference is that the execution of a program now has the means to return its result without modifying the external world.
Please note that Program2 is a direct generalization of Program1, which is logically the same as Program2[Unit].
Let’s examine what we can do with Program2.
As we now have a meaningful result of execution, we can create a new program by remapping the result of another one:
trait Program2[A]:
  def execute(): A
  /** Program which executes `this` program, passes the result
    * as argument to `f` and returns its result.
    */
  def map[B](f: A => B): Program2[B] =
    val self = this // capture `this` into returned program as `self`
    new Program2[B]:
      def execute(): B =
        val a = self.execute() // execute the "outer" program
        f(a) // apply transformation to the result of the "outer" program
Hence, given a function f: A => B, we can transform a Program2[A] to a Program2[B] by applying f to the result of the former one.
(Note that Program2[A].map[B](f) only plans f's execution for the moment when the resulting program is executed; the creation of programs is without side effects.)
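A quick sketch to make the last remark observable. The counter `calls` and the two sample programs are hypothetical; `Program2` is repeated in a condensed form so that the example stands on its own.

```scala
trait Program2[A]:
  def execute(): A
  def map[B](f: A => B): Program2[B] =
    val self = this
    new Program2[B]:
      def execute(): B = f(self.execute())

var calls = 0 // counts how many times the mapping function has run

val answer: Program2[Int] =
  new Program2[Int]:
    def execute(): Int = 41

// Creating the mapped program does not call the function yet.
val next: Program2[Int] = answer.map { a => calls += 1; a + 1 }
```

After this code `calls` is still 0; each later call of `next.execute()` runs the mapping function exactly once.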
In other words (more elaborately), map is an operation (generic in A, B) which takes a Program2[A] (as implicit this) and a function A => B (as argument), and returns a Program2[B].
So far we have not done anything surprising or unclear: we have just crafted an interface (Program2) to wrap a function (execute) with baked-in arguments, with a convenient syntax to apply a transformation to the result (map).
Now we will introduce the first non-obvious thing: if we substitute B === Program2[C] in map[B](f: A => B), we meet a type for a program producing another program, since the return type of map(f: A => Program2[C]) is Program2[Program2[C]].
Not surprisingly, Program2[Program2[C]] belongs to the “family” of programs producing a C.
Note
|
If you know Future[A] (or Promise<A> in JavaScript), an analogy is at hand: Future[A] is a “container for a future value”, or a “value that will be known at some point in the future”.
The type Future[Future[A]] thus represents a “future value at some point in the future”, which is still a value in the future (with some known intermediate step).
|
The proof of this statement is a function which takes a Program2[Program2[A]] and returns a Program2[A]; a function of “shape” F[F[A]] => F[A] is usually called flatten.
The implementation is trivial and is in fact the only possible implementation:
def flatten[A](program: Program2[Program2[A]]): Program2[A] =
  new Program2[A]:
    def execute(): A =
      val program1 = program.execute() // execute the "outer" program
      program1.execute() // execute the just produced inner program
In words: to produce an A from a Program2[Program2[A]], we must execute the outer Program2 to get the (inner) Program2[A], which we then execute to get the desired A.
For now it is only an intellectual curiosity to produce a program producing a program, but soon we will see how useful this is. Usually we don’t write flatten(program.map(f)), where program is a Program2[A] and f is a function A => Program2[B], but use a single step called flatMap:
trait Program2[A]:
  def execute(): A
  /** Program which first executes `this` program and passes the result
    * as argument to `f` to obtain another program which is then executed
    * and its result returned.
    */
  def flatMap[B](f: A => Program2[B]): Program2[B] =
    val self = this
    new Program2[B]:
      def execute(): B =
        val a = self.execute() // execute the "outer" program
        val program1 = f(a) // map the result to new program
        program1.execute() // execute the just created program
Please note this is in fact a direct translation of andThen from Program1 (with regard to the generalization of Program1 to Program2[A]): the only difference is that with Program2[A] we can use the result of the first program to create the second one; in other words, “bake” the result of the first program into the second one as an additional argument (known only after the first program has been executed).
As this is a generic interface, not only can we do this, but we also should, so that the generic interface doesn’t swallow a potentially useful piece of information. (A client can always ignore the result of a flatMapped program if they want.)
As with Program1, it is good to ask what the minimal implementation of Program2[A] is. It is the program which returns a value which we already know:
// factory for `Program2`
def pure[A](a: A): Program2[A] =
  new Program2[A]: // captures the argument
    def execute(): A = a // return the factory's argument
(See how pure[Unit](()) is the same as dummy for Program1.)
Now that we have the flatMap operation and the pure factory, we can implement the map operation in terms of these two: the intuition is that flatMap is a “stronger” operation.
The equivalence is
program.map(f) === program.flatMap { a => pure(f(a)) }
// where
//   program: Program2[A]
//   f: A => B
(Please reread the contracts of map, flatMap and pure to confirm this is not deceptive.)
I intentionally use the term equivalence to avoid confusion with equality: the left- and right-hand side expressions represent the same programs, but are not equal as object instances.
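The equivalence can be checked on a concrete case. This is only a sketch: the sample `program` and the function `f` are hypothetical, and `Program2` is repeated in a condensed form to keep the example self-contained.

```scala
trait Program2[A]:
  def execute(): A
  def map[B](f: A => B): Program2[B] =
    val self = this
    new Program2[B]:
      def execute(): B = f(self.execute())
  def flatMap[B](f: A => Program2[B]): Program2[B] =
    val self = this
    new Program2[B]:
      def execute(): B = f(self.execute()).execute()

def pure[A](a: A): Program2[A] =
  new Program2[A]:
    def execute(): A = a

// Hypothetical sample program and function.
val program: Program2[Int] = pure(20)
val f: Int => String = n => s"value ${n + 1}"

val viaMap: Program2[String] = program.map(f)
val viaFlatMap: Program2[String] = program.flatMap { a => pure(f(a)) }
```

Executing either program yields the same string, although `viaMap` and `viaFlatMap` are two distinct objects.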
Let us write an example program using our Program2.
We take a well-known recursive problem (the Collatz conjecture) which goes this way: given a positive number n, if it is odd, multiply it by 3 and add 1 (hence the name 3n + 1 problem) and repeat; if it is even, divide it by 2 and repeat.
The Collatz conjecture states that for every starting n, we will eventually reach the cycle 1, 4, 2, 1.
Let’s implement a program which, for a given n, computes the number of steps until the sequence reaches 1.
(Example: for n = 6, the sequence continues 3, 10, 5, 16, 8, 4, 2, 1 and hence the result is 8.)
def collatz(n: BigInt): Program2[BigInt] =
  new Program2[BigInt]:
    def execute(): BigInt =
      if (n == 1)
        0
      else
        val c = // program for next step
          if (n % 2 == 0) collatz(n / 2)
          else collatz(3 * n + 1)
        c
          .map(_ + 1) // add 1 for this step
          .execute()
To check the result of this program, we introduce a program for printing the length:
def printLength(n: BigInt): Program2[Unit] =
  new Program2[Unit]:
    def execute(): Unit =
      println(s"Length: $n")
The program which, for a given n, computes the length of the 3n + 1 sequence and prints it is
def collatzAndPrint(n: BigInt): Program2[Unit] =
  collatz(n).flatMap(printLength)
When we execute the program collatzAndPrint(6), we will see Length: 8 as expected.
As you can verify, this implementation uses the stack extensively (you would need to find a very large n to overflow the stack though).
Let us summarize what we have in a single code listing.
We delegate the flatMap method implementation to a free-standing function in the companion object, which acts as a namespace, so that pure and flatMap are both factories for programs.
The map operation is implemented in terms of pure and flatMap; it is not treated as a “primitive” operation.
trait Program3[A]:
  def execute(): A
  /** Program which executes `this` and remaps its result with `f`. */
  def map[B](f: A => B): Program3[B] =
    // implement with `flatMap` and `pure`
    flatMap { a => Program3.pure(f(a)) }
  /** Program which executes `this`, applies `f` on the result to get
    * a program which is then executed and its result returned.
    */
  def flatMap[B](f: A => Program3[B]): Program3[B] =
    // delegate to factory
    Program3.flatMap(this, f)
object Program3:
  /** Program which returns the value passed as argument. */
  def pure[A](a: A): Program3[A] =
    new Program3[A]:
      def execute(): A = a
  /** Program which executes `program` to get a value which is then passed
    * to `f` as argument; the resulting program is then executed to obtain
    * the result.
    */
  def flatMap[A, B](
      program: Program3[A],
      f: A => Program3[B],
  ): Program3[B] =
    new Program3[B]:
      def execute(): B =
        val a = program.execute()
        val program1 = f(a)
        program1.execute()
  def flatten[A](program: Program3[Program3[A]]): Program3[A] =
    // `identity` is the generic identity function
    program.flatMap(identity) // that is `flatMap { program1 => program1 }`
We already have more than one might expect, but the question of whether it is useful is still not answered.
The root problem is that we force Program3 to provide the execute method, and once we call it, the program has no option but to do everything it can to produce the output, blocking the thread and growing the stack during its execution.
It is nothing more than a funny form of normal functions: we only capture arguments into programs (usually anonymous) and name the “function call” execute.
To see the benefits of such programs, we must take one non-obvious step (maybe surprising for those from the OOP world): remove the execute method from the interface and thus relieve programs from the concern of their execution.
Note we started with Program1, which had nothing but execute, and now we remove this execute from the interface and are searching for something which will drive the execution of programs: the interpreter.
(I use the term interpreter for the entity responsible for “how” to avoid confusion with threading task scheduling mechanisms called executors, which are responsible for “where”.)
So we remove execute from Program3 and get the (surprisingly small)
sealed trait Program[A]:
  // no `execute()` here
  def map[B](f: A => B): Program[B] =
    flatMap { a => Program.pure(f(a)) }
  def flatMap[B](f: A => Program[B]): Program[B] =
    Program.FlatMap(this, f)
object Program:
  /** Program which returns an already known value. */
  private case class Pure[A](value: A) extends Program[A]
  /** Program which (when interpreted) executes the `program`,
    * applies `f` on its result to get another program,
    * executes it and returns its result.
    */
  private case class FlatMap[A, B](
      program: Program[A],
      f: A => Program[B],
  ) extends Program[B]
We now don’t have anything reasonable to do in the pure and flatMap implementations, so we just save the arguments into named cases of Program in the hope that something (the interpreter) will use them when executing programs.
I use private for the cases to denote that a client shall use the factories below to create Programs, and not refer to the concrete implementations directly.
As before, we have the minimal implementation of Program[A]:
def pure[A](a: A): Program[A] = Pure(a)
With what we already have, let’s prepare two other factories for Programs:
def delay[A](f: () => A): Program[A] =
  pure(()).map(_ => f())
This factory suspends the function f into a Program[A]; when it is executed, the result of the suspended function is used as the result of the program.
def defer[A](f: () => Program[A]): Program[A] =
  pure(()).flatMap(_ => f())
This factory suspends the function f which this time produces a program, not directly a value to return.
When the program p = defer(f) is executed, we first run the function f to obtain a program which is executed next to obtain the result for p.
Note
|
I intentionally use the same names as Cats Effect library which is one of Scala implementations of IO monad. But this text does not follow Cats Effect implementation, I’m presenting the ideas behind computation in IO monads, not writing a real-world library. If you want to try programming with IO, please use Cats Effect or ZIO. |
Please note that there is no requirement (and also no way to enforce) that the parameters of map / delay and flatMap / defer do not perform any side effects when run; on the contrary, it is very common (especially in flatMap / defer) to perform them.
Let’s write an interpreter for Program[A] just to prove that by removing execute from the Program interface we lose nothing:
def run1[A](program: Program[A]): A =
  program match
    case Pure(a) =>
      a
    case FlatMap(program1, f) => // `program1: Program[B]` for some `B`
      val b = run1(program1) // not a tail call
      val program2 = f(b)
      run1(program2)
Note this is a trivial, straightforward implementation, driven by types. You can’t do anything else with a Program passed as argument than pattern-match it.
If it is a Pure[A](a), the situation is trivial.
If it is a FlatMap[B, A](program1, f) for some type B, a natural thing to do is to execute program1 to get a B, apply f to get program2: Program[A] and execute it to get the final return value of type A.
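The following sketch assembles the pieces so far into one compilable unit and runs a tiny program through `run1`. The sample `program` and the counter `effects` are hypothetical; the definitions are condensed copies of those above, with `run1` placed inside the companion object so that it can see the private cases.

```scala
sealed trait Program[A]:
  def map[B](f: A => B): Program[B] = flatMap { a => Program.pure(f(a)) }
  def flatMap[B](f: A => Program[B]): Program[B] = Program.FlatMap(this, f)

object Program:
  private case class Pure[A](value: A) extends Program[A]
  private case class FlatMap[A, B](program: Program[A], f: A => Program[B])
      extends Program[B]

  def pure[A](a: A): Program[A] = Pure(a)
  def delay[A](f: () => A): Program[A] = pure(()).map(_ => f())

  def run1[A](program: Program[A]): A =
    program match
      case Pure(a)              => a
      case FlatMap(program1, f) => run1(f(run1(program1)))

var effects = 0 // counts executions of the suspended side effect

// Building the program performs no side effect; `effects` stays 0.
val program: Program[Int] =
  Program
    .delay(() => { effects += 1; 20 })
    .map(_ + 1)
    .flatMap(a => Program.pure(a * 2))
```

Calling `Program.run1(program)` returns 42 and increments `effects` once per run, which illustrates that the program is only a description until an interpreter walks through it.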
Let’s test our Collatz program against this interpreter:
def collatz(n: BigInt): Program[BigInt] =
  defer { () => // we must use a factory
    if (n == 1)
      pure(0)
    else
      if (n % 2 == 0) collatz(n / 2).map(_ + 1)
      else collatz(3 * n + 1).map(_ + 1)
    // no more need to execute program created above
  }
def printLength(n: BigInt): Program[Unit] =
  delay { () =>
    println(s"Length: $n")
  }
val program = collatz(6).flatMap(printLength)
run1(program)
outputs:
Length: 8
If you inspect stack usage, you will see that the execution of collatz(6) by run1 needs about 10 stack frames.
The reason is that during recursion we build a chain of maps: collatz(n) gradually builds (from the right) a program pure(0).map(_ + 1)[...].map(_ + 1) whose execution consumes the stack.
Let’s fix the stack consumption.
It takes some experience to rewrite a non-tail recursion into a tail-recursive one.
I applied my experience to run1 and admittedly, it was quite a hard nut to crack.
I finally ended up with
sealed trait Todo[A]
object Todo:
  case class Done[A](value: A) extends Todo[A]
  case class More[A, B](
      program: Program[A],
      todo: A => Todo[B],
  ) extends Todo[B]
@tailrec
def loop2[A, B](program: Program[A], todo: A => Todo[B]): B =
  import Todo._
  program match
    case Pure(a) =>
      todo(a) match {
        case Done(b) => b
        case More(program1, todo) => loop2(program1, todo)
      }
    case FlatMap(program, f) =>
      loop2(
        program,
        x => More(f(x), todo),
      )
def run2[A](program: Program[A]): A =
  loop2(program, a => Todo.Done(a))
As usual, we delegate the implementation of the function we want to make tail-recursive to a tail-recursive helper, here called loop2.
The todo parameter is an accumulator of work “to be done”.
If we run our collatz(n) program against run2 (with some debug logging), we will see that tail calls are really eliminated.
Using Program with run2 prevents stack overflows, so we already got something non-trivial.
Not so fast. We just moved to the heap what previously was on the stack; the problem with chained maps is not solved, we can only run the program without overflowing the stack. The problem with chaining map(_ + 1) is inherent, and problems like the Collatz one are not good use cases for the IO monad (a naive implementation stays naive, IO is not a rescue).
Nevertheless, not all recursions are simple and the trade “stack for heap” done by Program may be handy.
Let’s write a program which computes if the length of singly linked list is odd or even. A “function” version of this would be
/** Returns true if the `list` length is even. */
def even[A](list: List[A]): Boolean =
  list match
    case head :: tail => odd(tail)
    case Nil => true
/** Returns true if the `list` length is odd. */
def odd[A](list: List[A]): Boolean =
  list match
    case head :: tail => even(tail)
    case Nil => false
val hasEvenLength = even(list)
// where
//   list: List[Int]
For larger lists, this will overflow the stack; the recursion is not of the type f-f-f-⋯ but f-g-f-⋯, and the compiler will not optimise it.
The Program version will not overflow, as control is always passed back to the interpreter before advancing to the next step:
def even[A](list: List[A]): Program[Boolean] = defer { () =>
  list match
    case head :: tail => odd(tail)
    case Nil => pure(true)
}
def odd[A](list: List[A]): Program[Boolean] = defer { () =>
  list match
    case head :: tail => even(tail)
    case Nil => pure(false)
}
val hasEvenLength = run2(even(list))
// where
//   list: List[Int]
Unlike in the collatz example, there is no work “to be done later”, and although we create a lot of instances of Program during the execution, we don’t need more than two at the same time. Creating and collecting short-lived objects should be very cheap on the JVM; and since many IO programs follow this pattern, where you create a Program for the next step of computation and immediately after its execution it is eligible for collection, IO programs can be very fast.
The trick of not calling the next step directly, but rather returning a description of what is to be done next, is called trampolining, and that’s what our run2 does. By representing a program in code, we are a bit in the role of a programming language designer or compiler implementor.
I hope it is clear how run2 implements a tail-recursive interpreter of Programs with the help of the Todo variants Done and More.
But if we look more closely, there is a striking resemblance: Done is like Pure and More is like FlatMap!
So we can rewrite run2 to the really short
@tailrec
def run[A](program: Program[A]): A =
  program match
    case Pure(a) =>
      a
    case FlatMap(program1, f) =>
      // where for some type B:
      //   program1: Program[B]
      //   f: B => Program[A]
      program1 match
        case Pure(b) =>
          val program2 = f(b)
          run(program2)
        case FlatMap(program2, g) =>
          // where for some type C:
          //   program2: Program[C]
          //   g: C => Program[B]
          val program3 = FlatMap(program2, c => FlatMap(g(c), f))
          run(program3)
We could have written this directly, but it might have felt like magic, which is something I wanted to avoid.
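To see the short interpreter at work, here is a self-contained sketch which runs a deeply left-nested chain of flatMaps, the shape handled by the inner FlatMap case. The `increment` function and the `chain` program are hypothetical; the definitions are condensed copies of those in the text.

```scala
import scala.annotation.tailrec

sealed trait Program[A]:
  def flatMap[B](f: A => Program[B]): Program[B] = Program.FlatMap(this, f)

object Program:
  private case class Pure[A](value: A) extends Program[A]
  private case class FlatMap[A, B](program: Program[A], f: A => Program[B])
      extends Program[B]

  def pure[A](a: A): Program[A] = Pure(a)

  @tailrec
  def run[A](program: Program[A]): A =
    program match
      case Pure(a) => a
      case FlatMap(program1, f) =>
        program1 match
          case Pure(b) => run(f(b))
          case FlatMap(program2, g) =>
            run(FlatMap(program2, c => FlatMap(g(c), f)))

// Hypothetical step: a program which adds one to its input.
val increment: Int => Program[Int] = n => Program.pure(n + 1)

// ((pure(0) flatMap increment) flatMap increment) ..., one million levels deep.
val chain: Program[Int] =
  (1 to 1_000_000).foldLeft(Program.pure(0)) { (program, _) =>
    program.flatMap(increment)
  }
```

`Program.run(chain)` evaluates to 1000000 using a constant number of stack frames; the nesting is unwound on the heap, one level per loop iteration.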
A bit of additional explanation:
In the FlatMap case, we inline the former loop2 into the case body, and so pattern match on the inner FlatMap's program (program1).
If this inner program is again a FlatMap, we move the function (g) “to the right”, removing one “onion peel” from the left, so that we can make progress by executing a “smaller” program; the equivalence here is (don’t be terrified):
FlatMap[B, A](
  FlatMap[C, B](
    program, // Program[C]
    g, // C => Program[B]
  ),
  f, // B => Program[A]
)
===
FlatMap[C, A](
  program, // Program[C]
  c => FlatMap[B, A]( // C => FlatMap[B, A]
    g(c), // Program[B]
    f, // B => Program[A]
  ),
)
or written in dot-notation on instances:
program
  .flatMap { c => g(c) }
  .flatMap { b => f(b) }
===
program
  .flatMap { c =>
    g(c).flatMap { b =>
      f(b)
    }
  }
// where
//   program: Program[C]
//   g: C => Program[B]
//   f: B => Program[A]
This is sometimes called the associativity (monad) law, since we can symbolically write the equivalence
[ p >>= (c -> g(c)) ] >>= (b -> f(b)) ===
  p >>= [ c -> ( g(c) >>= (b -> f(b)) ) ]
// or shorter
(p >>= g) >>= f === p >>= [c -> (g(c) >>= f)]
where >>= stands for flatMap.
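The same law holds for any well-behaved flatMap, not only for our Program. As a quick sanity check, here is a sketch using the standard library's List, where both sides can be compared with plain equality; the values `p`, `g` and `f` are hypothetical and chosen only for illustration.

```scala
// Hypothetical sample values.
val p: List[Int] = List(10, 20)
val g: Int => List[Int] = c => List(c, c + 1)
val f: Int => List[String] = b => List(s"<$b>")

// (p >>= g) >>= f
val lhs: List[String] = p.flatMap(g).flatMap(f)

// p >>= [c -> (g(c) >>= f)]
val rhs: List[String] = p.flatMap(c => g(c).flatMap(f))
```

Both `lhs` and `rhs` evaluate to `List("<10>", "<11>", "<20>", "<21>")`.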
To get an idea of what is possible to do with Programs, we will write a concurrent interpreter of multiple programs running on a single thread.
(Concurrency does not imply parallelism!)
If we look at the tail-recursive run from the previous section, we can see it in fact does two things (in the FlatMap case): it obtains a program to be run next and runs it.
We can extract the logic to create the next program as
def run[A](program: Program[A]): Either[A, Program[A]] =
  program match
    case Pure(a) =>
      Left(a)
    case FlatMap(program1, f) =>
      program1 match
        case Pure(b) =>
          val program2 = f(b)
          Right(program2)
        case FlatMap(program2, g) =>
          val program3 = FlatMap(program2, c => FlatMap(g(c), f))
          Right(program3)
Note
|
Either[A, B] is a sealed trait of two variants:
Left[A] and Right[B]. It is used to represent the disjoint union
of types A and B.
|
Instead of running the “next” program (tail-)recursively, we return either program’s result on the left or the “next” program on the right.
We can now write an interpreter that runs multiple programs concurrently:
import scala.collection.mutable

def run(programs: Seq[Program[Unit]]): Unit =
  // create mutable `queue` initiated with programs to start with
  val queue = mutable.Queue[Program[Unit]](programs*)
  @tailrec
  def loop(): Unit =
    if (queue.isEmpty)
      () // nothing to do
    else
      val program = queue.dequeue()
      run(program) match
        case Left(_) => // ignore the result
        case Right(next) =>
          queue += next // append the program's continuation to work queue
      loop()
  loop()
I believe this is self-explanatory: instead of running only single program tail-recursively, we maintain a queue of programs, and in each step, the “next” program is appended to the end.
Let’s try it:
def countDown(label: String, n: Int): Program[Unit] =
  if (n == 0)
    pure(())
  else
    defer { () =>
      println(s"$label (n = $n)")
      countDown(label, n - 1)
    }
val programs =
  Seq(
    countDown("Hello", 3),
    countDown("¡Hola!", 2),
    countDown("Ahoj", 1),
  )
run(programs)
The countDown program prints label n times and then ends.
We run three such programs, printing Hello in English (3×), Spanish (2×) and Czech (1×).
The program executions are interleaved:
Hello (n = 3)
¡Hola! (n = 2)
Ahoj (n = 1)
Hello (n = 2)
¡Hola! (n = 1)
Hello (n = 1)
Of course run is not a reasonable API; IO applications usually have a single entry point. This was only a prelude to the next section, where we show a Node.js-like runtime with a single user thread.
Node.js brought from JavaScript browser world an interesting thing to mainstream programming: all user code runs on a single thread. (Hence it needs no data synchronization!) Users can nevertheless spawn multiple asynchronous operations from their code, they just need to provide callbacks what to do when operations complete:
fetch(...).then(response => {
  // handle response
})
When the Node.js interpreter encounters an asynchronous operation (like fetch), it saves the callback (response => {...}), runs the operation in the background and, when its result is ready, runs the callback, providing the result as argument.
In the meantime it of course runs other callbacks with the results of other background operations; this runtime model may be a perfect choice for programs that do a lot of IO, but no CPU-heavy computation which would block the single, precious user thread.
Note
|
There is a syntax sugar to avoid deep nesting of callbacks
(await / async ), but conceptually I don’t lie.
The more
complicated details of Node.js event loop are not important
for purposes of this text.
|
Let’s bring the idea to our Programs. As in the previous example, we will maintain a queue of programs to be run; but this time we have to give users an option to request an asynchronous operation.
Pure and FlatMap are too weak as they model synchronous operations only; we need a new kind of program, something like an asynchronous delay:
// hypothetical asynchronous `delay`, not a valid Scala though
def async[A](f: async () => A): Program[A]
The async word “indicates” that f can spawn an asynchronous operation (in the background; like Future[A]) and that the interpreter should use its output as the result of the async program.
It takes some time to come up with a working user interface for an async Program factory for which we will have an implementation in the interpreter:
// factory for asynchronous `delay`
def async[A](k: (A => Unit) => Unit): Program[A] = ???
The k's type (A => Unit) => Unit looks scary, but it’s not so bad: async accepts a function that takes as argument a callback cb: A => Unit and performs some effect.
The contract with a client of async is that k should start an asynchronous operation which will produce an A, and inside that operation's own callback (which must be provided by the library the operation comes from) must call cb with the result of the asynchronous operation.
The important thing here is that cb is not created by the user; it is supplied by the interpreter as the argument for their k when the async program gets executed.
To shed some light on this, we take as an example the asynchronous HTTP client from the Java standard library:
import java.net.URI
import java.net.http.{HttpClient, HttpRequest}
import java.net.http.HttpResponse.BodyHandlers
import java.time.Duration

/** Returns status code for single HTTP GET to given URI. */
def singleHttpRequest(uri: String): Program[Int] =
  async[Int] { cb =>
    val client = HttpClient.newBuilder().build() // make an HTTP client
    val request = HttpRequest // build a GET request to URI
      .newBuilder()
      .uri(URI.create(uri))
      .timeout(Duration.ofSeconds(10))
      .GET()
      .build()
    client
      .sendAsync(request, BodyHandlers.ofString()) // CompletableFuture[...]
      .thenAccept { response => // Java `CompletableFuture`'s callback
        cb(response.statusCode) // call `cb` with the result of async operation
      }
  }
The async program is a block of code which accepts cb as argument,
spawns an asynchronous operation and uses cb to let the program interpreter
know about the result
(obviously by performing some side effect on the interpreter state).
Extending run from the previous section is now straightforward:
we use the now well-known trick of saving the argument of the factory
in a case of Program[A]:
// new case of `Program[A]` representing asynchronous operations
case class Async[A](k: (A => Unit) => Unit) extends Program[A]
// the factory for it
def async[A](k: (A => Unit) => Unit): Program[A] =
  Async(k)
When we run the queue of programs, we now must handle the case when a client provides an asynchronous operation:
def run[A](program: Program[A], enqueue: Program[A] => Unit): Unit =
  program match
    case Pure(a) =>
      ()
    case FlatMap(program1, f) =>
      program1 match
        case Pure(b) =>
          val program2 = f(b)
          enqueue(program2)
        case FlatMap(program2, g) =>
          val program3 = FlatMap(program2, c => FlatMap(g(c), f))
          enqueue(program3)
        case Async(k) =>
          // only enqueue in the callback; `f` runs later, on the interpreter thread
          k { b => enqueue(FlatMap(pure(b), f)) }
    case Async(k) =>
      // note this is an async version of `case Pure`
      k { a => enqueue(pure(a)) }
This time the queue is mutable and protected against concurrent
access, so that we can append to it from different threads.
The interpreter itself is just a consumer loop that takes Programs
from the queue and runs them, with enqueue being a function
that appends to the queue.
This way the client’s code in k enqueues the result of the async operation
to the interpreter loop without knowing about it.
This is what an implementation may look like:
import java.util.concurrent.{BlockingQueue, LinkedBlockingDeque}
import scala.annotation.tailrec
import scala.jdk.CollectionConverters.*

def run(programs: Seq[Program[Unit]]): Nothing =
  // add `programs` to blocking queue
  val queue: BlockingQueue[Program[Unit]] = LinkedBlockingDeque()
  queue.addAll(programs.asJava)
  @tailrec
  def loop(): Nothing =
    // `take()` blocks the thread until there is a `Program` to take
    val program = queue.take()
    run(program, queue.add)
    loop()
  loop()
The problem is that this run never ends, even when there is nothing
left to do; we will provide a much more mature IO implementation later.
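One way to let such a loop stop is to count the programs that are still alive, that is, either queued or waiting for a callback. The following is only a minimal sketch under stated assumptions, not the implementation from this repository: the names step, runAll, later and demo are hypothetical, the program cases are redeclared in a simplified form so that the block is self-contained, and every async callback is assumed to be called exactly once.

```scala
import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}
import scala.collection.mutable.ListBuffer

// Simplified redeclaration of the program cases (assumption, for self-containment).
sealed trait Program[A]
case class Pure[A](a: A) extends Program[A]
case class FlatMap[B, A](program: Program[B], f: B => Program[A]) extends Program[A]
case class Async[A](k: (A => Unit) => Unit) extends Program[A]

// One interpreter step: every non-`Pure` program enqueues exactly one successor,
// either right away or later from the async callback.
def step[A](program: Program[A], enqueue: Program[A] => Unit): Unit =
  program match
    case Pure(_) => ()
    case FlatMap(program1, f) =>
      program1 match
        case Pure(b) => enqueue(f(b))
        case FlatMap(program2, g) =>
          enqueue(FlatMap(program2, c => FlatMap(g(c), f)))
        case Async(k) =>
          // the callback only enqueues; `f` runs later in the interpreter loop
          k { b => enqueue(FlatMap(Pure(b), f)) }
    case Async(k) => k { a => enqueue(Pure(a)) }

// Runs until no program is alive (neither queued nor waiting for a callback).
def runAll(programs: Seq[Program[Unit]]): Unit =
  val queue = LinkedBlockingQueue[Program[Unit]]()
  programs.foreach(queue.put)
  var alive = programs.size // touched by the interpreter thread only
  while alive > 0 do
    queue.take() match
      case Pure(_) => alive -= 1 // a finished program leaves no successor
      case program => step(program, queue.put)

// Hypothetical demo: two simulated async operations completing after a delay.
def demo(): List[String] =
  val scheduler = Executors.newSingleThreadScheduledExecutor()
  val out = ListBuffer.empty[String]
  def later(millis: Long, msg: String): Program[String] =
    Async[String] { cb =>
      val command: Runnable = () => cb(msg)
      scheduler.schedule(command, millis, TimeUnit.MILLISECONDS)
      ()
    }
  def say(millis: Long, msg: String): Program[Unit] =
    FlatMap[String, Unit](later(millis, msg), s => {
      out += s // runs on the interpreter thread
      Pure(())
    })
  try runAll(Seq(say(200, "slow"), say(10, "fast")))
  finally scheduler.shutdown()
  out.toList
```

Calling demo() returns once both programs have finished, instead of blocking forever in take(). The counter needs no synchronization in this sketch because only the interpreter thread reads and writes it.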
The cost of executing cb with the result of the async operation
is minimal (just appending to the queue),
so we don’t do anything heavy on the thread (pool) where the library
providing async operations executes the callbacks.
To demonstrate our small Node.js-like interpreter, we write a program which sends five GET requests to each of two domains and prints the status codes of the responses:
def fetch(n: Int, uri: String): Program[Unit] =
  if (n == 0)
    pure(())
  else
    singleHttpRequest(uri).flatMap { code =>
      println(s"fetched $uri: $code")
      fetch(n - 1, uri)
    }

val programs = Seq(
  fetch(5, "https://example.com"),
  fetch(5, "https://google.com"),
)
run(programs)
Possible output:
fetched https://google.com: 301
fetched https://google.com: 301
fetched https://example.com: 200
fetched https://google.com: 301
fetched https://google.com: 301
fetched https://google.com: 301
fetched https://example.com: 200
fetched https://example.com: 200
fetched https://example.com: 200
fetched https://example.com: 200
As a more generic example, we will show how to use a Future[A]
as a Program[A].
The first attempt may look like this:
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

// don't do this
def naiveFromFuture[A](
    future: Future[A],
)(using ec: ExecutionContext): Program[A] =
  async { cb =>
    future.onComplete {
      case Success(a) => cb(a)
      case Failure(e) => ???
    }
  }
Note
|
To call onComplete , we are required to provide an implicit instance
of ExecutionContext where the onComplete callback will get executed.
The inability to handle the Failure case will be discussed later;
until then we generally assume we are free of errors.
|
Although this looks reasonable, there is a flaw:
the resulting program has strange semantics!
A Future[A] represents an already running operation,
so when we save the program naiveFromFuture(future) to a variable
and execute it multiple times,
it will always complete with the same result,
as it will be filled with the result of the same future.
To fix this, we need to make the argument for “from future” lazy,
so that a new Future[A] is created (and an async operation spawned)
each time the program is executed:
def fromFuture[A](
    program: Program[Future[A]],
)(using ec: ExecutionContext): Program[A] =
  program.flatMap(naiveFromFuture)
Instead of accepting a Future[A] representing a spawned
async operation, we accept a Program[Future[A]],
which is only a description of it
(possibly created with delay).
When we execute fromFuture(program), we first create the future
and then we pass it in flatMap to naiveFromFuture.
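The difference between an already running Future and a description of one can be observed with plain Futures, without any interpreter. This is a minimal sketch assuming the global ExecutionContext; the names runTwiceEager and runTwiceLazy are hypothetical, and the function value makeFuture merely plays the role that Program[Future[A]] plays in the text.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.*

// Awaits the same, already running Future twice: the side effect happens once.
def runTwiceEager(): (Int, Int) =
  val counter = AtomicInteger(0)
  val future: Future[Int] = Future(counter.incrementAndGet())
  (Await.result(future, 5.seconds), Await.result(future, 5.seconds))

// Creates a new Future for each "execution": the side effect happens each time.
def runTwiceLazy(): (Int, Int) =
  val counter = AtomicInteger(0)
  val makeFuture: () => Future[Int] = () => Future(counter.incrementAndGet())
  (Await.result(makeFuture(), 5.seconds), Await.result(makeFuture(), 5.seconds))
```

Here runTwiceEager() yields (1, 1), because the second await sees the memoized result of the first run, while runTwiceLazy() yields (1, 2).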
Until this moment we didn’t account for errors we may encounter during the program execution; and if we work with languages that use exceptions, we must assume that sooner or later we will get some, especially if we call code not under our control. (It may be everything from a logic error to an HTTP request timeout failure.)
In our synchronous interpreter, an exception during program
execution would crash the interpreter loop.
In the asynchronous interpreter from the previous section,
which uses a blocking queue, an error in onComplete will
perhaps be logged by the ExecutionContext's error handler and swallowed;
the program execution would hang since the exception
would not get propagated.
(Try to set the timeout in singleHttpRequest to something small
and run the example!)
If we give the client an API accepting functions (in delay, flatMap, etc.)
which perform side effects, we must expect they may fail:
what should we do with the following?
def f(): Unit =
  throw RuntimeException("failed")

val program = delay { () => f() }
When we run the program, we have to run the function f
suspended inside and so hit the exception. Forcing users to
pass only functions which will not throw is clumsy and fragile;
at least in Scala we have no way to ensure that a function
will not throw, so if a client forgets to catch exceptions
before returning control to the interpreter, all bets are off.
This leads us to one more case of programs: those that are “raised” (errored) and instead of returning a value they carry the (exceptional) cause for failure:
case class Error[A](cause: Throwable) extends Program[A]
// factory for this case, if user wants to use it explicitly
def raiseError[A](cause: Throwable): Program[A] =
  Error(cause)
When executing a program, we try-catch for potential errors
in the interpreter and translate failures to raised programs.
Calling map or flatMap on a raised program does nothing,
just as all code after a throw is skipped
until we catch the exception (or let the application fail with it).
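The behaviour described above can be sketched with a tiny synchronous interpreter. This is an illustration under assumptions, not the interpreter from this repository: the case name Delay and the names runSync and demo are hypothetical, the program cases are redeclared so that the block is self-contained, and the interpreter is naively recursive (not stack-safe).

```scala
import scala.util.{Failure, Success, Try}

// Simplified redeclaration of the program cases (assumption, for self-containment).
sealed trait Program[A]
case class Pure[A](a: A) extends Program[A]
case class Delay[A](f: () => A) extends Program[A]
case class FlatMap[B, A](program: Program[B], f: B => Program[A]) extends Program[A]
case class Error[A](cause: Throwable) extends Program[A]

// Returns `Left(cause)` for a raised program and `Right(value)` otherwise.
def runSync[A](program: Program[A]): Either[Throwable, A] =
  program match
    case Pure(a) => Right(a)
    case Delay(f) => Try(f()).toEither // a thrown exception becomes a failure
    case Error(cause) => Left(cause)
    case FlatMap(program1, f) =>
      runSync(program1) match
        case Left(cause) => Left(cause) // raised: `f` is skipped
        case Right(b) =>
          Try(f(b)) match // `f` itself may throw, too
            case Success(program2) => runSync(program2)
            case Failure(cause) => Left(cause)

// Hypothetical demo: the continuation after a failing step never runs.
def demo(): (Either[Throwable, Int], Boolean) =
  var reached = false
  val failing: Program[Int] = Delay[Int](() => throw RuntimeException("boom"))
  val program: Program[Int] = FlatMap[Int, Int](failing, n => {
    reached = true
    Pure(n + 1)
  })
  (runSync(program), reached)
```

In demo() the result is a Left carrying the "boom" exception and reached stays false, which mirrors how code after a throw is skipped.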
The non-generic Error case extending Program[A]
is not the only way we can model errors;
we implicitly assume (in JVM tradition) that errors
are derived from Throwable.
The ZIO library models programs by the type ZIO[R, E, A] where:
- R is the runtime (context) type for the program,
- E is the (possible) error type,
- A is the value type.
But what shall we do if a client breaks the contract
and throws an exception of type E1
in a factory which returns ZIO[R, E2, A]
(assuming E1 is not a subtype of E2)?
The authors of ZIO recommend that those who are new to ZIO start
with ZIO[Any, Throwable, A] (aliased Task[A]),
which corresponds to our Program[A] and most closely to Future[A].
Note
|
To those who have never seen it, using Any in ZIO[Any, Throwable, A]
may look suspicious (Any , “anything”, is a Scala’s supertype for
all types; like Java’s Object ). That the runtime for program
may be “anything” is very generic, but also means that you have
no useful information about it to use.
Hence ZIO[Any, ?, ?] means there is some runtime for the program,
but the program itself can’t use it
(it can be a Unit which does not carry any useful stuff).
|
Back to our own Program[A]. I think it is sufficiently close
to the one in Cats Effect that we can
rename it to IO (as Cats Effect does) and show its
more mature version.
This is what a user can do if they have an existing
instance of IO[A].
Please refer to IO.scala for the implementations of the methods;
there the trait is not abstract.
sealed trait IO[+A]:
  /** Transforms the result of `this` with `f`. */
  def map[B](f: A => B): IO[B]

  /** Ignores the result of `this` and replaces it with `b`. */
  def as[B](b: B): IO[B]

  /** Uses the result of `this` and `f` to create a program to be run next. */
  def flatMap[B](f: A => IO[B]): IO[B]

  /** Ignores the result of `this` and registers `that` to be run next. */
  def *>[B](that: IO[B]): IO[B]

  /** If `this` program fails, `attempt` will recover it back to normal,
    * and which case happened is communicated in the result:
    * either `this` failed and the cause is put on the left,
    * or succeeded and the value is put on the right.
    */
  def attempt: IO[Either[Throwable, A]]

  /** Program which executes `this` and `that` in "parallel". */
  def par[B](that: IO[B]): IO[(A, B)]

  /** Yields from execution of `this`. */
  def execYield: IO[A]
Please note we have added the attempt operation so that the user can
recover from failures,
par so that it is possible to run multiple programs in parallel,
and execYield (yield is a keyword) to yield execution.
These are the factories users of IO can use to get instances of IO,
and so call methods like map and flatMap:
object IO:
  def pure[A](a: A): IO[A]
  val unit = pure(())
  def raiseError[A](cause: Throwable): IO[A]
  def delay[A](f: () => A): IO[A]
  def defer[A](f: () => IO[A]): IO[A]
  def async[A](k: (Either[Throwable, A] => Unit) => Unit): IO[A]
Note that a mature IO library will provide, for example, a factory for non-blocking sleep out of the box, but we can implement our own in user code like this:
import java.util.concurrent.{Executors, TimeUnit}

// use scheduler from Java standard library
val executorService = Executors.newSingleThreadScheduledExecutor()

def delay(duration: Duration): IO[Unit] =
  IO.async { cb =>
    val command: Runnable = () => cb(Right(()))
    executorService.schedule(
      command,
      duration.toMillis,
      TimeUnit.MILLISECONDS,
    )
  }
Please refer to IO.scala in this repository for an example interpreter.
The module contains the definition of the IO trait, its cases,
and a single-threaded interpreter; the file has fewer than 250 lines,
which is IMO very little considering what it supports:
- a way to plug in asynchronous operations with propagation of errors (the IO.async factory),
- “parallel” execution of subprograms: if subprograms contain async steps, we have means to run those async steps in parallel (io1.par(io2)),
- recovering from errors (io.attempt),
- yielding, so users have control over execution and can yield from executing the current program to give the interpreter the opportunity to execute programs waiting in the queue, e.g. those after an async step (io.execYield).
We use the tricks introduced in previous sections to implement such a “miracle”.
I myself didn’t look into the implementations used in Cats Effect or ZIO,
but I think that the IO.scala module is really worth reading.
A few years ago, I would not have imagined that something so powerful
could be implemented in one self-contained type-safe module.
(It is not meant for production though; please use CE or ZIO.)
It is not obvious how to define a resource; it is much easier to say what a program using a resource looks like:
// `R` for resource
// `A` for the output of program using resource
def programUsingResource[R, A](
    acquire: IO[R],
    use: R => IO[A],
    release: R => IO[Unit],
): IO[A]
where
- acquire is a program to acquire the resource.
- use is a program factory which, given the acquired resource, produces a program which uses it. It runs only if acquire succeeded. (Note this is guaranteed by types: until we have an R from acquire, we don’t have an argument for use.)
- release is a program for releasing the resource; it runs only if acquire succeeded before (again, the “if” part is guaranteed by types) and even if its usage failed (we must take care of this).
The implementation can be very simple:
def programUsingResource[R, A](
    acquire: IO[R],
    use: R => IO[A],
    release: R => IO[Unit],
): IO[A] =
  acquire // acquire the resource
    .flatMap { r => // continue only if we have acquired the resource
      use(r) // create a program using the acquired resource
        .attempt // catch potential failures in `use(r)`
        .flatMap {
          case Left(e) => // `use(r)` failed
            release(r) // release the resource
              .attempt // swallow potential errors while releasing
              *> raiseError(e) // re-raise the error
          case Right(a) => // `use(r)` produces a value `a`
            release(r) // release the resource (don't swallow potential errors)
              .as(a) // return the value `a`
        }
    }
This is very similar to RAII known from C++ and Rust, but this time, the concept of constructor and destructor is implemented in user-space code and works as well for synchronous programs as for asynchronous ones. We have implemented a runtime representation of RAII scope; and the concept of resource acquire-release is not tied to a specific structure’s constructor & destructor.
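To see the release step run even when the usage fails, here is a minimal self-contained sketch. It rests on assumptions: MiniIO is a hypothetical, synchronous, non-stack-safe stand-in for IO (it is not the IO.scala from this repository), and demo and logged are hypothetical helper names. Only programUsingResource follows the implementation shown above.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

// Hypothetical synchronous stand-in for the text's IO.
sealed trait MiniIO[A]:
  def flatMap[B](f: A => MiniIO[B]): MiniIO[B] = MiniIO.FlatMap(this, f)
  def as[B](b: B): MiniIO[B] = flatMap[B](_ => MiniIO.Pure(b))
  def *>[B](that: MiniIO[B]): MiniIO[B] = flatMap[B](_ => that)
  def attempt: MiniIO[Either[Throwable, A]] = MiniIO.Attempt(this)

object MiniIO:
  case class Pure[A](a: A) extends MiniIO[A]
  case class Delay[A](f: () => A) extends MiniIO[A]
  case class Error[A](cause: Throwable) extends MiniIO[A]
  case class FlatMap[B, A](io: MiniIO[B], f: B => MiniIO[A]) extends MiniIO[A]
  case class Attempt[A](io: MiniIO[A]) extends MiniIO[Either[Throwable, A]]

  def delay[A](f: () => A): MiniIO[A] = Delay(f)
  def raiseError[A](cause: Throwable): MiniIO[A] = Error(cause)

  // Naive recursive interpreter: `Left` is a raised program, `Right` a value.
  def run[A](io: MiniIO[A]): Either[Throwable, A] =
    io match
      case Pure(a) => Right(a)
      case Delay(f) => Try(f()).toEither
      case Error(cause) => Left(cause)
      case Attempt(inner) => Right(run(inner)) // failure becomes a value
      case FlatMap(inner, f) =>
        run(inner) match
          case Left(cause) => Left(cause)
          case Right(b) =>
            Try(f(b)) match
              case Success(next) => run(next)
              case Failure(cause) => Left(cause)

def programUsingResource[R, A](
    acquire: MiniIO[R],
    use: R => MiniIO[A],
    release: R => MiniIO[Unit],
): MiniIO[A] =
  acquire.flatMap[A] { r =>
    use(r).attempt.flatMap[A] {
      case Left(e) => release(r).attempt *> MiniIO.raiseError[A](e)
      case Right(a) => release(r).as(a)
    }
  }

// Hypothetical demo: `use` fails, yet `release` still runs.
def demo(): (Either[Throwable, Int], List[String]) =
  val log = ListBuffer.empty[String]
  def logged[A](msg: String, a: A): MiniIO[A] =
    MiniIO.delay { () =>
      log += msg
      a
    }
  val program = programUsingResource[String, Int](
    acquire = logged("acquire", "handle"),
    use = r =>
      logged(s"use $r", 1)
        .flatMap(_ => MiniIO.raiseError[Int](RuntimeException("boom"))),
    release = r => logged(s"release $r", ()),
  )
  (MiniIO.run(program), log.toList)
```

In demo() the program as a whole fails with "boom", while the log records acquire, use handle and release handle in that order.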
The Cats Effect library provides a Resource class which offers nicer
syntax and captures the concept more deeply:
our intuition says resources should compose well,
it should be possible to acquire them sequentially or in parallel;
they are similar in some deeper sense to the IO type itself.
But this is out of the scope of this text.
IMO even our simple programUsingResource
witnesses the strength of the IO concept:
resource handling in the presence of errors is something
very important and difficult, but with IO, we get a robust
solution for a concurrent environment virtually for free.
There is more to be built on the ideas presented here, but it is out of the scope of this Gradus ad IO Parnassum text. Perhaps the most interesting for me is building effectful, concurrent streams on top of IO; see the fs2 library. The word effectful means that we can run IO operations (for side effects) as stream stages. If I get some positive feedback on this, I may write a similar introductory text to show the ideas behind IO-powered streams.