Skip to content

Commit

Permalink
Add writer.atomicWrite() method.
Browse files Browse the repository at this point in the history
  • Loading branch information
jan-ivar committed Oct 10, 2023
1 parent bfe8591 commit 672d62d
Showing 1 changed file with 135 additions and 2 deletions.
137 changes: 135 additions & 2 deletions index.bs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ spec:fetch; type:dfn; for:/; text:fetch
spec:url; type:dfn; text:scheme
spec:url; type:dfn; text:fragment
spec:infra; type:dfn; for:/; text:ASCII case-insensitive
spec:infra; type:dfn; text:list
spec:streams; type:dict-member; text:write
</pre>
<pre class="anchors">
url: https://html.spec.whatwg.org/multipage/origin.html#concept-origin; type: dfn; text: origin; for:/
Expand Down Expand Up @@ -1426,6 +1428,7 @@ data to the server.
interface WebTransportSendStream : WritableStream {
attribute long long? sendOrder;
Promise&lt;WebTransportSendStreamStats&gt; getStats();
WebTransportWriter getWriter();
};
</pre>

Expand Down Expand Up @@ -1461,6 +1464,12 @@ The {{WebTransportSendStream}}'s [=transfer steps=] and
1. [=Resolve=] |p| with |stats|.
1. Return |p|.

: <dfn for="WebTransportSendStream" method>getWriter()</dfn>
:: This method must be implemented the same as {{WritableStream/getWriter}}
inherited from {{WritableStream}}, except in place of creating a
{{WritableStreamDefaultWriter}}, it must instead
[=WebTransportWriter/create=] a {{WebTransportWriter}} with [=this=].

## Internal Slots ## {#send-stream-internal-slots}

A {{WebTransportSendStream}} has the following internal slots.
Expand Down Expand Up @@ -1490,6 +1499,11 @@ A {{WebTransportSendStream}} has the following internal slots.
<td><dfn>`[[SendOrder]]`</dfn>
<td class="non-normative">An optional send order number, or null.
</tr>
<tr>
<td><dfn>`[[AtomicWriteRequests]]`</dfn>
<td class="non-normative">A [=list=] of promises, each representing an in-flight atomic
write request to be processed by the underlying sink.
</tr>
<tbody>
</table>

