diff --git a/Benchmarks/Thresholds/5.10/NIOPosixBenchmarks.TCPEchoAsyncChannel.p90.json b/Benchmarks/Thresholds/5.10/NIOPosixBenchmarks.TCPEchoAsyncChannel.p90.json index 9255c6c429..5786d8dda7 100644 --- a/Benchmarks/Thresholds/5.10/NIOPosixBenchmarks.TCPEchoAsyncChannel.p90.json +++ b/Benchmarks/Thresholds/5.10/NIOPosixBenchmarks.TCPEchoAsyncChannel.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 1317015 -} \ No newline at end of file + "mallocCountTotal" : 82420 +} diff --git a/Benchmarks/Thresholds/5.9/NIOPosixBenchmarks.TCPEchoAsyncChannel.p90.json b/Benchmarks/Thresholds/5.9/NIOPosixBenchmarks.TCPEchoAsyncChannel.p90.json index 810becb3a2..1aded224c9 100644 --- a/Benchmarks/Thresholds/5.9/NIOPosixBenchmarks.TCPEchoAsyncChannel.p90.json +++ b/Benchmarks/Thresholds/5.9/NIOPosixBenchmarks.TCPEchoAsyncChannel.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 1317022 + "mallocCountTotal" : 82426 } \ No newline at end of file diff --git a/Benchmarks/Thresholds/main/NIOPosixBenchmarks.TCPEchoAsyncChannel.p90.json b/Benchmarks/Thresholds/main/NIOPosixBenchmarks.TCPEchoAsyncChannel.p90.json index 9255c6c429..57ca9c0eb0 100644 --- a/Benchmarks/Thresholds/main/NIOPosixBenchmarks.TCPEchoAsyncChannel.p90.json +++ b/Benchmarks/Thresholds/main/NIOPosixBenchmarks.TCPEchoAsyncChannel.p90.json @@ -1,3 +1,3 @@ { - "mallocCountTotal" : 1317015 + "mallocCountTotal" : 82420 } \ No newline at end of file diff --git a/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriterHandler.swift b/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriterHandler.swift index 3d0a78ef3e..33f4d363dc 100644 --- a/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriterHandler.swift +++ b/Sources/NIOCore/AsyncChannel/AsyncChannelOutboundWriterHandler.swift @@ -170,25 +170,34 @@ extension NIOAsyncChannelOutboundWriterHandler { @inlinable func didYield(contentsOf sequence: Deque) { - // This always called from an async context, so we must loop-hop. - self.eventLoop.execute { + if self.eventLoop.inEventLoop { self.handler._didYield(sequence: sequence) + } else { + self.eventLoop.execute { + self.handler._didYield(sequence: sequence) + } } } @inlinable func didYield(_ element: OutboundOut) { - // This always called from an async context, so we must loop-hop. - self.eventLoop.execute { + if self.eventLoop.inEventLoop { self.handler._didYield(element: element) + } else { + self.eventLoop.execute { + self.handler._didYield(element: element) + } } } @inlinable func didTerminate(error: Error?) { - // This always called from an async context, so we must loop-hop. - self.eventLoop.execute { + if self.eventLoop.inEventLoop { self.handler._didTerminate(error: error) + } else { + self.eventLoop.execute { + self.handler._didTerminate(error: error) + } } } } diff --git a/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift b/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift index f0356b574e..073d0ee944 100644 --- a/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift +++ b/Sources/NIOCore/AsyncSequences/NIOAsyncWriter.swift @@ -20,9 +20,8 @@ import _NIODataStructures /// The delegate of the ``NIOAsyncWriter``. It is the consumer of the yielded writes to the ``NIOAsyncWriter``. /// Furthermore, the delegate gets informed when the ``NIOAsyncWriter`` terminated. /// -/// - Important: The methods on the delegate are called while a lock inside of the ``NIOAsyncWriter`` is held. This is done to -/// guarantee the ordering of the writes. However, this means you **MUST NOT** call ``NIOAsyncWriter/Sink/setWritability(to:)`` -/// from within ``NIOAsyncWriterSinkDelegate/didYield(contentsOf:)`` or ``NIOAsyncWriterSinkDelegate/didTerminate(error:)``. +/// - Important: The methods on the delegate might be called on arbitrary threads and the implementation must ensure +/// that proper synchronization is in place. @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) public protocol NIOAsyncWriterSinkDelegate: Sendable { /// The `Element` type of the delegate and the writer. @@ -34,8 +33,6 @@ public protocol NIOAsyncWriterSinkDelegate: Sendable { /// right away to the delegate. If the ``NIOAsyncWriter`` was _NOT_ writable then the sequence will be buffered /// until the ``NIOAsyncWriter`` becomes writable again. All buffered writes, while the ``NIOAsyncWriter`` is not writable, /// will be coalesced into a single sequence. - /// - /// - Important: You **MUST NOT** call ``NIOAsyncWriter/Sink/setWritability(to:)`` from within this method. func didYield(contentsOf sequence: Deque) /// This method is called once a single element was yielded to the ``NIOAsyncWriter``. @@ -46,8 +43,6 @@ public protocol NIOAsyncWriterSinkDelegate: Sendable { /// will be coalesced into a single sequence. /// /// - Note: This a fast path that you can optionally implement. By default this will just call ``NIOAsyncWriterSinkDelegate/didYield(contentsOf:)``. - /// - /// - Important: You **MUST NOT** call ``NIOAsyncWriter/Sink/setWritability(to:)`` from within this method. func didYield(_ element: Element) /// This method is called once the ``NIOAsyncWriter`` is terminated. @@ -63,8 +58,6 @@ public protocol NIOAsyncWriterSinkDelegate: Sendable { /// - Parameter error: The error that terminated the ``NIOAsyncWriter``. If the writer was terminated without an /// error this value is `nil`. This can be either the error passed to ``NIOAsyncWriter/finish(error:)`` or /// to ``NIOAsyncWriter/Sink/finish(error:)``. - /// - /// - Important: You **MUST NOT** call ``NIOAsyncWriter/Sink/setWritability(to:)`` from within this method. func didTerminate(error: Error?) } @@ -433,64 +426,46 @@ extension NIOAsyncWriter { @inlinable /* fileprivate */ internal func writerDeinitialized() { - self._lock.withLock { - let action = self._stateMachine.writerDeinitialized() + let action = self._lock.withLock { + self._stateMachine.writerDeinitialized() + } - switch action { - case .callDidTerminate(let delegate): - // We are calling the delegate while holding lock. This can lead to potential crashes - // if the delegate calls `setWritability` reentrantly. However, we call this - // out in the docs of the delegate - delegate.didTerminate(error: nil) + switch action { + case .callDidTerminate(let delegate): + delegate.didTerminate(error: nil) - case .none: - break - } + case .none: + break } + } @inlinable /* fileprivate */ internal func setWritability(to writability: Bool) { - self._lock.withLock { - let action = self._stateMachine.setWritability(to: writability) + let action = self._lock.withLock { + self._stateMachine.setWritability(to: writability) + } - switch action { - case .callDidYieldAndResumeContinuations(let delegate, let elements, let suspendedYields): - // We are calling the delegate while holding lock. This can lead to potential crashes - // if the delegate calls `setWritability` reentrantly. However, we call this - // out in the docs of the delegate - delegate.didYield(contentsOf: elements) - - // It is safe to resume the continuations while holding the lock since resume - // is immediately returning and just enqueues the Job on the executor - suspendedYields.forEach { $0.continuation.resume() } - - case .callDidYieldElementAndResumeContinuations(let delegate, let element, let suspendedYields): - // We are calling the delegate while holding lock. This can lead to potential crashes - // if the delegate calls `setWritability` reentrantly. However, we call this - // out in the docs of the delegate - delegate.didYield(element) + switch action { + case .callDidYieldAndResumeContinuations(let delegate, let elements, let suspendedYields): + delegate.didYield(contentsOf: elements) + suspendedYields.forEach { $0.continuation.resume() } - // It is safe to resume the continuations while holding the lock since resume - // is immediately returning and just enqueues the Job on the executor - suspendedYields.forEach { $0.continuation.resume() } + case .callDidYieldElementAndResumeContinuations(let delegate, let element, let suspendedYields): + delegate.didYield(element) + suspendedYields.forEach { $0.continuation.resume() } - case .resumeContinuations(let suspendedYields): - // It is safe to resume the continuations while holding the lock since resume - // is immediately returning and just enqueues the Job on the executor - suspendedYields.forEach { $0.continuation.resume() } + case .resumeContinuations(let suspendedYields): + suspendedYields.forEach { $0.continuation.resume() } - case .callDidYieldAndDidTerminate(let delegate, let elements): - // We are calling the delegate while holding lock. This can lead to potential crashes - // if the delegate calls `setWritability` reentrantly. However, we call this - // out in the docs of the delegate - delegate.didYield(contentsOf: elements) - delegate.didTerminate(error: nil) + case .callDidYieldAndDidTerminate(let delegate, let elements): + delegate.didYield(contentsOf: elements) + delegate.didTerminate(error: nil) - case .none: - return - } + case .none: + return } + } @inlinable @@ -498,20 +473,16 @@ extension NIOAsyncWriter { let yieldID = self._yieldIDGenerator.generateUniqueYieldID() try await withTaskCancellationHandler { - // We are manually locking here to hold the lock across the withCheckedContinuation call + // We are manually locking here to hold the lock across the withUnsafeContinuation call self._lock.lock() let action = self._stateMachine.yield(contentsOf: sequence, yieldID: yieldID) switch action { case .callDidYield(let delegate): - // We are calling the delegate while holding lock. This can lead to potential crashes - // if the delegate calls `setWritability` reentrantly. However, we call this - // out in the docs of the delegate - // We are allocating a new Deque for every write here - delegate.didYield(contentsOf: Deque(sequence)) self._lock.unlock() + delegate.didYield(contentsOf: Deque(sequence)) case .returnNormally: self._lock.unlock() @@ -522,7 +493,7 @@ extension NIOAsyncWriter { throw error case .suspendTask: - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + try await withUnsafeThrowingContinuation { (continuation: UnsafeContinuation) in self._stateMachine.yield( contentsOf: sequence, continuation: continuation, @@ -533,18 +504,16 @@ extension NIOAsyncWriter { } } } onCancel: { - self._lock.withLock { - let action = self._stateMachine.cancel(yieldID: yieldID) + let action = self._lock.withLock { + self._stateMachine.cancel(yieldID: yieldID) + } - switch action { - case .resumeContinuation(let continuation): - // It is safe to resume the continuations while holding the lock since resume - // is immediately returning and just enqueues the Job on the executor - continuation.resume() + switch action { + case .resumeContinuation(let continuation): + continuation.resume() - case .none: - break - } + case .none: + break } } } @@ -554,19 +523,15 @@ extension NIOAsyncWriter { let yieldID = self._yieldIDGenerator.generateUniqueYieldID() try await withTaskCancellationHandler { - // We are manually locking here to hold the lock across the withCheckedContinuation call + // We are manually locking here to hold the lock across the withUnsafeContinuation call self._lock.lock() let action = self._stateMachine.yield(contentsOf: CollectionOfOne(element), yieldID: yieldID) switch action { case .callDidYield(let delegate): - // We are calling the delegate while holding lock. This can lead to potential crashes - // if the delegate calls `setWritability` reentrantly. However, we call this - // out in the docs of the delegate - - delegate.didYield(element) self._lock.unlock() + delegate.didYield(element) case .returnNormally: self._lock.unlock() @@ -577,7 +542,7 @@ extension NIOAsyncWriter { throw error case .suspendTask: - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + try await withUnsafeThrowingContinuation { (continuation: UnsafeContinuation) in self._stateMachine.yield( contentsOf: CollectionOfOne(element), continuation: continuation, @@ -588,71 +553,56 @@ extension NIOAsyncWriter { } } } onCancel: { - self._lock.withLock { - let action = self._stateMachine.cancel(yieldID: yieldID) + let action = self._lock.withLock { + self._stateMachine.cancel(yieldID: yieldID) + } - switch action { - case .resumeContinuation(let continuation): - // It is safe to resume the continuations while holding the lock since resume - // is immediately returning and just enqueues the Job on the executor - continuation.resume() + switch action { + case .resumeContinuation(let continuation): + continuation.resume() - case .none: - break - } + case .none: + break } } } @inlinable /* fileprivate */ internal func writerFinish(error: Error?) { - self._lock.withLock { - let action = self._stateMachine.writerFinish() + let action = self._lock.withLock { + self._stateMachine.writerFinish() + } - switch action { - case .callDidTerminate(let delegate): - // We are calling the delegate while holding lock. This can lead to potential crashes - // if the delegate calls `setWritability` reentrantly. However, we call this - // out in the docs of the delegate - delegate.didTerminate(error: error) + switch action { + case .callDidTerminate(let delegate): + delegate.didTerminate(error: error) - case .resumeContinuations(let suspendedYields): - // It is safe to resume the continuations while holding the lock since resume - // is immediately returning and just enqueues the Job on the executor - suspendedYields.forEach { $0.continuation.resume() } + case .resumeContinuations(let suspendedYields): + suspendedYields.forEach { $0.continuation.resume() } - case .none: - break - } + case .none: + break } } @inlinable /* fileprivate */ internal func sinkFinish(error: Error?) { - self._lock.withLock { - let action = self._stateMachine.sinkFinish(error: error) + let action = self._lock.withLock { + self._stateMachine.sinkFinish(error: error) + } - switch action { - case .callDidTerminate(let delegate, let error): - // We are calling the delegate while holding lock. This can lead to potential crashes - // if the delegate calls `setWritability` reentrantly. However, we call this - // out in the docs of the delegate - delegate.didTerminate(error: error) - - case .resumeContinuationsWithErrorAndCallDidTerminate(let delegate, let suspendedYields, let error): - // We are calling the delegate while holding lock. This can lead to potential crashes - // if the delegate calls `setWritability` reentrantly. However, we call this - // out in the docs of the delegate - delegate.didTerminate(error: error) - - // It is safe to resume the continuations while holding the lock since resume - // is immediately returning and just enqueues the Job on the executor - suspendedYields.forEach { $0.continuation.resume(throwing: error) } + switch action { + case .callDidTerminate(let delegate, let error): + delegate.didTerminate(error: error) - case .none: - break - } + case .resumeContinuationsWithErrorAndCallDidTerminate(let delegate, let suspendedYields, let error): + delegate.didTerminate(error: error) + suspendedYields.forEach { $0.continuation.resume(throwing: error) } + + case .none: + break } + } } } @@ -672,10 +622,10 @@ extension NIOAsyncWriter { /// The yield's produced sequence of elements. /// The yield's continuation. @usableFromInline - var continuation: CheckedContinuation + var continuation: UnsafeContinuation @inlinable - init(yieldID: YieldID, continuation: CheckedContinuation) { + init(yieldID: YieldID, continuation: UnsafeContinuation) { self.yieldID = yieldID self.continuation = continuation } @@ -987,7 +937,7 @@ extension NIOAsyncWriter { @inlinable /* fileprivate */ internal mutating func yield( contentsOf sequence: S, - continuation: CheckedContinuation, + continuation: UnsafeContinuation, yieldID: YieldID ) where S.Element == Element { switch self._state { @@ -1023,7 +973,7 @@ extension NIOAsyncWriter { /// Actions returned by `cancel()`. @usableFromInline enum CancelAction { - case resumeContinuation(CheckedContinuation) + case resumeContinuation(UnsafeContinuation) /// Indicates that nothing should be done. case none } diff --git a/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift b/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift index 2574d42d92..84319ae625 100644 --- a/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift +++ b/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift @@ -533,7 +533,7 @@ extension NIOThrowingAsyncSequenceProducer { case .suspendTask: // It is safe to hold the lock across this method // since the closure is guaranteed to be run straight away - return try await withCheckedThrowingContinuation { continuation in + return try await withUnsafeThrowingContinuation { continuation in let action = self._stateMachine.next(for: continuation) switch action { @@ -606,7 +606,7 @@ extension NIOThrowingAsyncSequenceProducer { case streaming( backPressureStrategy: Strategy, buffer: Deque, - continuation: CheckedContinuation?, + continuation: UnsafeContinuation?, hasOutstandingDemand: Bool, iteratorInitialized: Bool ) @@ -787,20 +787,20 @@ extension NIOThrowingAsyncSequenceProducer { /// Indicates that the continuation should be resumed and /// ``NIOThrowingAsyncSequenceProducer/Source/YieldResult/produceMore`` should be returned. case resumeContinuationAndReturnProduceMore( - continuation: CheckedContinuation, + continuation: UnsafeContinuation, element: Element ) /// Indicates that the continuation should be resumed and /// ``NIOThrowingAsyncSequenceProducer/Source/YieldResult/stopProducing`` should be returned. case resumeContinuationAndReturnStopProducing( - continuation: CheckedContinuation, + continuation: UnsafeContinuation, element: Element ) /// Indicates that the yielded elements have been dropped. case returnDropped @usableFromInline - init(shouldProduceMore: Bool, continuationAndElement: (CheckedContinuation, Element)? = nil) { + init(shouldProduceMore: Bool, continuationAndElement: (UnsafeContinuation, Element)? = nil) { switch (shouldProduceMore, continuationAndElement) { case (true, .none): self = .returnProduceMore @@ -902,7 +902,7 @@ extension NIOThrowingAsyncSequenceProducer { enum FinishAction { /// Indicates that the continuation should be resumed with `nil` and /// that ``NIOAsyncSequenceProducerDelegate/didTerminate()`` should be called. - case resumeContinuationWithFailureAndCallDidTerminate(CheckedContinuation, Failure?) + case resumeContinuationWithFailureAndCallDidTerminate(UnsafeContinuation, Failure?) /// Indicates that nothing should be done. case none } @@ -956,7 +956,7 @@ extension NIOThrowingAsyncSequenceProducer { case callDidTerminate /// Indicates that the continuation should be resumed with a `CancellationError` and /// that ``NIOAsyncSequenceProducerDelegate/didTerminate()`` should be called. - case resumeContinuationWithCancellationErrorAndCallDidTerminate(CheckedContinuation) + case resumeContinuationWithCancellationErrorAndCallDidTerminate(UnsafeContinuation) /// Indicates that nothing should be done. case none } @@ -1129,7 +1129,7 @@ extension NIOThrowingAsyncSequenceProducer { } @inlinable - mutating func next(for continuation: CheckedContinuation) -> NextForContinuationAction { + mutating func next(for continuation: UnsafeContinuation) -> NextForContinuationAction { switch self._state { case .initial: // We are transitioning away from the initial state in `next()` diff --git a/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift b/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift index 277cd4bbee..01d281477c 100644 --- a/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift +++ b/Tests/NIOCoreTests/AsyncChannel/AsyncChannelTests.swift @@ -85,7 +85,7 @@ final class AsyncChannelTests: XCTestCase { inboundReader = wrapped.inboundStream try await channel.testingEventLoop.executeInContext { - XCTAssertEqual(0, closeRecorder.outboundCloses) + XCTAssertEqual(1, closeRecorder.outboundCloses) } } @@ -159,7 +159,7 @@ final class AsyncChannelTests: XCTestCase { inboundReader = wrapped.inboundStream try await channel.testingEventLoop.executeInContext { - XCTAssertEqual(0, closeRecorder.allCloses) + XCTAssertEqual(1, closeRecorder.allCloses) } } diff --git a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift index 3acb2c0ffc..799b2749d9 100644 --- a/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift +++ b/Tests/NIOCoreTests/AsyncSequences/NIOAsyncWriterTests.swift @@ -15,25 +15,32 @@ import DequeModule import NIOCore import XCTest +import NIOConcurrencyHelpers private struct SomeError: Error, Hashable {} private final class MockAsyncWriterDelegate: NIOAsyncWriterSinkDelegate, @unchecked Sendable { typealias Element = String - var didYieldCallCount = 0 + var _didYieldCallCount = NIOLockedValueBox(0) + var didYieldCallCount: Int { + self._didYieldCallCount.withLockedValue { $0 } + } var didYieldHandler: ((Deque) -> Void)? func didYield(contentsOf sequence: Deque) { - self.didYieldCallCount += 1 + self._didYieldCallCount.withLockedValue { $0 += 1 } if let didYieldHandler = self.didYieldHandler { didYieldHandler(sequence) } } - var didTerminateCallCount = 0 + var _didTerminateCallCount = NIOLockedValueBox(0) + var didTerminateCallCount: Int { + self._didTerminateCallCount.withLockedValue { $0 } + } var didTerminateHandler: ((Error?) -> Void)? func didTerminate(error: Error?) { - self.didTerminateCallCount += 1 + self._didTerminateCallCount.withLockedValue { $0 += 1 } if let didTerminateHandler = self.didTerminateHandler { didTerminateHandler(error) }