Skip to content

Commit

Permalink
Add forward
Browse files Browse the repository at this point in the history
  • Loading branch information
badeend committed Feb 14, 2024
1 parent 324be89 commit cbc5f40
Show file tree
Hide file tree
Showing 2 changed files with 220 additions and 2 deletions.
125 changes: 123 additions & 2 deletions imports.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,16 @@ use the <code>subscribe</code> function to obtain a <a href="#pollable"><code>po
for using <code>wasi:io/poll</code>.</p>
<h4><a name="output_stream"><code>resource output-stream</code></a></h4>
<p>An output bytestream.</p>
<h2><a href="#output_stream"><code>output-stream</code></a>s are <em>non-blocking</em> to the extent practical on
<p><a href="#output_stream"><code>output-stream</code></a>s are <em>non-blocking</em> to the extent practical on
underlying platforms. Except where specified otherwise, I/O operations also
always return promptly, after the number of bytes that can be written
promptly, which could even be zero. To wait for the stream to be ready to
accept data, the <code>subscribe</code> function to obtain a <a href="#pollable"><code>pollable</code></a> which can be
polled for using <code>wasi:io/poll</code>.</h2>
polled for using <code>wasi:io/poll</code>.</p>
<h4><a name="future_forward_result"><code>resource future-forward-result</code></a></h4>
<p>Represents a future which will eventually return the forward result.</p>
<h2>Dropping this future while it's still pending may trap. Use <code>cancel</code> to
cancel the operation. A future is &quot;pending&quot; while <code>get</code> would return <code>none</code>.</h2>
<h3>Functions</h3>
<h4><a name="method_input_stream.read"><code>[method]input-stream.read: func</code></a></h4>
<p>Perform a non-blocking read from the stream.</p>
Expand Down Expand Up @@ -207,6 +211,81 @@ can be skipped. Except for blocking behavior, identical to <code>skip</code>.</p
<ul>
<li><a name="method_input_stream.blocking_skip.0"></a> result&lt;<code>u64</code>, <a href="#stream_error"><a href="#stream_error"><code>stream-error</code></a></a>&gt;</li>
</ul>
<h4><a name="method_input_stream.forward"><code>[method]input-stream.forward: func</code></a></h4>
<p>Completely drain the input stream into the provided output stream on
a background task. The returned future resolves when either the input
stream has been fully drained or when an error occurred while reading
or writing.</p>
<p>The <code>flush-on-block</code> parameter controls whether the output stream
should be automatically flushed whenever the input stream reports
that it has no data at that moment. When the future resolves it is
possible for there to be data written to the output stream that
hasn't been flushed yet because the last read didn't block.</p>
<p>If you need to be sure that all data has been flushed at the end of
the forward, call <code>flush</code> yourself afterwards or use
<code>forward-and-drop</code> instead.</p>
<p>Even though this function only borrows its parameters, it requires
exclusive access to them for as long as the forward is in progress.
Any attempt to access or drop the streams in the meantime will trap.</p>
<p>This method is equivalent to spawning a background task running the
following pseudo-code:</p>
<pre><code class="language-text">let src-pollable = src.subscribe();
let dst-pollable = dst.subscribe();