Expand All @@ -1510,6 +1524,8 @@ To <dfn export for="WebTransportSendStream" lt="create|creating">create</dfn> a
:: |transport|
: {{WebTransportSendStream/[[SendOrder]]}}
:: |sendOrder|
: {{WebTransportSendStream/[[AtomicWriteRequests]]}}
:: An empty [=list=] of promises.
1. Let |writeAlgorithm| be an action that [=writes=] |chunk| to |stream|, given |chunk|.
1. Let |closeAlgorithm| be an action that [=closes=] |stream|.
1. Let |abortAlgorithm| be an action that [=aborts=] |stream| with |reason|, given |reason|.
Expand Down Expand Up @@ -1537,6 +1553,14 @@ To <dfn for="WebTransportSendStream">write</dfn> |chunk| to a {{WebTransportSend
1. Let |promise| be a new promise.
1. Let |bytes| be a copy of the [=byte sequence=] which |chunk| represents.
1. Set |stream|.{{[[PendingOperation]]}} to |promise|.
1. Let |inFlightWriteRequest| be
|stream|.<a href="https://streams.spec.whatwg.org/#writablestream-inflightwriterequest">inFlightWriteRequest</a>.

Note: Find a [better way](https://streams.spec.whatwg.org/#other-specs) to integrate
with the streams spec to identify the write request in flight.

1. Let |atomic| be true if |inFlightWriteRequest| [=list/exists=]
in [=stream=].{{WebTransportSendStream/[[AtomicWriteRequests]]}}, otherwise false.
1. Run the following steps [=in parallel=]:
1. [=stream/Send=] |bytes| on |stream|.{{WebTransportSendStream/[[InternalStream]]}} and wait for the
operation to complete.
Expand All @@ -1552,6 +1576,12 @@ To <dfn for="WebTransportSendStream">write</dfn> |chunk| to a {{WebTransportSend
[=WritableStream/Error | errored=] nor blocked by [=flow control=], have been
sent.

Whenever the sending of |bytes| becomes blocked by [=flow control=],
[=queue a network task=] with |transport| to [=abort all atomic write requests=] on |stream|.

If the sending of |bytes| becomes blocked by [=flow control=] and |atomic| is true,
then give up on sending |bytes| and proceed immediately to the next step without failing.

The user agent SHOULD divide bandwidth fairly between all streams that aren't starved.

Note: The definition of fairness here is [=implementation-defined=].
Expand All @@ -1567,13 +1597,15 @@ To <dfn for="WebTransportSendStream">write</dfn> |chunk| to a {{WebTransportSend

1. [=Queue a network task=] with |transport| to run these steps:
1. Set |stream|.{{[[PendingOperation]]}} to null.
1. If |atomic| is true, [=map/remove=] |inFlightWriteRequest| from
|stream|.{{WebTransportSendStream/[[AtomicWriteRequests]]}}.
1. [=Resolve=] |promise| with undefined.
1. Return |promise|.

Note: The user-agent MAY have a buffer to improve the transfer performance. Such a buffer
SHOULD have a fixed upper limit, to carry the backpressure information to the user of
{{WebTransportSendStream}}. This also means the [=fulfilled|fulfillment=] of the promise returned from this algorithm (or,
{{WritableStreamDefaultWriter/write|WritableStreamDefaultWriter.write}}) does **NOT** necessarily mean that the chunk is acked by
{{WritableStreamDefaultWriter/write(chunk)}}) does **NOT** necessarily mean that the chunk is acked by
the server [[!QUIC]]. It may just mean that the chunk is appended to the buffer. To make sure that
the chunk arrives at the server, use an application-level protocol.

Expand Down Expand Up @@ -1619,6 +1651,16 @@ To <dfn for="WebTransportSendStream">abort</dfn> a {{WebTransportSendStream}} |s

</div>

<div algorithm>
To <dfn for="WebTransportSendStream">abort all atomic write requests</dfn> on a {{WebTransportSendStream}} |stream|, run these steps:
1. Let |requestsToAbort| be [=stream=].{{WebTransportSendStream/[[AtomicWriteRequests]]}}.
1. [=map/Clear=] [=stream=].{{WebTransportSendStream/[[AtomicWriteRequests]]}}.
1. [=For each=] |promise| in |requestsToAbort|, [=reject=] |promise| with {{TransactionInactiveError}}.
1. [=In parallel=], [=for each=] |promise| in |requestsToAbort|, abort the sending of data
associated with |promise|.

</div>

## STOP_SENDING signal coming from the server ## {#send-stream-STOP_SENDING}

<div algorithm="STOP_SENDING signal">
Expand Down Expand Up @@ -1970,6 +2012,72 @@ object |transport|, and a |sendOrder|, run these steps.

</div>

# `WebTransportWriter` Interface # {#web-transport-writer-procedures-interface}

{{WebTransportWriter}} is a subclass of {{WritableStreamDefaultWriter}} that
overloads one method and adds another.

A {{WebTransportWriter}} is always created by the
[=WebTransportWriter/create=] procedure.

<pre class="idl">
[Exposed=*, SecureContext]
interface WebTransportWriter : WritableStreamDefaultWriter {
Promise&lt;undefined&gt; write(optional any chunk);
Promise&lt;undefined&gt; atomicWrite(optional any chunk);
};
</pre>

## Methods ## {#web-transport-writer-procedures-methods}

: <dfn for="WebTransportWriter" method>write(chunk)</dfn>
:: The {{write}} method will not reject on [=flow control=] blockage
(unless queued behind one or more outstanding calls to {{atomicWrite}}).
This behavior is designed to satisfy most applications.

When called, run the following steps:
1. Let |stream| be the {{WebTransportSendStream}} associated with [=this=].
1. If |stream|.{{WebTransportSendStream/[[AtomicWriteRequests]]}} is not empty,
return the result of [=writing atomically=] on |stream| with |chunk|.
1. Return the result of {{WritableStreamDefaultWriter/write(chunk)}}
on {{WritableStreamDefaultWriter}} with |chunk|.

: <dfn for="WebTransportWriter" method>atomicWrite(chunk)</dfn>
:: The {{atomicWrite}} method will reject if the chunk given to it
cannot be sent in its entirety without blocking on [=flow control=].
This behavior is designed to satisfy niche transactional applications
sensitive to [=flow control=] deadlocks ([[RFC9308]]
[Section 4.4](https://datatracker.ietf.org/doc/html/rfc9308#section-4.4)).

When called, return the result of [=writing atomically=] on the
{{WebTransportSendStream}} associated with [=this=], with |chunk|.

## Procedures ## {#web-transport-writer-procedures-procedures}

<div algorithm="create a writer">

To <dfn export for="WebTransportWriter" lt="create|creating">create</dfn> a
{{WebTransportWriter}}, with a {{WebTransportSendStream}} |stream|, run these
steps:
1. Let |writer| be a [=new=] {{WebTransportWriter}}.
1. Run the [new WritableStreamDefaultWriter(stream)](https://streams.spec.whatwg.org/#default-writer-constructor)
constructor steps passing |writer| as this, and |stream| as the constructor argument.
1. Return |writer|.

</div>

<div algorithm="write atomically">

To <dfn export for="WebTransportWriter" lt="write atomically">write atomically</dfn>
on a {{WebTransportSendStream}} |stream|, given |chunk|, run these steps:
1. Let |p| be the result of {{WritableStreamDefaultWriter/write(chunk)}}
on {{WritableStreamDefaultWriter}} with |chunk|.
1. Set |stream|.{{WebTransportSendStream/[[AtomicWriteRequests]]}} to |p|.
1. Return |p|.

</div>


# `WebTransportError` Interface # {#web-transport-error-interface}

<dfn interface>WebTransportError</dfn> is a subclass of {{DOMException}} that represents
Expand Down Expand Up @@ -2093,7 +2201,7 @@ converted to an httpErrorCode, and vice versa, as specified in [[!WEB-TRANSPORT-
<td>[=stream/Send|sends=] STREAM with FIN bit set</td>
</tr>
<tr>
<td>{{WebTransportBidirectionalStream/writable}}.getWriter().{{WritableStreamDefaultWriter/write}}()</td>
<td>{{WebTransportBidirectionalStream/writable}}.getWriter().{{WritableStreamDefaultWriter/write(chunk)}}()</td>
<td>[=stream/Send|sends=] STREAM</td>
</tr>
<tr>
Expand Down Expand Up @@ -2535,6 +2643,31 @@ async function receiveText(url, createWritableStreamForTextData) {
}
</pre>

## Sending a transactional chunk on a stream ## {#example-transactional-stream}

*This section is non-normative.*

Sending a transactional piece of data on a one-way stream only if it can be done
entirely without blocking on [=flow control=], can be achieved by using the
{{WebTransportSendStream/getWriter}} function and the resulting writer.

<pre class="example" highlight="js">
async function sendTransactionalData(wt, bytes) {
const writable = await wt.createUnidirectionalStream();
const writer = writable.getWriter();
await writer.ready;
try {
await writer.atomicWrite(bytes);
} catch (e) {
if (e.name != "TransactionInactiveError") throw e;
// rejected to avoid blocking on flow control
// The writable remains un-errored unlike with regular writes
} finally {
writer.releaseLock();
}
}
</pre>

## Complete example ## {#example-complete}

*This section is non-normative.*
Expand Down

0 comments on commit 672d62d

Please sign in to comment.