Skip to content

Commit 3c057e8

Browse files
committed
Define cancellation of sync {stream,future}.{read,write} to trap
1 parent 2b375c8 commit 3c057e8

File tree

2 files changed

+32
-19
lines changed

2 files changed

+32
-19
lines changed

design/mvp/CanonicalABI.md

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1507,8 +1507,9 @@ out into the `CopyEnd` class that is derived below:
15071507
```python
15081508
class CopyState(Enum):
15091509
IDLE = 1
1510-
COPYING = 2
1511-
DONE = 3
1510+
SYNC_COPYING = 2
1511+
ASYNC_COPYING = 3
1512+
DONE = 4
15121513

15131514
class CopyEnd(Waitable):
15141515
state: CopyState
@@ -1517,14 +1518,19 @@ class CopyEnd(Waitable):
15171518
Waitable.__init__(self)
15181519
self.state = CopyState.IDLE
15191520

1521+
def copying(self):
1522+
return self.state == CopyState.SYNC_COPYING or self.state == CopyState.ASYNC_COPYING
1523+
15201524
def drop(self):
1521-
trap_if(self.state == CopyState.COPYING)
1525+
trap_if(self.copying())
15221526
Waitable.drop(self)
15231527
```
15241528
As shown in `drop`, attempting to drop a readable or writable end while a copy
15251529
is in progress traps. This means that client code must take care to wait for
15261530
these operations to finish (potentially cancelling them via
1527-
`stream.cancel-{read,write}`) before dropping.
1531+
`stream.cancel-{read,write}`) before dropping. The `SYNC_COPY` vs. `ASYNC_COPY`
1532+
distinction is tracked in the state to determine whether the copy operation can
1533+
be cancelled.
15281534

15291535
Given the above, we can define the concrete `{Readable,Writable}StreamEnd`
15301536
classes which are almost entirely symmetric, with the only difference being
@@ -4083,7 +4089,6 @@ until this point into a single `i32` payload for core wasm.
40834089
```python
40844090
def stream_event(result, reclaim_buffer):
40854091
reclaim_buffer()
4086-
assert(e.state == CopyState.COPYING)
40874092
if result == CopyResult.DROPPED:
40884093
e.state = CopyState.DONE
40894094
else:
@@ -4099,7 +4104,6 @@ until this point into a single `i32` payload for core wasm.
40994104
def on_copy_done(result):
41004105
e.set_pending_event(partial(stream_event, result, reclaim_buffer = lambda:()))
41014106

4102-
e.state = CopyState.COPYING
41034107
e.copy(thread.task.inst, buffer, on_copy, on_copy_done)
41044108
```
41054109

@@ -4111,8 +4115,10 @@ synchronously and return `BLOCKED` if not:
41114115
```python
41124116
if not e.has_pending_event():
41134117
if opts.sync:
4118+
e.state = CopyState.SYNC_COPYING
41144119
thread.suspend_until(e.has_pending_event)
41154120
else:
4121+
e.state = CopyState.ASYNC_COPYING
41164122
return [BLOCKED]
41174123
code,index,payload = e.get_pending_event()
41184124
assert(code == event_code and index == i and payload != BLOCKED)
@@ -4177,7 +4183,6 @@ of elements copied is not packed in the high 28 bits; they're always zero.
41774183
```python
41784184
def future_event(result):
41794185
assert((buffer.remain() == 0) == (result == CopyResult.COMPLETED))
4180-
assert(e.state == CopyState.COPYING)
41814186
if result == CopyResult.DROPPED or result == CopyResult.COMPLETED:
41824187
e.state = CopyState.DONE
41834188
else:
@@ -4188,7 +4193,6 @@ of elements copied is not packed in the high 28 bits; they're always zero.
41884193
assert(result != CopyResult.DROPPED or event_code == EventCode.FUTURE_WRITE)
41894194
e.set_pending_event(partial(future_event, result))
41904195

