First experimental release
Welcome to the first release of gears, the experimental cross-platform asynchronous programming library for Scala 3!
In this first release, we introduce concepts of asynchronous programming, both low-level/unstructured (sources, listeners, channels) and high-level structured concurrency. This is the first iteration of a complete design, and it might have some gaps in usability and performance. Feedback is appreciated!
A tutorial about the basic concepts will be available soon.
What is included?
User-facing, high level asynchronous programming
gears.async introduces the concept of an Async context/capability: functions that implicitly take an Async context (using Async) are suspendable computations and are capable of both performing asynchronous operations and spawning more concurrent asynchronous computations.
This is the recommended way to write asynchronous code: Functions that explicitly need to suspend or spawn concurrent computations should take an Async context:

    def performAsyncIO(using Async): Int = ???

However, higher-order functions that do not explicitly use these capabilities do not have to take an Async context.

    val result: Seq[Int] = (1 to 5).map(_ => performAsyncIO)
    //                              ^^^ map is the regular Seq.map implementation!

- Spawning concurrent computations: Computations that run concurrently with the caller can be spawned by invoking Future.apply with a body:

      val resultFuts: Seq[Future[Int]] = (1 to 5).map(_ => Future(performAsyncIO))

  Unlike the previous example, this creates 5 concurrently (possibly in parallel) running computations, each with a Future type that you can await. Awaiting effectively suspends the current computation until the result of the awaited computation is available.

      val resultsTry: Seq[Try[Int]] = resultFuts.map(_.awaitResult)

  This awaits the results in order. Note that .awaitResult requires an Async context. It returns a Try[Int], since the computation inside the Future may throw or be interrupted. To bypass this and directly get the result (rethrowing on Failure), use .await.

      val results: Seq[Int] = resultFuts.map(_.await)

- Working with Futures: some library functions are provided to make working with futures more convenient:
  - Seq[Future[T]].awaitAll waits for all futures to complete and returns Seq[T] as a result, throwing when the first failure appears.
  - Seq[Future[T]].altAll waits for the first future to succeed, returning its value. If all of them fail, it returns the last failure.

  Both variants provide a WithCancel alternative for owned futures, where if the wait is short-circuited, the other futures are optimistically cancelled.
  - Async.select allows you to race the futures, and continue the computation based on which value was received:

        val fFut = Future(f())
        val gFut = Future(g())
        val results = Async.select(
          fFut.andThen: x =>
            ???, // handle f result ...
          gFut.andThen: y =>
            ??? // handle g result ...
        )

- Structured Concurrency: Every Async context carries a completion group, tracking all concurrently running cancellable computations in a tree-like structure (groups can contain other groups). A group can be manually spawned by Async.group, which is automatically linked to the group present in the current Async context:

      val compute = Async.group:
        val f1 = Future(f())
        val g1 = Future(g())
        Seq(f1, g1).altAll

  Upon the return of the main body, all running concurrent computations are cancelled (by calling .cancel()) and awaited. This guarantees that once outside Async.group, no concurrent computations inside that group can still be running, and we maintain the tree-like structure.
  Future.apply automatically creates a new group for its running body, so when you decide to compute something in parallel, you completely control its lifetime, including all of the concurrent computations that it spawned!
  When the main body of the group completes, all unneeded/unawaited running computations are cancelled and awaited for cleanup.
- Going in-and-out of Async: Async.blocking creates an Async context out of thin air (given a suspension implementation and a scheduler)! This Async context blocks the running thread for suspension, which is typically not what you want. However, it is useful in two scenarios:
  - As the root Async context: you would put this under the main function of the application.
  - As a truly blocking call to an asynchronous operation: not recommended, but this works similarly to Node.js's ...Sync variants, where the thread is blocked until the operation completes, returning the results directly.
- Cross Platform: gears is implemented generically, only assuming a suspension interface and a scheduler to work (see AsyncSupport). However, two default implementations are provided:
  - Using JVM >=21's virtual threads, alongside the usual JVM virtual thread scheduler
  - Using Scala Native 0.5 delimited continuations, with the ForkJoinPool scheduler

  Both can be provided by importing gears.async.default.given, which automatically selects the correct implementation for the platform.
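
To tie the pieces above together, here is a minimal sketch of a complete program. It assumes the gears artifact is on the classpath and that the API behaves as described in these notes; performAsyncIO with a parameter and sumOfResults are hypothetical names introduced only for illustration.

```scala
import gears.async.*
import gears.async.default.given // platform-specific suspension support and scheduler

// Hypothetical stand-in for a real asynchronous operation; it never actually suspends.
def performAsyncIO(n: Int)(using Async): Int = n * 2

// Async.blocking acts as the root Async context: the calling thread is
// blocked until the body (and everything it spawned) has finished.
def sumOfResults(): Int =
  Async.blocking:
    // Spawn five concurrent computations...
    val futs: Seq[Future[Int]] = (1 to 5).map(n => Future(performAsyncIO(n)))
    // ...then wait for all of them, rethrowing the first failure if there is one.
    futs.awaitAll.sum

@main def run(): Unit = println(sumOfResults())
```

Keeping Async.blocking at the edge of the program (here, directly under the main function) means every other function only needs a using Async parameter and never has to block a thread itself.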
Mid-level, unstructured asynchronous operations
- Sources are the abstractions of asynchronous resources that an Async context can .awaitResult for. From the high-level interface, we have been using Future[T], which is actually a Source[Try[T]] that has a special property: once completed, a Future[T] always returns the same result.
- Listeners are the primary out-of-band way to receive values from a Source.
  The simplest implementation of a listener can be created with Listener.apply, which takes the item T and the origin Source[T] and does something with them! Listeners always receive at most one item from the Source, and are removed from the Source's list once completed.
  Listener bodies are run on the same computation that resolves the sources, so usage of them needs to be carefully tuned for a high degree of concurrency performance.
  - Locking listeners add synchronization capabilities to a listener, allowing it to listen to multiple sources and decide whether it is open to accepting the item or has already expired. To know more, check out the Listener interface.
- Channels are bidirectional channels that can be used as a means of communication between concurrent processes. They provide .send(x: T)(using Async) and .read()(using Async) as suspending methods, but also .readSource and .sendSource(x: T) sources that read/send an item when a listener is attached and accepts the item/event.
  Three types of channels are provided:
  - SyncChannels are unbuffered synchronous (rendezvous) channels where sends and reads block until the transfer is actually made.
  - BufferedChannels allow sends to be buffered up to a certain buffer count, completing instantly if buffer space is available.
  - UnboundedChannels allow sends to always be buffered with a growable buffer, returning instantly in all cases. They expose a .sendImmediately method that allows sending without an Async context.
- race, select and channel synchronization: gears provides exclusivity when using the above Async.select method with sources: exactly one of the given sources will be resolved by the select (and no other item/event will be consumed from the other sources). This is especially important for channels, as you typically would not want items from a channel being raced and thrown away:

      Async.blocking:
        val c1 = UnboundedChannel[Int]()
        val c2 = UnboundedChannel[Int]()
        c1.sendImmediately(1)
        c2.sendImmediately(2)
        def getInt = Async.select(
          c1.readSource.andThen(_ + 1),
          c2.readSource.andThen(_ + 2),
        )
        val ints = Seq(getInt, getInt) // Seq(2, 4) or Seq(4, 2)

  race has similar exclusivity semantics to select, but expects a list of Source[T] as parameters and returns a Source[T], resolving with an item when one of the inputs produces one. This allows you to compose Sources by racing them losslessly:

      val c1 = UnboundedChannel[Int]()
      val c2 = UnboundedChannel[Int]()
      val c3 = UnboundedChannel[Int]()
      c1.sendImmediately(1)
      c2.sendImmediately(2)
      val cs12: Source[Int] = Async.race(c1.readSource, c2.readSource)
      val cs123: Source[Int] = Async.race(cs12, c3.readSource)
      val ints = Async.blocking { Seq(cs12.awaitResult, cs123.awaitResult) } // Seq(1, 2) or Seq(2, 1)

- Creating sources: Source is a trait that can be manually implemented, but some Source creation methods are available:
  - With Promise and Future.withResolver: Promise is a wrapper for a Future that gets externally resolved. It exposes a .complete method which can be called externally to resolve the future and continue all awaiting computations.
    Future.withResolver exposes the same mechanics, but through an interface that resembles Node.js's Promise constructor, while also allowing the caller to register a method that cleans up the external process when the Future is cancelled.
  - With Source.values: creates a very simple queue-like source that resolves items to listeners as they come.
- Source transformation API: Source[T] exposes .transformValuesWith, which applies a transformation function to every item in the original source before passing it on to a listener. This can be used to transform sources, but is not recommended, as the transformation will be run on the same computation as the original source and might cause unexpected performance problems.
Low-level suspension API and scheduling
gears's Async context requires the machinery of a Suspension API, which resembles "delimited continuations", to work. While default implementations for JVM 21+ and Scala Native 0.5+ exist, they are entirely replaceable, and custom implementations (for example, to provide compatibility with JVM <= 20 with custom fibers) can be provided to create an Async context through Async.blocking.
A Scheduler is also needed. By default, the default JVM virtual thread scheduler is used on JVM and ForkJoinPool is used on Scala Native.
Some "primitive" operations, such as sleep, are required from the scheduler/suspension implementation through the AsyncOperations interface. Again, default implementations are provided, but this can be customized.
What's coming up next?
- A strawman interface for cross platform IO: A separate library, gears-io, is in the early stage of design. It will provide a common asynchronous I/O (file, sockets) interface using gears's Async context functions. We hope to enable cross-platform performant asynchronous I/O through this interface.
- More refinement to the API and support for structured concurrency
- Your input! We would appreciate any effort to start using the library, build something cool and report the experience!