From 824fdc5bab31f336f3a9da82d6220427acfcbdc8 Mon Sep 17 00:00:00 2001 From: Luke Wagner Date: Mon, 16 Sep 2024 14:04:54 -0500 Subject: [PATCH] Refactor event delivery to make non-determinism more clear Rename CallContext to Context --- design/mvp/CanonicalABI.md | 292 ++++++++++++++---------- design/mvp/canonical-abi/definitions.py | 140 +++++++----- design/mvp/canonical-abi/run_tests.py | 31 ++- 3 files changed, 265 insertions(+), 198 deletions(-) diff --git a/design/mvp/CanonicalABI.md b/design/mvp/CanonicalABI.md index d9f2f317..613a430b 100644 --- a/design/mvp/CanonicalABI.md +++ b/design/mvp/CanonicalABI.md @@ -11,7 +11,7 @@ being specified here. * [Despecialization](#despecialization) * [Alignment](#alignment) * [Element Size](#element-size) - * [Call Context](#call-context) + * [Context](#context) * [Canonical ABI Options](#canonical-abi-options) * [Runtime State](#runtime-state) * [Loading](#loading) @@ -223,23 +223,34 @@ def elem_size_flags(labels): return 4 ``` -### Call Context +### Context -The subsequent definitions of loading and storing a value from linear memory -require additional configuration and state, which is threaded through most -subsequent definitions via the `cx` parameter of type `CallContext`: +The subsequent definitions of lifting and lowering depend on three kinds of +ambient information: +* static ABI options supplied via [`canonopt`] +* dynamic state in the containing component instance +* dynamic state in the [current task] + +These sources of ambient context are stored as the respective `opts`, `inst` +and `task` fields of the `Context` object: ```python -@dataclass -class CallContext: +class Context: opts: CanonicalOptions inst: ComponentInstance + task: Task + + def __init__(self, opts, inst, task): + self.opts = opts + self.inst = inst + self.task = task ``` -Note that the `Task` and `Subtask` classes defined below derive `CallContext`, -adding additional state only used for export and import calls, resp. +The `cx` parameter in functions below refers to the ambient `Context`. The +`Task` and `Subtask` classes derive `Context` and thus having a `task` or +`subtask` also establishes the ambient `Context`. ### Canonical ABI Options -The `opts` field of `CallContext` contains all the possible `canonopt` +The `opts` field of `CallContext` contains all the possible [`canonopt`] immediates that can be passed to the `canon` definition being implemented. ```python @dataclass @@ -266,21 +277,21 @@ class ComponentInstance: handles: HandleTables async_subtasks: Table[AsyncSubtask] may_leave: bool - interruptible: asyncio.Event backpressure: bool - sync_call: bool + calling_sync_export: bool num_tasks: int + interruptible: asyncio.Event pending_tasks: list[tuple[Task, asyncio.Future]] def __init__(self): self.handles = HandleTables() self.async_subtasks = Table[AsyncSubtask]() self.may_leave = True - self.interruptible = asyncio.Event() - self.interruptible.set() self.backpressure = False - self.sync_call = False + self.calling_sync_export = False self.num_tasks = 0 + self.interruptible = asyncio.Event() + self.interruptible.set() self.pending_tasks = [] ``` @@ -429,7 +440,7 @@ created by `canon_lift` and `Subtask`, which is created by `canon_lower`. Additional sync-/async-specialized mutable state is added by the `AsyncTask` and `AsyncSubtask` subclasses. 
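The following is a purely illustrative sketch of how these classes relate; it is not part of the Canonical ABI definitions. It assumes the classes defined below are importable from `definitions.py` (as the tests do) and that `CanonicalOptions` can be default-constructed:
```python
import asyncio
from definitions import *  # Context, Task, Subtask, CanonicalOptions, ComponentInstance, OnBlockResult

async def sketch_context_layering():
  opts = CanonicalOptions()                   # assumption: default-constructible
  inst = ComponentInstance()
  on_block = asyncio.Future()
  on_block.set_result(OnBlockResult.BLOCKED)  # pre-resolved, like mk_on_block in run_tests.py
  task = Task(opts, inst, None, on_block)     # a Task is normally created by canon_lift
  assert(task.task is task)                   # a Task is its own ambient Context
  subtask = Subtask(opts, task)               # a Subtask is normally created by canon_lower
  assert(subtask.inst is task.inst)           # same containing component instance
  assert(subtask.task is task)                # the Subtask records its supertask

asyncio.run(sketch_context_layering())
```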
-The `Task` class and its subclasses depend on the following three enums: +The `Task` class and its subclasses depend on the following type definitions: ```python class AsyncCallState(IntEnum): STARTING = 0 @@ -444,6 +455,8 @@ class EventCode(IntEnum): CALL_DONE = AsyncCallState.DONE YIELDED = 4 +Event = Callable[[], tuple[EventCode, int]] + class OnBlockResult(IntEnum): BLOCKED = 0 COMPLETED = 1 @@ -453,8 +466,6 @@ call necessarily transitions through: [`STARTING`](Async.md#starting), `STARTED`, [`RETURNING`](Async.md#returning) and `DONE`. The `EventCode` enum shares common code values with `AsyncCallState` to define the set of integer event codes that are delivered to [waiting](Async.md#waiting) or polling tasks. -The `OnBlockResult` enum conveys the two possible results of the `on_block` -future used to tell callers whether or not the callee blocked. The `current_Task` global holds an `asyncio.Lock` that is used to prevent the Python runtime from arbitrarily switching between Python coroutines (`async @@ -472,21 +483,25 @@ current_task = asyncio.Lock() A `Task` object is created for each call to `canon_lift` and is implicitly threaded through all core function calls. This implicit `Task` parameter -represents "[the current task](Async.md#current-task)". +represents the "[current task]". A `Task` is-a `Context`, with its `opts` +corresponding to the `canonopt` supplied to the `canon lift` definition that +the `Task` was created for. ```python -class Task(CallContext): +class Task(Context): caller: Optional[Task] - on_block: Optional[asyncio.Future[OnBlockResult]] + on_block: asyncio.Future[OnBlockResult] borrow_count: int - events: asyncio.Queue[AsyncSubtask] + events: list[Event] + has_events: asyncio.Event num_async_subtasks: int def __init__(self, opts, inst, caller, on_block): - super().__init__(opts, inst) + super().__init__(opts, inst, self) self.caller = caller self.on_block = on_block self.borrow_count = 0 - self.events = asyncio.Queue[AsyncSubtask]() + self.events = [] + self.has_events = asyncio.Event() self.num_async_subtasks = 0 ``` The fields of `Task` are introduced in groups of related `Task` methods next. @@ -524,10 +539,10 @@ with `may_enter` and `may_start_pending_task`, implements backpressure. If a (via `suspend`, shown next) and goes into a `pending_tasks` queue, waiting to be unblocked by another task calling `maybe_start_pending_task`. One key property of this backpressure scheme is that `pending_tasks` are only dequeued -one at a time, ensuring that if an overloaded component instance enables -backpressure (via `task.backpressure`) and then disables it, there will not be -an unstoppable thundering herd of pending tasks started all at once that OOM -the component before it can re-enable backpressure. +one at a time, ensuring that if an overloaded component instance enables and +then disables backpressure, there will not be an unstoppable thundering herd of +pending tasks started all at once that OOM the component before it can +re-enable backpressure. ```python async def enter(self): assert(current_task.locked()) @@ -536,15 +551,15 @@ the component before it can re-enable backpressure. 
f = asyncio.Future() self.inst.pending_tasks.append((self, f)) await self.suspend(f) - self.inst.num_tasks += 1 if self.opts.sync: - self.inst.sync_call = True + self.inst.calling_sync_export = True + self.inst.num_tasks += 1 def may_enter(self, pending_task): - return self.inst.interruptible.is_set() and \ - not self.inst.backpressure and \ - not self.inst.sync_call and \ - not (pending_task.opts.sync and self.inst.num_tasks > 0) + return not self.inst.backpressure and \ + not self.inst.calling_sync_export and \ + not (pending_task.opts.sync and self.inst.num_tasks > 0) and \ + self.inst.interruptible.is_set() def maybe_start_pending_task(self): if self.inst.pending_tasks: @@ -553,21 +568,30 @@ the component before it can re-enable backpressure. self.inst.pending_tasks.pop(0) future.set_result(None) ``` -The rules shown above also ensure that synchronously-lifted exports only -execute when no other (sync or async) tasks are executing concurrently. - -The `suspend` method, called by `enter`, `wait` and `yield_`, takes a future -to `await` and allows other tasks to make progress in the meantime. Since the -`current_task` lock is shared by all linked component instances, releasing -then acquiring it allows switching to any task in the same or different -component instance. The final loop ensures that, when a component instance -makes a `sync`-lowered import call, which is signalled by clearing -`interruptible` for the duration of the call, no other tasks *in the same -component instance* can execute (which would otherwise break the appearance of -a *synchronous* call from the perspective of that component instance). +Considering the 4 conditions in `may_enter`: +* The `backpressure` check reflects explicit backpressure triggered by + core wasm code (via `canon task.backpressure`, defined below). +* The next two conditions ensure that synchronous exports do not interleave + execution with any other task in the same component instance. The first + condition prevents async calls from starting while there is a sync call in + flight, while the second condition prevents sync calls from starting while + there are *any* other calls in flight. +* The `interruptible` check is automatically set and cleared before and after + any synchronous `canon` built-in called by core wasm, temporarily preventing + any interleaved execution during synchronous call. + +The `suspend` method, called by `enter`, `wait` and `yield_`, takes a future to +`await` and allows other tasks to make progress in the meantime. Since the +`current_task` lock is shared by all linked component instances, releasing then +acquiring it allows switching to any task in the same or different component +instance. The final loop in `suspend` waits until `interruptible` is set to +ensure that tasks that have been suspended cannot resume execution. Together +with the guard `may_enter`, this ensures that clearing `interruptible` ensures +traditional synchronous execution (for use in synchronous `canon` definitions +below). ```python async def suspend(self, future): - if self.on_block and not self.on_block.done(): + if not self.on_block.done(): self.on_block.set_result(OnBlockResult.BLOCKED) else: current_task.release() @@ -591,30 +615,62 @@ adapters to Core WebAssembly using `suspend` and `resume`. While a task is running, it may call `wait` (via `canon task.wait` or, when a `callback` is present, by returning to the event loop) to block until there is -progress on one of the task's async subtasks. 
Alternatively, the current task -can call `poll` (via `canon task.poll`), which does not block and does not -allow the runtime to switch to another task. Lastly, a task may cooperatively -yield (via `canon task.yield`), allowing the switching but without waiting for -any external events (as emulated in the Python code by awaiting `sleep(0)`. +an event that will allow the `Task` to make progress. If there are no +currently-pending events in the `events` list, `wait` suspends until the +`has_events` `asyncio.Event` is set by `notify`. Once there is at least one +event in the list, an event is popped non-deterministically to provide the +runtime flexibility in event queueing and delivery. ```python async def wait(self): self.maybe_start_pending_task() - subtask = await self.suspend(self.events.get()) - return self.process_event(subtask) + if not self.events: + await self.suspend(self.has_events.wait()) + return self.next_event() + def next_event(self): + assert(self.events) + if DETERMINISTIC_PROFILE: + i = 0 + else: + i = random.randrange(len(self.events)) + event = self.events.pop(i) + if not self.events: + self.has_events.clear() + return event() + + def notify(self, event): + self.events.append(event) + self.has_events.set() +``` +Note that events are represented as *first-class functions* that are called by +`next_event` to produce the tuple of scalar values that are actually delivered +to core wasm. This allows an event source to report the latest status when the +event is handed to the core wasm code instead of the status when the event was +first generated. This allows multiple redundant events to be collapsed into +one, reducing overhead. + +Although this Python code represents events as a list of closures, an +optimizing implementation should be able to avoid actually allocating these +things and instead embed a linked list of "ready" events into the table +elements associated with the events. + +In addition to `wait`, the current task can also use `poll` (via `canon task.poll`), +to not block and not allow the runtime to switch to another task: +```python def poll(self): - if self.events.empty(): - return None - return self.process_event(self.events.get_nowait()) + if self.events: + return self.next_event() + return None +``` +Lastly, a task may cooperatively yield (via `canon task.yield`), allowing the +runtime to switch execution to another task without having to wait for any +external I/O (as emulated in the Python code by awaiting `sleep(0)`: +```python async def yield_(self): self.maybe_start_pending_task() await self.suspend(asyncio.sleep(0)) ``` -Although this Python code uses an `asyncio.Queue` to coordinate async events, -an optimized implementation should not have to create an actual queue; instead -it should be possible to embed a "next ready" linked list in the elements of -the `async_subtasks` table. All `Task`s (whether lifted `async` or not) are allowed to call `async`-lowered imports. Calling an `async`-lowered import creates an `AsyncSubtask` (defined @@ -623,29 +679,13 @@ table and tracked by the current task's `num_async_subtasks` counter, which is guarded to be `0` in `Task.exit` (below) to ensure [structured concurrency]. 
```python def add_async_subtask(self, subtask): - assert(subtask.supertask is None and subtask.index is None) - subtask.supertask = self - subtask.index = self.inst.async_subtasks.add(subtask) + assert(subtask.task is self and not subtask.notify_supertask) + subtask.notify_supertask = True self.num_async_subtasks += 1 - return subtask.index - - def async_subtask_made_progress(self, subtask): - assert(subtask.supertask is self) - if subtask.enqueued: - return - subtask.enqueued = True - self.events.put_nowait(subtask) - - def process_event(self, subtask): - assert(subtask.supertask is self) - subtask.enqueued = False - return (EventCode(subtask.state), subtask.index) -``` -Instead of simply enqueuing an `EventCode` when `async_subtask_made_progress`, -the above code achieves the effect of collapsing multiple events into one, -delivering only the newest state, by enqueing the mutable `AsyncSubtask` itself -at most once and only reading its `state` when the event is being *popped* from -the queue. + return self.inst.async_subtasks.add(subtask) +``` +The `notify_supertask` flag signals to `AsyncSubtask` methods (defined below) +to notify this `Task` when the lowered callee makes progress. The `borrow_count` field is used by the following methods to track the number of borrowed handles that were passed as parameters to the export that have not @@ -665,23 +705,27 @@ above and allows pending tasks to start. ```python def exit(self): assert(current_task.locked()) - assert(self.events.empty()) + assert(not self.events) assert(self.inst.num_tasks >= 1) trap_if(self.borrow_count != 0) trap_if(self.num_async_subtasks != 0) self.inst.num_tasks -= 1 if self.opts.sync: - self.inst.sync_call = False + self.inst.calling_sync_export = False self.maybe_start_pending_task() ``` -While `canon_lift` creates `Task`s, `canon_lower` creates `Subtask` objects: +While `canon_lift` creates `Task`s, `canon_lower` creates `Subtask` objects. +Like `Task`, `Subtask` is a subclass of `Context`, gets its `opts` from the +`canon lower`. Importantly, the `Context.task` field refers to the [current +task] at the point of the `canon lower` call, thereby linking a subtask to its +supertask. ```python -class Subtask(CallContext): +class Subtask(Context): lenders: list[HandleElem] - def __init__(self, opts, inst): - super().__init__(opts, inst) + def __init__(self, opts, task): + super().__init__(opts, task.inst, task) self.lenders = [] def track_owning_lend(self, lending_handle): @@ -740,49 +784,50 @@ class AsyncTask(Task): Finally, the `AsyncSubtask` class extends `Subtask` with fields that are used by the methods of `Task`, as shown above. `AsyncSubtask`s have the same linear state machine as `AsyncTask`s, except that the state transitions are guaranteed -by the Canonical ABI to happen in the right order. Each time an async subtask -advances a state, it notifies its "supertask", which was the current task when -the async-lowered function was first called. +by the Canonical ABI to happen in the right order, as asserted by the methods +below. 
```python class AsyncSubtask(Subtask): state: AsyncCallState - supertask: Optional[Task] - index: Optional[int] + notify_supertask: bool enqueued: bool - def __init__(self, opts, inst): - super().__init__(opts, inst) + def __init__(self, opts, task): + super().__init__(opts, task) self.state = AsyncCallState.STARTING - self.supertask = None - self.index = None + self.notify_supertask = False self.enqueued = False + def maybe_notify_supertask(self): + if self.notify_supertask and not self.enqueued: + self.enqueued = True + def event(): + self.enqueued = False + return (EventCode(self.state), self.inst.async_subtasks.array.index(self)) + self.task.notify(event) + def start(self): assert(self.state == AsyncCallState.STARTING) self.state = AsyncCallState.STARTED - if self.supertask is not None: - self.supertask.async_subtask_made_progress(self) + self.maybe_notify_supertask() def return_(self): assert(self.state == AsyncCallState.STARTED) self.state = AsyncCallState.RETURNED - if self.supertask is not None: - self.supertask.async_subtask_made_progress(self) + self.maybe_notify_supertask() def finish(self): super().finish() assert(self.state == AsyncCallState.RETURNED) self.state = AsyncCallState.DONE - if self.supertask is not None: - self.supertask.async_subtask_made_progress(self) + self.maybe_notify_supertask() ``` -The `supertask` and `index` fields will be `None` when a subtask first starts -executing, before it blocks and gets added to the `async_subtasks` table (by -`canon_lower`, below). If a subtask advances all the way to the `DONE` state -before blocking, the `async`-lowered call will indicate to the caller that the -callee completed synchronously, avoiding the overhead of adding an -`AsyncSubtask` altogether. Thus, progress events don't need to be delivered -until the subtask has passed this "possibly synchronous early return" phase. +The `maybe_notify_supertask` method only sends events to the supertask if this +`AsyncSubtask` actually blocked and got added to the `async_subtasks` table +(as signalled by `notify_supertask`). Additionally, `maybe_notify_supertask` +uses the `enqueued` flag and the fact that "events" are first-class functions +to collapse N events down to 1 if a subtask advances state multiple times +before the supertask receives the event. 
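To make this concrete, the following hypothetical sketch (not part of the Canonical ABI definitions; it assumes `definitions.py` is importable and that `CanonicalOptions` is default-constructible, as in the tests) drives an `AsyncSubtask` through two state transitions and observes that the supertask receives a single event reporting the latest state:
```python
import asyncio
from definitions import *  # Task, AsyncSubtask, CanonicalOptions, ComponentInstance, ...

async def sketch_event_collapsing():
  opts = CanonicalOptions()                   # assumption: default-constructible
  inst = ComponentInstance()
  on_block = asyncio.Future()
  on_block.set_result(OnBlockResult.BLOCKED)  # pre-resolved, like mk_on_block in run_tests.py
  task = Task(opts, inst, None, on_block)
  subtask = AsyncSubtask(opts, task)
  i = task.add_async_subtask(subtask)         # sets notify_supertask and adds to the table

  subtask.start()                             # queues one event closure for the supertask
  subtask.return_()                           # enqueued is still set: no second closure
  assert(len(task.events) == 1)               # two transitions collapsed into one pending event

  code, index = task.next_event()             # the closure reports the *latest* state
  assert(code == EventCode(AsyncCallState.RETURNED) and index == i)

asyncio.run(sketch_event_collapsing())
```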
### Loading @@ -1887,8 +1932,8 @@ When instantiating component instance `$inst`: The resulting function `$f` takes 4 runtime arguments: * `caller`: the caller's `Task` or, if this lifted function is being called by the host, `None` -* `on_block`: an optional `asyncio.Future` that must be resolved with - `OnBlockResult.BLOCKED` if the callee blocks on I/O +* `on_block`: a possibly-already-resolved `asyncio.Future` that the callee must + resolve (if not already) with `OnBlockResult.BLOCKED` if the callee blocks * `on_start`: a nullary function that must be called to return the caller's arguments as a list of component-level values * `on_return`: a unary function that must be called after `on_start`, @@ -2008,18 +2053,18 @@ async def canon_lower(opts, callee, ft, task, flat_args): flat_args = CoreValueIter(flat_args) flat_results = None if opts.sync: - subtask = Subtask(opts, task.inst) - task.inst.interruptible.clear() + subtask = Subtask(opts, task) def on_start(): return lift_flat_values(subtask, MAX_FLAT_PARAMS, flat_args, ft.param_types()) def on_return(results): nonlocal flat_results flat_results = lower_flat_values(subtask, MAX_FLAT_RESULTS, results, ft.result_types(), flat_args) + task.inst.interruptible.clear() await callee(task, task.on_block, on_start, on_return) task.inst.interruptible.set() subtask.finish() else: - subtask = AsyncSubtask(opts, task.inst) + subtask = AsyncSubtask(opts, task) on_block = asyncio.Future() async def do_call(): def on_start(): @@ -2045,12 +2090,11 @@ async def canon_lower(opts, callee, ft, task, flat_args): assert(current_task.locked()) return flat_results ``` -In the synchronous case, the import call is bracketed by clearing -`interruptible` for the duration of the call. Clearing `interrupt` both -prevents new tasks from starting (via `Task.may_enter`) and prevents suspended -tasks from resuming execution (via `Task.suspend`). Propagating the caller's -`on_block` future ensures that an `async`-lowered caller anywhere higher up on -the stack still gets control flow if `callee` blocks. +In the synchronous case, clearing `interruptible` prevents new tasks from +starting (via `Task.may_enter`) and prevents suspended tasks from resuming +execution (via `Task.suspend`). Propagating the caller's `on_block` future +ensures that an `async`-lowered caller anywhere higher up on the stack still +gets control flow if `callee` blocks. In the asynchronous case, the `on_block` future allows the caller to `await` to see if `callee` blocks on I/O before returning. 
Because `Task.suspend` (defined @@ -2364,8 +2408,7 @@ async def canon_subtask_drop(task, i): subtask = task.inst.async_subtasks.remove(i) trap_if(subtask.enqueued) trap_if(subtask.state != AsyncCallState.DONE) - trap_if(subtask.supertask is not task) - task.num_async_subtasks -= 1 + subtask.task.num_async_subtasks -= 1 return [] ``` @@ -2447,6 +2490,7 @@ def canon_thread_hw_concurrency(): [Adapter Functions]: FutureFeatures.md#custom-abis-via-adapter-functions [Shared-Everything Dynamic Linking]: examples/SharedEverythingDynamicLinking.md [Structured Concurrency]: Async.md#structured-concurrency +[Current Task]: Async.md#current-task [Administrative Instructions]: https://webassembly.github.io/spec/core/exec/runtime.html#syntax-instr-admin [Implementation Limits]: https://webassembly.github.io/spec/core/appendix/implementation.html diff --git a/design/mvp/canonical-abi/definitions.py b/design/mvp/canonical-abi/definitions.py index 1d7e52c9..2734a1b2 100644 --- a/design/mvp/canonical-abi/definitions.py +++ b/design/mvp/canonical-abi/definitions.py @@ -272,12 +272,17 @@ def elem_size_flags(labels): if n <= 16: return 2 return 4 -### Call Context +### Context -@dataclass -class CallContext: +class Context: opts: CanonicalOptions inst: ComponentInstance + task: Task + + def __init__(self, opts, inst, task): + self.opts = opts + self.inst = inst + self.task = task ### Canonical ABI Options @@ -296,21 +301,21 @@ class ComponentInstance: handles: HandleTables async_subtasks: Table[AsyncSubtask] may_leave: bool - interruptible: asyncio.Event backpressure: bool - sync_call: bool + calling_sync_export: bool num_tasks: int + interruptible: asyncio.Event pending_tasks: list[tuple[Task, asyncio.Future]] def __init__(self): self.handles = HandleTables() self.async_subtasks = Table[AsyncSubtask]() self.may_leave = True - self.interruptible = asyncio.Event() - self.interruptible.set() self.backpressure = False - self.sync_call = False + self.calling_sync_export = False self.num_tasks = 0 + self.interruptible = asyncio.Event() + self.interruptible.set() self.pending_tasks = [] class HandleTables: @@ -399,25 +404,29 @@ class EventCode(IntEnum): CALL_DONE = AsyncCallState.DONE YIELDED = 4 +Event = Callable[[], tuple[EventCode, int]] + class OnBlockResult(IntEnum): BLOCKED = 0 COMPLETED = 1 current_task = asyncio.Lock() -class Task(CallContext): +class Task(Context): caller: Optional[Task] - on_block: Optional[asyncio.Future[OnBlockResult]] + on_block: asyncio.Future[OnBlockResult] borrow_count: int - events: asyncio.Queue[AsyncSubtask] + events: list[Event] + has_events: asyncio.Event num_async_subtasks: int def __init__(self, opts, inst, caller, on_block): - super().__init__(opts, inst) + super().__init__(opts, inst, self) self.caller = caller self.on_block = on_block self.borrow_count = 0 - self.events = asyncio.Queue[AsyncSubtask]() + self.events = [] + self.has_events = asyncio.Event() self.num_async_subtasks = 0 def trap_if_on_the_stack(self, inst): @@ -433,15 +442,15 @@ async def enter(self): f = asyncio.Future() self.inst.pending_tasks.append((self, f)) await self.suspend(f) - self.inst.num_tasks += 1 if self.opts.sync: - self.inst.sync_call = True + self.inst.calling_sync_export = True + self.inst.num_tasks += 1 def may_enter(self, pending_task): - return self.inst.interruptible.is_set() and \ - not self.inst.backpressure and \ - not self.inst.sync_call and \ - not (pending_task.opts.sync and self.inst.num_tasks > 0) + return not self.inst.backpressure and \ + not self.inst.calling_sync_export and \ 
+ not (pending_task.opts.sync and self.inst.num_tasks > 0) and \ + self.inst.interruptible.is_set() def maybe_start_pending_task(self): if self.inst.pending_tasks: @@ -451,7 +460,7 @@ def maybe_start_pending_task(self): future.set_result(None) async def suspend(self, future): - if self.on_block and not self.on_block.done(): + if not self.on_block.done(): self.on_block.set_result(OnBlockResult.BLOCKED) else: current_task.release() @@ -465,36 +474,39 @@ async def suspend(self, future): async def wait(self): self.maybe_start_pending_task() - subtask = await self.suspend(self.events.get()) - return self.process_event(subtask) + if not self.events: + await self.suspend(self.has_events.wait()) + return self.next_event() + + def next_event(self): + assert(self.events) + if DETERMINISTIC_PROFILE: + i = 0 + else: + i = random.randrange(len(self.events)) + event = self.events.pop(i) + if not self.events: + self.has_events.clear() + return event() + + def notify(self, event): + self.events.append(event) + self.has_events.set() def poll(self): - if self.events.empty(): - return None - return self.process_event(self.events.get_nowait()) + if self.events: + return self.next_event() + return None async def yield_(self): self.maybe_start_pending_task() await self.suspend(asyncio.sleep(0)) def add_async_subtask(self, subtask): - assert(subtask.supertask is None and subtask.index is None) - subtask.supertask = self - subtask.index = self.inst.async_subtasks.add(subtask) + assert(subtask.task is self and not subtask.notify_supertask) + subtask.notify_supertask = True self.num_async_subtasks += 1 - return subtask.index - - def async_subtask_made_progress(self, subtask): - assert(subtask.supertask is self) - if subtask.enqueued: - return - subtask.enqueued = True - self.events.put_nowait(subtask) - - def process_event(self, subtask): - assert(subtask.supertask is self) - subtask.enqueued = False - return (EventCode(subtask.state), subtask.index) + return self.inst.async_subtasks.add(subtask) def create_borrow(self): self.borrow_count += 1 @@ -505,20 +517,20 @@ def drop_borrow(self): def exit(self): assert(current_task.locked()) - assert(self.events.empty()) + assert(not self.events) assert(self.inst.num_tasks >= 1) trap_if(self.borrow_count != 0) trap_if(self.num_async_subtasks != 0) self.inst.num_tasks -= 1 if self.opts.sync: - self.inst.sync_call = False + self.inst.calling_sync_export = False self.maybe_start_pending_task() -class Subtask(CallContext): +class Subtask(Context): lenders: list[HandleElem] - def __init__(self, opts, inst): - super().__init__(opts, inst) + def __init__(self, opts, task): + super().__init__(opts, task.inst, task) self.lenders = [] def track_owning_lend(self, lending_handle): @@ -558,35 +570,38 @@ def exit(self): class AsyncSubtask(Subtask): state: AsyncCallState - supertask: Optional[Task] - index: Optional[int] + notify_supertask: bool enqueued: bool - def __init__(self, opts, inst): - super().__init__(opts, inst) + def __init__(self, opts, task): + super().__init__(opts, task) self.state = AsyncCallState.STARTING - self.supertask = None - self.index = None + self.notify_supertask = False self.enqueued = False + def maybe_notify_supertask(self): + if self.notify_supertask and not self.enqueued: + self.enqueued = True + def event(): + self.enqueued = False + return (EventCode(self.state), self.inst.async_subtasks.array.index(self)) + self.task.notify(event) + def start(self): assert(self.state == AsyncCallState.STARTING) self.state = AsyncCallState.STARTED - if self.supertask is 
not None: - self.supertask.async_subtask_made_progress(self) + self.maybe_notify_supertask() def return_(self): assert(self.state == AsyncCallState.STARTED) self.state = AsyncCallState.RETURNED - if self.supertask is not None: - self.supertask.async_subtask_made_progress(self) + self.maybe_notify_supertask() def finish(self): super().finish() assert(self.state == AsyncCallState.RETURNED) self.state = AsyncCallState.DONE - if self.supertask is not None: - self.supertask.async_subtask_made_progress(self) + self.maybe_notify_supertask() ### Loading @@ -1375,18 +1390,18 @@ async def canon_lower(opts, callee, ft, task, flat_args): flat_args = CoreValueIter(flat_args) flat_results = None if opts.sync: - subtask = Subtask(opts, task.inst) - task.inst.interruptible.clear() + subtask = Subtask(opts, task) def on_start(): return lift_flat_values(subtask, MAX_FLAT_PARAMS, flat_args, ft.param_types()) def on_return(results): nonlocal flat_results flat_results = lower_flat_values(subtask, MAX_FLAT_RESULTS, results, ft.result_types(), flat_args) + task.inst.interruptible.clear() await callee(task, task.on_block, on_start, on_return) task.inst.interruptible.set() subtask.finish() else: - subtask = AsyncSubtask(opts, task.inst) + subtask = AsyncSubtask(opts, task) on_block = asyncio.Future() async def do_call(): def on_start(): @@ -1524,6 +1539,5 @@ async def canon_subtask_drop(task, i): subtask = task.inst.async_subtasks.remove(i) trap_if(subtask.enqueued) trap_if(subtask.state != AsyncCallState.DONE) - trap_if(subtask.supertask is not task) - task.num_async_subtasks -= 1 + subtask.task.num_async_subtasks -= 1 return [] diff --git a/design/mvp/canonical-abi/run_tests.py b/design/mvp/canonical-abi/run_tests.py index bd260d8a..f50374bd 100644 --- a/design/mvp/canonical-abi/run_tests.py +++ b/design/mvp/canonical-abi/run_tests.py @@ -1,6 +1,8 @@ import definitions from definitions import * +definitions.DETERMINISTIC_PROFILE = True + asyncio.run(definitions.current_task.acquire()) def equal_modulo_string_encoding(s, t): @@ -46,7 +48,8 @@ def mk_opts(memory = bytearray(), encoding = 'utf8', realloc = None, post_return def mk_cx(memory = bytearray(), encoding = 'utf8', realloc = None, post_return = None): opts = mk_opts(memory, encoding, realloc, post_return) - return CallContext(opts, ComponentInstance()) + inst = ComponentInstance() + return Context(opts, inst, Task(opts, inst, None, lambda:())) def mk_str(s): return (s, 'utf8', len(s.encode('utf-8'))) @@ -58,6 +61,11 @@ def mk_tup_rec(x): return x return { str(i):mk_tup_rec(v) for i,v in enumerate(a) } +def mk_on_block(): + f = asyncio.Future() + f.set_result(OnBlockResult.BLOCKED) + return f + def fail(msg): raise BaseException(msg) @@ -159,13 +167,13 @@ def test_pairs(t, pairs): def test_nan32(inbits, outbits): origf = decode_i32_as_float(inbits) f = lift_flat(mk_cx(), CoreValueIter([origf]), F32()) - if DETERMINISTIC_PROFILE: + if definitions.DETERMINISTIC_PROFILE: assert(encode_float_as_i32(f) == outbits) else: assert(not math.isnan(origf) or math.isnan(f)) cx = mk_cx(int.to_bytes(inbits, 4, 'little')) f = load(cx, 0, F32()) - if DETERMINISTIC_PROFILE: + if definitions.DETERMINISTIC_PROFILE: assert(encode_float_as_i32(f) == outbits) else: assert(not math.isnan(origf) or math.isnan(f)) @@ -173,13 +181,13 @@ def test_nan32(inbits, outbits): def test_nan64(inbits, outbits): origf = decode_i64_as_float(inbits) f = lift_flat(mk_cx(), CoreValueIter([origf]), F64()) - if DETERMINISTIC_PROFILE: + if definitions.DETERMINISTIC_PROFILE: 
assert(encode_float_as_i64(f) == outbits) else: assert(not math.isnan(origf) or math.isnan(f)) cx = mk_cx(int.to_bytes(inbits, 8, 'little')) f = load(cx, 0, F64()) - if DETERMINISTIC_PROFILE: + if definitions.DETERMINISTIC_PROFILE: assert(encode_float_as_i64(f) == outbits) else: assert(not math.isnan(origf) or math.isnan(f)) @@ -587,7 +595,7 @@ def on_return(results): got = results consumer_inst = ComponentInstance() - await canon_lift(consumer_opts, consumer_inst, consumer, ft, None, None, on_start, on_return) + await canon_lift(consumer_opts, consumer_inst, consumer, ft, None, mk_on_block(), on_start, on_return) assert(len(got) == 1) assert(got[0] == 42) @@ -658,7 +666,7 @@ def on_return(results): opts.sync = False opts.callback = callback - await canon_lift(opts, consumer_inst, consumer, consumer_ft, None, None, on_start, on_return) + await canon_lift(opts, consumer_inst, consumer, consumer_ft, None, mk_on_block(), on_start, on_return) assert(got[0] == 83) asyncio.run(test_async_callback()) @@ -734,7 +742,7 @@ def on_return(results): nonlocal got got = results - await canon_lift(consumer_opts, consumer_inst, consumer, consumer_ft, None, None, on_start, on_return) + await canon_lift(consumer_opts, consumer_inst, consumer, consumer_ft, None, mk_on_block(), on_start, on_return) assert(got[0] == 83) asyncio.run(test_async_to_sync()) @@ -814,10 +822,11 @@ def on_return(results): nonlocal got got = results - await canon_lift(consumer_opts, consumer_inst, consumer, consumer_ft, None, None, on_start, on_return) + await canon_lift(consumer_opts, consumer_inst, consumer, consumer_ft, None, mk_on_block(), on_start, on_return) assert(got[0] == 84) -asyncio.run(test_async_backpressure()) +if definitions.DETERMINISTIC_PROFILE: + asyncio.run(test_async_backpressure()) async def test_sync_using_wait(): hostcall_opts = mk_opts() @@ -863,7 +872,7 @@ async def core_func(task, args): inst = ComponentInstance() def on_start(): return [] def on_return(results): pass - await canon_lift(mk_opts(), inst, core_func, ft, None, None, on_start, on_return) + await canon_lift(mk_opts(), inst, core_func, ft, None, mk_on_block(), on_start, on_return) asyncio.run(test_sync_using_wait())
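
# A hypothetical sketch, not part of the test suite above: under the
# deterministic profile enabled at the top of this file, Task.next_event pops
# events in FIFO order; otherwise an index is chosen randomly.
async def sketch_deterministic_event_order():
  task = Task(mk_opts(), ComponentInstance(), None, mk_on_block())
  # Task.notify stores first-class event closures; the payloads below are
  # arbitrary and only illustrate delivery order.
  task.notify(lambda: (EventCode.YIELDED, 1))
  task.notify(lambda: (EventCode.YIELDED, 2))
  assert(task.next_event() == (EventCode.YIELDED, 1))
  assert(task.next_event() == (EventCode.YIELDED, 2))
  assert(task.poll() is None)

asyncio.run(sketch_deterministic_event_order())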