From 7d1034acf2750519c3eac05cf6f71ef19f4d9ba0 Mon Sep 17 00:00:00 2001
From: Luke Wagner
Date: Thu, 10 Oct 2024 19:42:48 -0500
Subject: [PATCH 01/22] Add 'stream' and 'future' types

---
 design/mvp/Async.md                     | 152 +++-
 design/mvp/Binary.md                    |  17 +-
 design/mvp/CanonicalABI.md              | 863 +++++++++++++++++++---
 design/mvp/Explainer.md                 | 111 ++-
 design/mvp/canonical-abi/definitions.py | 468 ++++++++++--
 design/mvp/canonical-abi/run_tests.py   | 934 ++++++++++++++++++++++--
 6 files changed, 2313 insertions(+), 232 deletions(-)

diff --git a/design/mvp/Async.md b/design/mvp/Async.md
index 2a44f8c5..4b7e1b73 100644
--- a/design/mvp/Async.md
+++ b/design/mvp/Async.md
@@ -17,6 +17,7 @@ summary of the motivation and animated sketch of the design in action.
   * [Current task](#current-task)
   * [Subtask and Supertask](#subtask-and-supertask)
   * [Structured concurrency](#structured-concurrency)
+  * [Streams and Futures](#streams-and-futures)
   * [Waiting](#waiting)
   * [Backpressure](#backpressure)
   * [Returning](#returning)
@@ -106,8 +107,30 @@ Thus, backpressure combined with the partitioning of low-level state provided
 by the Component Model enables sync and async code to interoperate while
 preserving the expectations of both.
 
-[TODO](#todo): `future` and `stream` types that can be used in function
-signatures will be added next.
+In addition to being able to define and call whole functions asynchronously,
+the `stream` and `future` types can be used in function signatures to pass
+parameters and results incrementally over time, achieving finer-grained
+concurrency. Streams and futures are thus not defined to be free-standing
+resources with their own internal memory buffers (like a traditional channel or
+pipe) but, rather, more-primitive control-flow mechanisms that synchronize the
+incremental passing of parameters and results during cross-component calls.
+Higher-level resources like channels and pipes can then be defined in terms
+of these lower-level `stream` and `future` primitives, e.g.:
+```wit
+resource pipe {
+  constructor(buffer-size: u32);
+  write: func(bytes: stream<u8>) -> result;
+  read: func() -> stream<u8>;
+}
+```
+but also many other domain-specific concurrent resources like WASI HTTP request
+and response bodies or WASI blobs. Streams and futures are however high-level
+enough to be bound automatically to many source languages' built-in concurrency
+features like futures, promises, streams, generators and iterators, unlike
+lower-level concurrency primitives (like callbacks or `wasi:io@0.2.0`
+`pollable`s). Thus, the Component Model seeks to provide the lowest-level
+fine-grained concurrency primitives that are high-level and idiomatic enough to
+enable automatic generation of usable language-integrated bindings.
 
 ## Concepts
 
@@ -180,18 +203,80 @@ invocation of an export by the host. Moreover, at any one point in time, the
 set of tasks active in a linked component graph form a forest of async call
 trees which e.g., can be visualized using a traditional flamegraph.
 
-The Canonical ABI's Python code enforces Structured Concurrency by maintaining
-a simple per-[`Task`] `num_async_subtasks` counter that traps if not zero when
-the `Task` finishes.
+The Canonical ABI's Python code enforces Structured Concurrency by incrementing
+a per-[`Task`] counter when a `Subtask` is created, decrementing when a
+`Subtask` is destroyed, and trapping if the counter is not zero when the `Task`
+attempts to exit.
+
+### Streams and Futures
+
+Streams and Futures have two "ends": a *readable end* and a *writable end*. When
+*consuming* a `stream` or `future` value as a parameter (of an export call
+with a `stream` or `future` somewhere in the parameter types) or result (of an
+import call with a `stream` or `future` somewhere in the result type), the
+receiver always gets *unique ownership* of the *readable end* of the `stream`
+or `future`. When *producing* a `stream` or `future` value as a parameter (of
+an import call) or result (of an export call), the producer can either
+*transfer ownership* of a readable end it has already received or it can
+create a fresh writable end (via `stream.new` or `future.new`) and lift this
+writable end (maintaining ownership of the writable end, but creating a fresh
+readable end for the receiver). To maintain the invariant that readable ends
+are unique, a writable end can be lifted at most once, trapping otherwise.
+
+Based on this, `stream<T>` and `future<T>` values can be passed between
+functions as if they were synchronous `list<T>` and `T` values, resp. For
+example, given `f` and `g` with types:
+```wit
+f: func(x: whatever) -> stream<T>;
+g: func(s: stream<T>) -> stuff;
+```
+`g(f(x))` works as you might hope, concurrently streaming `x` into `f` which
+concurrently streams its results into `g`. (The addition of [`error`](#todo)
+will provide a generic answer to the question of what happens if `f`
+experiences an error: `f` can close its returned writable stream end with an
+`error` that will be propagated into `g` which should then propagate the error
+somehow into `stuff`.)
+
+If a component instance *would* receive the readable end of a stream for which
+it already owns the writable end, the readable end disappears and the existing
+writable end is received instead (since the guest can now handle the whole
+stream more efficiently wholly from within guest code). E.g., if the same
+component instance defined `f` and `g` above, the composition `g(f(x))` would
+just instruct the guest to stream directly from `f` into `g` without crossing a
+component boundary or performing any extra copies. Thus, strengthening the
+previously-mentioned invariant, the readable and writable ends of a stream are
+unique *and never in the same component*.
+
+Given the readable or writable end of a stream, core wasm code can call the
+imported `stream.read` or `stream.write` canonical built-ins, passing the
+pointer and length of a linear-memory buffer to write-into or read-from, resp.
+These built-ins can either return immediately if >0 elements were able to be
+written or read immediately (without blocking) or return a sentinel "blocked"
+value indicating that the read or write will execute concurrently. The
+readable and writable ends of streams and futures each have a well-defined
+parent `Task` that will receive "progress" events on all child streams/futures
+that have previously blocked.
+
+From a [structured-concurrency](#structured-concurrency) perspective, the
+readable and writable ends of streams and futures are leaves of the async call
+tree. Unlike subtasks, the parent of the readable ends of streams and futures
+*can* change over time (when transferred via function call, as mentioned
+above). However, there is always *some* parent `Task` and this parent `Task`
+is prevented from orphaning its children using the same reference-counting
+guard mentioned above for subtasks.
 
 ### Waiting
 
 When a component asynchronously lowers an import, it is explicitly requesting
 that, if the import blocks, control flow be returned back to the calling task
-so that it can do something else. Eventually though a task may run out of other
+so that it can do something else. Similarly, if `stream.read` or `stream.write`
+would block, they return a "blocked" code so that the caller can continue to
+make progress on other things. But eventually, a task will run out of other
 things to do and will need to **wait** for progress on one of the task's
-subtasks. While a task is waiting, the runtime can switch to other running
-tasks or start new tasks by invoking exports.
+subtasks, readable stream ends, writable stream ends, readable future ends or
+writable future ends, which are collectively called its **waitables**. While a
+task is waiting on its waitables, the Component Model runtime can switch to
+other running tasks or start new tasks by invoking exports.
 
 The Canonical ABI provides two ways for a task to wait:
 * The task can call the [`task.wait`] built-in to synchronously wait for
@@ -234,13 +319,23 @@ the "started" state.
 
 ### Returning
 
-The way an async Core WebAssembly function returns its value is by calling
-[`task.return`], passing the core values that are to be lifted.
-
-The main reason to have `task.return` is so that a task can continue execution
-after returning its value. This is useful for various finalization tasks (such
-as logging, billing or metrics) that don't need to be on the critical path of
-returning a value to the caller.
+The way an async function returns its value is by calling [`task.return`],
+passing the core values that are to be lifted as *parameters*. Additionally,
+when the `always-task-return` `canonopt` is set, synchronous functions also
+return their values by calling `task.return` (as a more expressive and
+general alternative to `post-return`).
+
+Returning values by calling `task.return` allows a task to continue executing
+even after it has passed its initial results to the caller. This can be useful
+for various finalization tasks (freeing memory or performing logging, billing
+or metrics operations) that don't need to be on the critical path of returning
+a value to the caller, but the major use of executing code after `task.return`
+is to continue to read and write from streams and futures. For example, a
+stream transformer function of type `func(in: stream<T>) -> stream<U>` will
+immediately `task.return` a stream created via `stream.new` and then sit in a
+loop interleaving `stream.read`s (of the readable end passed for `in`) and
+`stream.write`s (of the writable end it `stream.new`ed) before exiting the
+task.
 
 A task may not call `task.return` unless it is in the "started" state. Once
 `task.return` is called, the task is in the "returned" state. A task can only
@@ -419,21 +514,24 @@ For now, this remains a [TODO](#todo) and validation will reject `async`-lifted
 
 ## TODO
 
-Native async support is being proposed in progressive chunks. The following
-features will be added in future chunks to complete "async" in Preview 3:
-* `future`/`stream`/`error`: add for use in function types for finer-grained
-  concurrency
-* `subtask.cancel`: allow a supertask to signal to a subtask that its result is
-  no longer wanted and to please wrap it up promptly
-* allow "tail-calling" a subtask so that the current wasm instance can be torn
-  down eagerly
-* `task.index`+`task.wake`: allow tasks in the same instance to wait on and
-  wake each other (async condvar-style)
+Native async support is being proposed incrementally. The following features
+will be added in future chunks roughly in the order listed to complete the full
+"async" story:
+* add `error` type that can be included when closing a stream/future
 * `nonblocking` function type attribute: allow a function to declare in its
   type that it will not transitively do anything blocking
+* define what `async` means for `start` functions (top-level await + background
+  tasks), along with cross-task coordination built-ins
+* `subtask.cancel`: allow a supertask to signal to a subtask that its result is
+  no longer wanted and to please wrap it up promptly
+* zero-copy forwarding/splicing and built-in way to "tail-call" a subtask so
+  that the current wasm instance can be torn down eagerly while preserving
+  structured concurrency
+* some way to say "no more elements are coming for a while"
 * `recursive` function type attribute: allow a function to be reentered
-  recursively (instead of trapping)
-* enable `async` `start` functions
+  recursively (instead of trapping) and link inner and outer activations
+* allow pipelining multiple `stream.read`/`write` calls
+* allow chaining multiple async calls together ("promise pipelining")
 * integrate with `shared`: define how to lift and lower functions `async` *and*
   `shared`
 
diff --git a/design/mvp/Binary.md b/design/mvp/Binary.md
index cbac87fa..565272bc 100644
--- a/design/mvp/Binary.md
+++ b/design/mvp/Binary.md
@@ -202,6 +202,8 @@ defvaltype    ::= pvt:<primvaltype>                       => pvt
                 | 0x6a t?:<valtype>? u?:<valtype>?        => (result t? (error u)?)
                 | 0x69 i:<typeidx>                        => (own i)
                 | 0x68 i:<typeidx>                        => (borrow i)
+                | 0x66 i:<typeidx>                        => (stream i)
+                | 0x65 i:<typeidx>                        => (future i)
 labelvaltype  ::= l:<label'> t:<valtype>                  => l t
 case          ::= l:<label'> t?:<valtype>? 0x00           => (case l t?)
 label'        ::= len:<u32> l:<label>