4191-
e.state = CopyState.COPYING
41924196
e.copy(thread.task.inst, buffer, on_copy_done)
41934197
```
41944198

@@ -4197,8 +4201,10 @@ and returning either the progress made or `BLOCKED`.
41974201
```python
41984202
if not e.has_pending_event():
41994203
if opts.sync:
4204+
e.state = CopyState.SYNC_COPYING
42004205
thread.suspend_until(e.has_pending_event)
42014206
else:
4207+
e.state = CopyState.ASYNC_COPYING
42024208
return [BLOCKED]
42034209
code,index,payload = e.get_pending_event()
42044210
assert(code == event_code and index == i)
@@ -4240,7 +4246,7 @@ def cancel_copy(EndT, event_code, stream_or_future_t, sync, thread, i):
42404246
e = thread.task.inst.table.get(i)
42414247
trap_if(not isinstance(e, EndT))
42424248
trap_if(e.shared.t != stream_or_future_t.t)
4243-
trap_if(e.state != CopyState.COPYING)
4249+
trap_if(e.state != CopyState.ASYNC_COPYING)
42444250
if not e.has_pending_event():
42454251
e.shared.cancel()
42464252
if not e.has_pending_event():
@@ -4249,9 +4255,12 @@ def cancel_copy(EndT, event_code, stream_or_future_t, sync, thread, i):
42494255
else:
42504256
return [BLOCKED]
42514257
code,index,payload = e.get_pending_event()
4252-
assert(e.state != CopyState.COPYING and code == event_code and index == i)
4258+
assert(not e.copying() and code == event_code and index == i)
42534259
return [payload]
42544260
```
4261+
Cancellation traps if there is not currently an async copy in progress (sync
4262+
copies do not expect or check for cancellation and thus cannot be cancelled).
4263+
42554264
The *first* check for `e.has_pending_event()` catches the case where the copy has
42564265
already racily finished, in which case we must *not* call `cancel()`. Calling
42574266
`cancel()` may, but is not required to, recursively call one of the `on_*`

design/mvp/canonical-abi/definitions.py

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -871,8 +871,9 @@ def write(self, inst, src_buffer, on_copy, on_copy_done):
871871

872872
class CopyState(Enum):
873873
IDLE = 1
874-
COPYING = 2
875-
DONE = 3
874+
SYNC_COPYING = 2
875+
ASYNC_COPYING = 3
876+
DONE = 4
876877

877878
class CopyEnd(Waitable):
878879
state: CopyState
@@ -881,8 +882,11 @@ def __init__(self):
881882
Waitable.__init__(self)
882883
self.state = CopyState.IDLE
883884

885+
def copying(self):
886+
return self.state == CopyState.SYNC_COPYING or self.state == CopyState.ASYNC_COPYING
887+
884888
def drop(self):
885-
trap_if(self.state == CopyState.COPYING)
889+
trap_if(self.copying())
886890
Waitable.drop(self)
887891

888892
class ReadableStreamEnd(CopyEnd):
@@ -2344,7 +2348,6 @@ def stream_copy(EndT, BufferT, event_code, stream_t, opts, thread, i, ptr, n):
23442348

23452349
def stream_event(result, reclaim_buffer):
23462350
reclaim_buffer()
2347-
assert(e.state == CopyState.COPYING)
23482351
if result == CopyResult.DROPPED:
23492352
e.state = CopyState.DONE
23502353
else:
@@ -2360,13 +2363,14 @@ def on_copy(reclaim_buffer):
23602363
def on_copy_done(result):
23612364
e.set_pending_event(partial(stream_event, result, reclaim_buffer = lambda:()))
23622365

2363-
e.state = CopyState.COPYING
23642366
e.copy(thread.task.inst, buffer, on_copy, on_copy_done)
23652367

23662368
if not e.has_pending_event():
23672369
if opts.sync:
2370+
e.state = CopyState.SYNC_COPYING
23682371
thread.suspend_until(e.has_pending_event)
23692372
else:
2373+
e.state = CopyState.ASYNC_COPYING
23702374
return [BLOCKED]
23712375
code,index,payload = e.get_pending_event()
23722376
assert(code == event_code and index == i and payload != BLOCKED)
@@ -2395,7 +2399,6 @@ def future_copy(EndT, BufferT, event_code, future_t, opts, thread, i, ptr):
23952399

23962400
def future_event(result):
23972401
assert((buffer.remain() == 0) == (result == CopyResult.COMPLETED))
2398-
assert(e.state == CopyState.COPYING)
23992402
if result == CopyResult.DROPPED or result == CopyResult.COMPLETED:
24002403
e.state = CopyState.DONE
24012404
else:
@@ -2406,13 +2409,14 @@ def on_copy_done(result):
24062409
assert(result != CopyResult.DROPPED or event_code == EventCode.FUTURE_WRITE)
24072410
e.set_pending_event(partial(future_event, result))
24082411

2409-
e.state = CopyState.COPYING
24102412
e.copy(thread.task.inst, buffer, on_copy_done)
24112413

24122414
if not e.has_pending_event():
24132415
if opts.sync:
2416+
e.state = CopyState.SYNC_COPYING
24142417
thread.suspend_until(e.has_pending_event)
24152418
else:
2419+
e.state = CopyState.ASYNC_COPYING
24162420
return [BLOCKED]
24172421
code,index,payload = e.get_pending_event()
24182422
assert(code == event_code and index == i)
@@ -2437,7 +2441,7 @@ def cancel_copy(EndT, event_code, stream_or_future_t, sync, thread, i):
24372441
e = thread.task.inst.table.get(i)
24382442
trap_if(not isinstance(e, EndT))
24392443
trap_if(e.shared.t != stream_or_future_t.t)
2440-
trap_if(e.state != CopyState.COPYING)
2444+
trap_if(e.state != CopyState.ASYNC_COPYING)
24412445
if not e.has_pending_event():
24422446
e.shared.cancel()
24432447
if not e.has_pending_event():
@@ -2446,7 +2450,7 @@ def cancel_copy(EndT, event_code, stream_or_future_t, sync, thread, i):
24462450
else:
24472451
return [BLOCKED]
24482452
code,index,payload = e.get_pending_event()
2449-
assert(e.state != CopyState.COPYING and code == event_code and index == i)
2453+
assert(not e.copying() and code == event_code and index == i)
24502454
return [payload]
24512455

24522456
### 🔀 `canon {stream,future}.drop-{readable,writable}`

0 commit comments

Comments
 (0)