Skip to content

Commit

Permalink
Refactor: move maybe_start_pending_task to wait/yield for simplicity
Browse files Browse the repository at this point in the history
  • Loading branch information
lukewagner committed Aug 21, 2024
1 parent 4eb255f commit 655d5ac
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 28 deletions.
46 changes: 23 additions & 23 deletions design/mvp/CanonicalABI.md
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,6 @@ re`acquire` the `current_stack` lock to run again.
```python
async def suspend(self, future):
assert(current_task.locked())
self.maybe_start_pending_task()
if self.on_block:
self.on_block()
self.on_block = None
Expand All @@ -599,16 +598,6 @@ reimplemented using the [`suspend`] instruction of the [typed continuations]
proposal, removing the need for `on_block` and the subtle calling contract
between `Task.suspend` and `canon_lift`.

The `pending_tasks` queue (appended to by `enter` above) is emptied (by
`suspend` above and `exit` below) one at a time when backpressure is disabled,
ensuring that each popped tasks gets a chance to start and possibly re-enable
backpressure before the next pending task is started:
```python
def maybe_start_pending_task(self):
if self.inst.pending_tasks and self.may_enter():
self.inst.pending_tasks.pop(0).set_result(None)
```

The `borrow_count` field is used by the following methods to track the number
of borrowed handles that were passed as parameters to the export that have not
yet been dropped (and thus might dangle if the caller destroys the resource
Expand Down Expand Up @@ -643,11 +632,25 @@ guarded to be `0` in `Task.exit` (below) to ensure [structured concurrency].
return
subtask.enqueued = True
self.events.put_nowait(subtask)

```
While a task is running, it may call `wait` (via `canon task.wait` or, when a
`callback` is present, by returning to the event loop) to block until there is
progress on one of the task's async subtasks. Although the Python code uses an
`asyncio.Queue` to coordinate async events, an optimized implementation should
not have to create an actual queue; instead it should be possible to embed a
"next ready" linked list in the elements of the `async_subtasks` table (noting
the `enqueued` guard above ensures that a subtask can be enqueued at most
once).
```python
async def wait(self):
self.maybe_start_pending_task()
subtask = await self.suspend(self.events.get())
return self.process_event(subtask)

def maybe_start_pending_task(self):
if self.inst.pending_tasks and self.may_enter():
self.inst.pending_tasks.pop(0).set_result(None)

def process_event(self, subtask):
assert(subtask.supertask is self)
subtask.enqueued = False
Expand All @@ -656,18 +659,14 @@ guarded to be `0` in `Task.exit` (below) to ensure [structured concurrency].
self.num_async_subtasks -= 1
return (EventCode(subtask.state), subtask.index)
```
While a task is running, it may call `wait` (via `canon task.wait` or, when a
`callback` is present, by returning to the event loop) to block until there is
progress on one of the task's async subtasks. Although the Python code above
uses an `asyncio.Queue` to coordinate async events, an optimized implementation
should not have to create an actual queue; instead it should be possible to
embed a "next ready" linked list in the elements of the `async_subtasks` table
(noting the `enqueued` guard above ensures that a subtask can be enqueued at
most once).
The `pending_tasks` queue (appended to by `enter` above) is emptied by `wait`
(and `yield_` and `exit` below) one at a time once backpressure is turned back
off, ensuring that each popped tasks gets a chance to start and possibly
re-enable backpressure before the next pending task is started:

Alternatively, the current task can call `poll` (via `canon task.poll`, defined
below), which does not block and does not allow the runtime to switch to
another task:
Instead of `wait`ing for a subtask to make progress, the current task can also
call `poll` (via `canon task.poll`, defined below), which does not block and
does not allow the runtime to switch to another task:
```python
def poll(self):
if self.events.empty():
Expand All @@ -680,6 +679,7 @@ the runtime to switch to another ready task, but without blocking on I/O (as
emulated in the Python code here by awaiting a `sleep(0)`).
```python
async def yield_(self):
self.maybe_start_pending_task()
await self.suspend(asyncio.sleep(0))
```

Expand Down
11 changes: 6 additions & 5 deletions design/mvp/canonical-abi/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -440,7 +440,6 @@ def may_enter(self):

async def suspend(self, future):
assert(current_task.locked())
self.maybe_start_pending_task()
if self.on_block:
self.on_block()
self.on_block = None
Expand All @@ -450,10 +449,6 @@ async def suspend(self, future):
await current_task.acquire()
return r

def maybe_start_pending_task(self):
if self.inst.pending_tasks and self.may_enter():
self.inst.pending_tasks.pop(0).set_result(None)

def create_borrow(self):
self.borrow_count += 1

Expand All @@ -476,9 +471,14 @@ def async_subtask_made_progress(self, subtask):
self.events.put_nowait(subtask)

async def wait(self):
self.maybe_start_pending_task()
subtask = await self.suspend(self.events.get())
return self.process_event(subtask)

def maybe_start_pending_task(self):
if self.inst.pending_tasks and self.may_enter():
self.inst.pending_tasks.pop(0).set_result(None)

def process_event(self, subtask):
assert(subtask.supertask is self)
subtask.enqueued = False
Expand All @@ -493,6 +493,7 @@ def poll(self):
return self.process_event(self.events.get_nowait())

async def yield_(self):
self.maybe_start_pending_task()
await self.suspend(asyncio.sleep(0))

def exit(self):
Expand Down

0 comments on commit 655d5ac

Please sign in to comment.