loop { // Error &amp; cancellation checking omitted for brevity.
let len = src.splice(dst);
if len == 0 { // No data available at the moment
if flush-on-block {
dst.flush();
}
src-pollable.block();
dst-pollable.block();
}
}
</code></pre>
<h5>Params</h5>
<ul>
<li><a name="method_input_stream.forward.self"><code>self</code></a>: borrow&lt;<a href="#input_stream"><a href="#input_stream"><code>input-stream</code></a></a>&gt;</li>
<li><a name="method_input_stream.forward.dst"><code>dst</code></a>: borrow&lt;<a href="#output_stream"><a href="#output_stream"><code>output-stream</code></a></a>&gt;</li>
<li><a name="method_input_stream.forward.flush_on_block"><code>flush-on-block</code></a>: <code>bool</code></li>
</ul>
<h5>Return values</h5>
<ul>
<li><a name="method_input_stream.forward.0"></a> own&lt;<a href="#future_forward_result"><a href="#future_forward_result"><code>future-forward-result</code></a></a>&gt;</li>
</ul>
<h4><a name="static_input_stream.forward_and_drop"><code>[static]input-stream.forward-and-drop: func</code></a></h4>
<p>Functionally similar to <code>forward</code> except that this function also:</p>
<ul>
<li>automatically performs a final flush, and</li>
<li>drops the stream when it's done.</li>
</ul>
<p>Control over the streams is handed over to the host. This may enable
implementations to perform additional optimizations not possible otherwise.</p>
<p>The streams remain children and their respective parents (if any).
If those parents place any lifetimes restrictions on the streams,
those continue to apply. In practice this typically means that the
returned future should not outlive the stream's parents.
Implementations may trap if the the streams themselves still have
any active child resources (pollables) at the time of calling this
function.</p>
<p>This method is equivalent to spawning a background task running the
following pseudo-code:</p>
<pre><code class="language-text">// Error &amp; cancellation checking omitted for brevity.
src.forward(dst, flush-on-block).subscribe().block();
dst.blocking-flush();
drop(src);
drop(dst);
</code></pre>
<h5>Params</h5>
<ul>
<li><a name="static_input_stream.forward_and_drop.src"><code>src</code></a>: own&lt;<a href="#input_stream"><a href="#input_stream"><code>input-stream</code></a></a>&gt;</li>
<li><a name="static_input_stream.forward_and_drop.dst"><code>dst</code></a>: own&lt;<a href="#output_stream"><a href="#output_stream"><code>output-stream</code></a></a>&gt;</li>
<li><a name="static_input_stream.forward_and_drop.flush_on_block"><code>flush-on-block</code></a>: <code>bool</code></li>
</ul>
<h5>Return values</h5>
<ul>
<li><a name="static_input_stream.forward_and_drop.0"></a> own&lt;<a href="#future_forward_result"><a href="#future_forward_result"><code>future-forward-result</code></a></a>&gt;</li>
</ul>
<h4><a name="method_input_stream.subscribe"><code>[method]input-stream.subscribe: func</code></a></h4>
<p>Create a <a href="#pollable"><code>pollable</code></a> which will resolve once either the specified stream
has bytes available to read or the other end of the stream has been
Expand Down Expand Up @@ -418,3 +497,45 @@ is ready for reading, before performing the <code>splice</code>.</p>
<ul>
<li><a name="method_output_stream.blocking_splice.0"></a> result&lt;<code>u64</code>, <a href="#stream_error"><a href="#stream_error"><code>stream-error</code></a></a>&gt;</li>
</ul>
<h4><a name="method_future_forward_result.subscribe"><code>[method]future-forward-result.subscribe: func</code></a></h4>
<p>Returns a pollable which becomes ready when either the operation has
succeeded, failed or has been canceled. When this pollable is ready,
the <code>get</code> method will return <code>some</code>.</p>
<h5>Params</h5>
<ul>
<li><a name="method_future_forward_result.subscribe.self"><code>self</code></a>: borrow&lt;<a href="#future_forward_result"><a href="#future_forward_result"><code>future-forward-result</code></a></a>&gt;</li>
</ul>
<h5>Return values</h5>
<ul>
<li><a name="method_future_forward_result.subscribe.0"></a> own&lt;<a href="#pollable"><a href="#pollable"><code>pollable</code></a></a>&gt;</li>
</ul>
<h4><a name="method_future_forward_result.cancel"><code>[method]future-forward-result.cancel: func</code></a></h4>
<p>Initiate cancellation of the task. This is an asynchronous operation.
Use <code>subscribe</code> to wait for the cancallation to finish.</p>
<p>Dropping the future while the cancellation is in progress may trap.</p>
<h5>Params</h5>
<ul>
<li><a name="method_future_forward_result.cancel.self"><code>self</code></a>: borrow&lt;<a href="#future_forward_result"><a href="#future_forward_result"><code>future-forward-result</code></a></a>&gt;</li>
</ul>
<h4><a name="method_future_forward_result.get"><code>[method]future-forward-result.get: func</code></a></h4>
<p>Returns the result of the forward operation once the future is ready.</p>
<p>The outer <code>option</code> represents future readiness. Users can wait on this
<code>option</code> to become <code>some</code> using the <code>subscribe</code> method.</p>
<p>The outer <code>result</code> will be <a href="#error"><code>error</code></a> if the future was canceled or the
inner result has already been retrieved from the future in a previous
call.</p>
<p>The inner <code>result</code> represents the result of the actual forward
operation. This will be:</p>
<ul>
<li><code>ok</code> when the source stream was successfully read until the end,</li>
<li><a href="#error"><code>error</code></a> when destination stream was closed before the source stream ended,</li>
<li><a href="#error"><code>error</code></a> when either the source or destination stream returned an error.</li>
</ul>
<h5>Params</h5>
<ul>
<li><a name="method_future_forward_result.get.self"><code>self</code></a>: borrow&lt;<a href="#future_forward_result"><a href="#future_forward_result"><code>future-forward-result</code></a></a>&gt;</li>
</ul>
<h5>Return values</h5>
<ul>
<li><a name="method_future_forward_result.get.0"></a> option&lt;result&lt;result&lt;_, <a href="#stream_error"><a href="#stream_error"><code>stream-error</code></a></a>&gt;&gt;&gt;</li>
</ul>
97 changes: 97 additions & 0 deletions wit/streams.wit
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,70 @@ interface streams {
len: u64,
) -> result<u64, stream-error>;

