Skip to content

Commit

Permalink
Enhance and rename AsyncEmbeddedEventLoop (#2224)
Browse files Browse the repository at this point in the history
* Enhance and rename AsyncEmbeddedEventLoop

Motivation

AsyncEmbeddedEventLoop is an important part of our ongoing testing story
for NIO. However, it suffers from two problems.

The first is a usability one. As we discovered during the original
implementation, following EmbeddedEventLoop's pattern of not having any
EventLoop "Tasks" execute until run() meant that simple constructs like
`EventLoopFuture.wait` and `EventLoopFuture.get` didn't work at all,
forcing us to add an annoying `awaitFuture` method.

When playing with implementing EmbeddedChannel, this got worse, as I/O
methods also don't run immediately if called from the testing thread. We
couldn't easily work around this issue, and it meant that common
patterns (like calling `channel.writeAndFlush`) would deadlock!

This is unacceptable, so a change had to be made.

While we're here, we received feedback that the name is unclear to
users. Given that this particular event loop is in no sense "embedded",
we no longer need the name, so we can take this opportunity to use a
better one.

Modifications

Changed `func execute` to immediately execute its task body, and to
dequeue all pending tasks at this time. Essentially, it's the equivalent
to run(). This is a major change in its behaviour.

Renamed the loop to `NIOAsyncTestingEventLoop`.

Result

Better names, easier to use.

* Make soundness happy

* Remove awaitFuture
  • Loading branch information
Lukasa authored Jul 27, 2022
1 parent d05d2fd commit bedb9fe
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 150 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,23 @@ import NIOConcurrencyHelpers
/// An `EventLoop` that is thread safe and whose execution is fully controlled
/// by the user.
///
/// Unlike more complex `EventLoop`s, such as `SelectableEventLoop`, the `NIOAsyncEmbeddedEventLoop`
/// Unlike more complex `EventLoop`s, such as `SelectableEventLoop`, the `NIOAsyncTestingEventLoop`
/// has no proper eventing mechanism. Instead, reads and writes are fully controlled by the
/// entity that instantiates the `NIOAsyncEmbeddedEventLoop`. This property makes `NIOAsyncEmbeddedEventLoop`
/// entity that instantiates the `NIOAsyncTestingEventLoop`. This property makes `NIOAsyncTestingEventLoop`
/// of limited use for many application purposes, but highly valuable for testing and other
/// kinds of mocking. Unlike `EmbeddedEventLoop`, `NIOAsyncEmbeddedEventLoop` is fully thread-safe and
/// kinds of mocking. Unlike `EmbeddedEventLoop`, `NIOAsyncTestingEventLoop` is fully thread-safe and
/// safe to use from within a Swift concurrency context.
///
/// Unlike `EmbeddedEventLoop`, `NIOAsyncEmbeddedEventLoop` does require that user tests appropriately
/// Unlike `EmbeddedEventLoop`, `NIOAsyncTestingEventLoop` does require that user tests appropriately
/// enforce thread safety. Used carefully it is possible to safely operate the event loop without
/// explicit synchronization, but it is recommended to use `executeInContext` in any case where it's
/// necessary to ensure that the event loop is not making progress.
///
/// Time is controllable on an `NIOAsyncEmbeddedEventLoop`. It begins at `NIODeadline.uptimeNanoseconds(0)`
/// Time is controllable on an `NIOAsyncTestingEventLoop`. It begins at `NIODeadline.uptimeNanoseconds(0)`
/// and may be advanced by a fixed amount by using `advanceTime(by:)`, or advanced to a point in
/// time with `advanceTime(to:)`.
///
/// If users wish to perform multiple tasks at once on an `NIOAsyncEmbeddedEventLoop`, it is recommended that they
/// If users wish to perform multiple tasks at once on an `NIOAsyncTestingEventLoop`, it is recommended that they
/// use `executeInContext` to perform the operations. For example:
///
/// ```
Expand All @@ -54,10 +54,9 @@ import NIOConcurrencyHelpers
/// There is a tricky requirement around waiting for `EventLoopFuture`s when working with this
/// event loop. Simply calling `.wait()` from the test thread will never complete. This is because
/// `wait` calls `loop.execute` under the hood, and that callback cannot execute without calling
/// `loop.run()`. As a result, if you need to await an `EventLoopFuture` created on this loop you
/// should use `awaitFuture`.
/// `loop.run()`.
@available(macOS 10.15, iOS 13.0, watchOS 6.0, tvOS 13.0, *)
public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable {
public final class NIOAsyncTestingEventLoop: EventLoop, @unchecked Sendable {
// This type is `@unchecked Sendable` because of the use of `taskNumber`. This
// variable is only used from within `queue`, but the compiler cannot see that.

Expand Down Expand Up @@ -111,7 +110,7 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable {
return DispatchQueue.getSpecific(key: Self.inQueueKey) == self.thisLoopID
}

/// Initialize a new `NIOAsyncEmbeddedEventLoop`.
/// Initialize a new `NIOAsyncTestingEventLoop`.
public init() {
self.queue.setSpecific(key: Self.inQueueKey, value: self.thisLoopID)
}
Expand Down Expand Up @@ -172,17 +171,43 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable {
return self.scheduleTask(deadline: self.now + `in`, task)
}

/// On an `NIOAsyncEmbeddedEventLoop`, `execute` will simply use `scheduleTask` with a deadline of _now_. This means that
/// `task` will be run the next time you call `AsyncEmbeddedEventLoop.run`.
/// On an `NIOAsyncTestingEventLoop`, `execute` will simply use `scheduleTask` with a deadline of _now_. Unlike with the other operations, this will
/// immediately execute, to eliminate a common class of bugs.
public func execute(_ task: @escaping () -> Void) {
self.scheduleTask(deadline: self.now, task)
if self.inEventLoop {
self.scheduleTask(deadline: self.now, task)
} else {
self.queue.async {
self.scheduleTask(deadline: self.now, task)

var tasks = CircularBuffer<EmbeddedScheduledTask>()
while let nextTask = self.scheduledTasks.peek() {
guard nextTask.readyTime <= self.now else {
break
}

// Now we want to grab all tasks that are ready to execute at the same
// time as the first.
while let candidateTask = self.scheduledTasks.peek(), candidateTask.readyTime == nextTask.readyTime {
tasks.append(candidateTask)
self.scheduledTasks.pop()
}

for task in tasks {
task.task()
}

tasks.removeAll(keepingCapacity: true)
}
}
}
}

/// Run all tasks that have previously been submitted to this `NIOAsyncEmbeddedEventLoop`, either by calling `execute` or
/// Run all tasks that have previously been submitted to this `NIOAsyncTestingEventLoop`, either by calling `execute` or
/// events that have been enqueued using `scheduleTask`/`scheduleRepeatedTask`/`scheduleRepeatedAsyncTask` and whose
/// deadlines have expired.
///
/// - seealso: `NIOAsyncEmbeddedEventLoop.advanceTime`.
/// - seealso: `NIOAsyncTestingEventLoop.advanceTime`.
public func run() async {
// Execute all tasks that are currently enqueued to be executed *now*.
await self.advanceTime(to: self.now)
Expand All @@ -194,59 +219,6 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable {
await self.advanceTime(to: self.now + increment)
}

/// Unwrap a future result from this event loop.
///
/// This replaces `EventLoopFuture.get()` for use with `NIOAsyncEmbeddedEventLoop`. This is necessary because attaching
/// a callback to an `EventLoopFuture` (which is what `EventLoopFuture.get` does) requires scheduling a work item onto
/// the event loop, and running that work item requires spinning the loop. This is a non-trivial dance to get right due
/// to timing issues, so this function provides a helper to ensure that this will work.
public func awaitFuture<ResultType: Sendable>(_ future: EventLoopFuture<ResultType>, timeout: TimeAmount) async throws -> ResultType {
// We need a task group to wait for the scheduled future result, because the future result callback
// can only complete when we actually run the loop, which we need to do in another Task.
return try await withThrowingTaskGroup(of: ResultType.self, returning: ResultType.self) { group in
// We need an innner promise to allow cancellation of the wait.
let promise = self.makePromise(of: ResultType.self)
future.cascade(to: promise)

group.addTask {
try await promise.futureResult.get()
}

group.addTask {
while true {
await self.run()
try Task.checkCancellation()
await Task.yield()
}
}

group.addTask {
try await Task.sleep(nanoseconds: UInt64(timeout.nanoseconds))
promise.fail(NIOAsyncEmbeddedEventLoopError.timeoutAwaitingFuture)

// This self.run() is _very important_. We're about to throw out of this function, which will
// cancel the entire TaskGroup. Depending on how things get scheduled it is possible for that
// cancellation to cancel the runner task above _before_ it spins again, meaning that the
// promise failure never actually happens (as it has to establish event loop context). So
// before we cancel this we make sure that we get a chance to spin the loop.
await self.run()
throw NIOAsyncEmbeddedEventLoopError.timeoutAwaitingFuture
}

do {
// This force-unwrap is safe: there are only three tasks and only one of them can ever return a value,
// the rest will error. The one that does return a value can never return `nil`. In essence, this is an
// `AsyncSequenceOfOneOrError`.
let result = try await group.next()!
group.cancelAll()
return result
} catch {
group.cancelAll()
throw error
}
}
}

/// Runs the event loop and moves "time" forward to the given point in time, running any scheduled
/// tasks that need to be run.
///
Expand Down Expand Up @@ -296,7 +268,7 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable {
/// accessed from the event loop thread and want to be 100% sure of the thread-safety of accessing them.
///
/// Be careful not to try to spin the event loop again from within this callback, however. As long as this function is on the call
/// stack the `NIOAsyncEmbeddedEventLoop` cannot progress, and so any attempt to progress it will block until this function returns.
/// stack the `NIOAsyncTestingEventLoop` cannot progress, and so any attempt to progress it will block until this function returns.
public func executeInContext<ReturnType: Sendable>(_ task: @escaping @Sendable () throws -> ReturnType) async throws -> ReturnType {
try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<ReturnType, Error>) in
self.queue.async {
Expand Down Expand Up @@ -373,13 +345,13 @@ public final class NIOAsyncEmbeddedEventLoop: EventLoop, @unchecked Sendable {
}

deinit {
precondition(scheduledTasks.isEmpty, "NIOAsyncEmbeddedEventLoop freed with unexecuted scheduled tasks!")
precondition(scheduledTasks.isEmpty, "NIOAsyncTestingEventLoop freed with unexecuted scheduled tasks!")
}
}

/// This is a thread-safe promise creation store.
///
/// We use this to keep track of where promises come from in the `NIOAsyncEmbeddedEventLoop`.
/// We use this to keep track of where promises come from in the `NIOAsyncTestingEventLoop`.
private class PromiseCreationStore {
private let lock = Lock()
private var promiseCreationStore: [_NIOEventLoopFutureIdentifier: (file: StaticString, line: UInt)] = [:]
Expand All @@ -400,24 +372,8 @@ private class PromiseCreationStore {

deinit {
// We no longer need the lock here.
precondition(self.promiseCreationStore.isEmpty, "NIOAsyncEmbeddedEventLoop freed with uncompleted promises!")
precondition(self.promiseCreationStore.isEmpty, "NIOAsyncTestingEventLoop freed with uncompleted promises!")
}
}

/// Errors that can happen on a `NIOAsyncEmbeddedEventLoop`.
public struct NIOAsyncEmbeddedEventLoopError: Error, Hashable {
private enum Backing {
case timeoutAwaitingFuture
}

private var backing: Backing

private init(_ backing: Backing) {
self.backing = backing
}

/// A timeout occurred while waiting for a future to complete.
public static let timeoutAwaitingFuture = Self(.timeoutAwaitingFuture)
}

#endif
2 changes: 1 addition & 1 deletion Tests/LinuxMain.swift
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class LinuxMainRunnerImpl: LinuxMainRunner {
testCase(MessageToByteHandlerTest.allTests),
testCase(MulticastTest.allTests),
testCase(NIOAnyDebugTest.allTests),
testCase(NIOAsyncEmbeddedEventLoopTests.allTests),
testCase(NIOAsyncTestingEventLoopTests.allTests),
testCase(NIOCloseOnErrorHandlerTest.allTests),
testCase(NIOConcurrencyHelpersTests.allTests),
testCase(NIOHTTP1TestServerTest.allTests),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
//
//===----------------------------------------------------------------------===//
//
// AsyncEmbeddedEventLoopTests+XCTest.swift
// AsyncTestingEventLoopTests+XCTest.swift
//
import XCTest

Expand All @@ -22,15 +22,17 @@ import XCTest
/// Do NOT edit this file directly as it will be regenerated automatically when needed.
///

extension NIOAsyncEmbeddedEventLoopTests {
extension NIOAsyncTestingEventLoopTests {

@available(*, deprecated, message: "not actually deprecated. Just deprecated to allow deprecated tests (which test deprecated functionality) without warnings")
static var allTests : [(String, (NIOAsyncEmbeddedEventLoopTests) -> () throws -> Void)] {
static var allTests : [(String, (NIOAsyncTestingEventLoopTests) -> () throws -> Void)] {
return [
("testExecuteDoesNotImmediatelyRunTasks", testExecuteDoesNotImmediatelyRunTasks),
("testExecuteWillRunAllTasks", testExecuteWillRunAllTasks),
("testExecuteWillRunTasksAddedRecursively", testExecuteWillRunTasksAddedRecursively),
("testTasksSubmittedAfterRunDontRun", testTasksSubmittedAfterRunDontRun),
("testExecuteRunsImmediately", testExecuteRunsImmediately),
("testTasksScheduledAfterRunDontRun", testTasksScheduledAfterRunDontRun),
("testSubmitRunsImmediately", testSubmitRunsImmediately),
("testSyncShutdownGracefullyRunsTasks", testSyncShutdownGracefullyRunsTasks),
("testShutdownGracefullyRunsTasks", testShutdownGracefullyRunsTasks),
("testCanControlTime", testCanControlTime),
Expand All @@ -43,7 +45,6 @@ extension NIOAsyncEmbeddedEventLoopTests {
("testScheduledTasksFuturesError", testScheduledTasksFuturesError),
("testTaskOrdering", testTaskOrdering),
("testCancelledScheduledTasksDoNotHoldOnToRunClosure", testCancelledScheduledTasksDoNotHoldOnToRunClosure),
("testWaitingForFutureCanTimeOut", testWaitingForFutureCanTimeOut),
("testDrainScheduledTasks", testDrainScheduledTasks),
("testDrainScheduledTasksDoesNotRunNewlyScheduledTasks", testDrainScheduledTasksDoesNotRunNewlyScheduledTasks),
("testAdvanceTimeToDeadline", testAdvanceTimeToDeadline),
Expand Down
Loading

0 comments on commit bedb9fe

Please sign in to comment.