/// Completely drain the input stream into the provided output stream on
/// a background task. The returned future resolves when either the input
/// stream has been fully drained or when an error occurred while reading
/// or writing.
///
/// The `flush-on-block` parameter controls whether the output stream
/// should be automatically flushed whenever the input stream reports
/// that it has no data at that moment. When the future resolves it is
/// possible for there to be data written to the output stream that
/// hasn't been flushed yet because the last read didn't block.
///
/// If you need to be sure that all data has been flushed at the end of
/// the forward, call `flush` yourself afterwards or use
/// `forward-and-drop` instead.
///
/// Even though this function only borrows its parameters, it requires
/// exclusive access to them for as long as the forward is in progress.
/// Any attempt to access or drop the streams in the meantime will trap.
///
/// This method is equivalent to spawning a background task running the
/// following pseudo-code:
/// ```text
/// let src-pollable = src.subscribe();
/// let dst-pollable = dst.subscribe();
///
/// loop { // Error & cancellation checking omitted for brevity.
/// let len = src.splice(dst);
/// if len == 0 { // No data available at the moment
/// if flush-on-block {
/// dst.flush();
/// }
/// src-pollable.block();
/// dst-pollable.block();
/// }
/// }
/// ```
forward: func(dst: borrow<output-stream>, flush-on-block: bool) -> future-forward-result;

/// Functionally similar to `forward` except that this function also:
/// - automatically performs a final flush, and
/// - drops the stream when it's done.
///
/// Control over the streams is handed over to the host. This may enable
/// implementations to perform additional optimizations not possible otherwise.
///
/// The streams remain children and their respective parents (if any).
/// If those parents place any lifetimes restrictions on the streams,
/// those continue to apply. In practice this typically means that the
/// returned future should not outlive the stream's parents.
/// Implementations may trap if the the streams themselves still have
/// any active child resources (pollables) at the time of calling this
/// function.
///
/// This method is equivalent to spawning a background task running the
/// following pseudo-code:
/// ```text
/// // Error & cancellation checking omitted for brevity.
/// src.forward(dst, flush-on-block).subscribe().block();
/// dst.blocking-flush();
/// drop(src);
/// drop(dst);
/// ```
forward-and-drop: static func(src: input-stream, dst: output-stream, flush-on-block: bool) -> future-forward-result;

/// Create a `pollable` which will resolve once either the specified stream
/// has bytes available to read or the other end of the stream has been
/// closed.
Expand Down Expand Up @@ -259,4 +323,37 @@ interface streams {
len: u64,
) -> result<u64, stream-error>;
}

/// Represents a future which will eventually return the forward result.
///
/// Dropping this future while it's still pending may trap. Use `cancel` to
/// cancel the operation. A future is "pending" while `get` would return `none`.
resource future-forward-result {
/// Returns a pollable which becomes ready when either the operation has
/// succeeded, failed or has been canceled. When this pollable is ready,
/// the `get` method will return `some`.
subscribe: func() -> pollable;

/// Initiate cancellation of the task. This is an asynchronous operation.
/// Use `subscribe` to wait for the cancallation to finish.
///
/// Dropping the future while the cancellation is in progress may trap.
cancel: func();

/// Returns the result of the forward operation once the future is ready.
///
/// The outer `option` represents future readiness. Users can wait on this
/// `option` to become `some` using the `subscribe` method.
///
/// The outer `result` will be `error` if the future was canceled or the
/// inner result has already been retrieved from the future in a previous
/// call.
///
/// The inner `result` represents the result of the actual forward
/// operation. This will be:
/// - `ok` when the source stream was successfully read until the end,
/// - `error` when destination stream was closed before the source stream ended,
/// - `error` when either the source or destination stream returned an error.
get: func() -> option<result<result<_, stream-error>>>;
}
}

0 comments on commit cbc5f40

Please sign in to comment.