diff --git a/Sources/NIOCore/AsyncSequences/NIOAsyncSequenceProducer.swift b/Sources/NIOCore/AsyncSequences/NIOAsyncSequenceProducer.swift index 47f2a110ea..05246705e3 100644 --- a/Sources/NIOCore/AsyncSequences/NIOAsyncSequenceProducer.swift +++ b/Sources/NIOCore/AsyncSequences/NIOAsyncSequenceProducer.swift @@ -147,9 +147,8 @@ public struct NIOAsyncSequenceProducer< backPressureStrategy: Strategy, delegate: Delegate ) -> NewSequence { - let newSequence = NIOThrowingAsyncSequenceProducer.makeSequence( + let newSequence = NIOThrowingAsyncSequenceProducer.makeNonThrowingSequence( elementType: Element.self, - failureType: Never.self, backPressureStrategy: backPressureStrategy, delegate: delegate ) diff --git a/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift b/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift index 3ca9460e28..a0f943006d 100644 --- a/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift +++ b/Sources/NIOCore/AsyncSequences/NIOThrowingAsyncSequenceProducer.swift @@ -98,6 +98,7 @@ public struct NIOThrowingAsyncSequenceProducer< /// - backPressureStrategy: The back-pressure strategy of the sequence. /// - delegate: The delegate of the sequence /// - Returns: A ``NIOThrowingAsyncSequenceProducer/Source`` and a ``NIOThrowingAsyncSequenceProducer``. + @available(*, deprecated, message: "Support for a generic Failure type is deprecated. Failure type must be `any Swift.Error`.") @inlinable public static func makeSequence( elementType: Element.Type = Element.self, @@ -113,6 +114,51 @@ public struct NIOThrowingAsyncSequenceProducer< return .init(source: source, sequence: sequence) } + + /// Initializes a new ``NIOThrowingAsyncSequenceProducer`` and a ``NIOThrowingAsyncSequenceProducer/Source``. + /// + /// - Important: This method returns a struct containing a ``NIOThrowingAsyncSequenceProducer/Source`` and + /// a ``NIOThrowingAsyncSequenceProducer``. The source MUST be held by the caller and + /// used to signal new elements or finish. The sequence MUST be passed to the actual consumer and MUST NOT be held by the + /// caller. This is due to the fact that deiniting the sequence is used as part of a trigger to terminate the underlying source. + /// + /// - Parameters: + /// - elementType: The element type of the sequence. + /// - failureType: The failure type of the sequence. Must be `Swift.Error` + /// - backPressureStrategy: The back-pressure strategy of the sequence. + /// - delegate: The delegate of the sequence + /// - Returns: A ``NIOThrowingAsyncSequenceProducer/Source`` and a ``NIOThrowingAsyncSequenceProducer``. + @inlinable + public static func makeSequence( + elementType: Element.Type = Element.self, + failureType: Failure.Type = Error.self, + backPressureStrategy: Strategy, + delegate: Delegate + ) -> NewSequence where Failure == Error { + let sequence = Self( + backPressureStrategy: backPressureStrategy, + delegate: delegate + ) + let source = Source(storage: sequence._storage) + + return .init(source: source, sequence: sequence) + } + + /// only used internally by``NIOAsyncSequenceProducer`` to reuse most of the code + @inlinable + internal static func makeNonThrowingSequence( + elementType: Element.Type = Element.self, + backPressureStrategy: Strategy, + delegate: Delegate + ) -> NewSequence where Failure == Never { + let sequence = Self( + backPressureStrategy: backPressureStrategy, + delegate: delegate + ) + let source = Source(storage: sequence._storage) + + return .init(source: source, sequence: sequence) + } @inlinable /* private */ internal init( @@ -499,7 +545,20 @@ extension NIOThrowingAsyncSequenceProducer { return delegate case .resumeContinuationWithCancellationErrorAndCallDidTerminate(let continuation): - continuation.resume(throwing: CancellationError()) + // We have deprecated the generic Failure type in the public API and Failure should + // now be `Swift.Error`. However, if users have not migrated to the new API they could + // still use a custom generic Error type and this cast might fail. + // In addition, we use `NIOThrowingAsyncSequenceProducer` in the implementation of the + // non-throwing variant `NIOAsyncSequenceProducer` where `Failure` will be `Never` and + // this cast will fail as well. + // Everything is marked @inlinable and the Failure type is known at compile time, + // therefore this cast should be optimised away in release build. + if let failure = CancellationError() as? Failure { + continuation.resume(throwing: failure) + } else { + continuation.resume(returning: nil) + } + let delegate = self._delegate self._delegate = nil @@ -880,9 +939,26 @@ extension NIOThrowingAsyncSequenceProducer { switch self._state { case .initial(_, let iteratorInitialized): // This can happen if the `Task` that calls `next()` is already cancelled. - self._state = .finished(iteratorInitialized: iteratorInitialized) - - return .callDidTerminate + + // We have deprecated the generic Failure type in the public API and Failure should + // now be `Swift.Error`. However, if users have not migrated to the new API they could + // still use a custom generic Error type and this cast might fail. + // In addition, we use `NIOThrowingAsyncSequenceProducer` in the implementation of the + // non-throwing variant `NIOAsyncSequenceProducer` where `Failure` will be `Never` and + // this cast will fail as well. + // Everything is marked @inlinable and the Failure type is known at compile time, + // therefore this cast should be optimised away in release build. + if let failure = CancellationError() as? Failure { + self._state = .sourceFinished( + buffer: .init(), + iteratorInitialized: iteratorInitialized, + failure: failure + ) + } else { + self._state = .finished(iteratorInitialized: iteratorInitialized) + } + + return .none case .streaming(_, _, .some(let continuation), _, let iteratorInitialized): // We have an outstanding continuation that needs to resumed diff --git a/Sources/NIOPerformanceTester/NIOAsyncSequenceProducerBenchmark.swift b/Sources/NIOPerformanceTester/NIOAsyncSequenceProducerBenchmark.swift index caab76bc4a..b26a0f99b7 100644 --- a/Sources/NIOPerformanceTester/NIOAsyncSequenceProducerBenchmark.swift +++ b/Sources/NIOPerformanceTester/NIOAsyncSequenceProducerBenchmark.swift @@ -19,7 +19,7 @@ import Atomics @available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *) final class NIOAsyncSequenceProducerBenchmark: AsyncBenchmark, NIOAsyncSequenceProducerDelegate, @unchecked Sendable { - fileprivate typealias SequenceProducer = NIOThrowingAsyncSequenceProducer + fileprivate typealias SequenceProducer = NIOThrowingAsyncSequenceProducer private let iterations: Int private var iterator: SequenceProducer.AsyncIterator! diff --git a/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift b/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift index de4649060f..49f5d78445 100644 --- a/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift +++ b/Tests/NIOCoreTests/AsyncSequences/NIOThrowingAsyncSequenceTests.swift @@ -463,6 +463,35 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { XCTAssertTrue(error is CancellationError) } } + + @available(*, deprecated, message: "tests the deprecated custom generic failure type") + func testTaskCancel_whenStreaming_andSuspended_withCustomErrorType() async throws { + struct CustomError: Error {} + // We are registering our demand and sleeping a bit to make + // sure our task runs when the demand is registered + let backPressureStrategy = MockNIOElementStreamBackPressureStrategy() + let delegate = MockNIOBackPressuredStreamSourceDelegate() + let new = NIOThrowingAsyncSequenceProducer.makeSequence( + elementType: Int.self, + failureType: CustomError.self, + backPressureStrategy: backPressureStrategy, + delegate: delegate + ) + let sequence = new.sequence + let task: Task = Task { + let iterator = sequence.makeAsyncIterator() + return try await iterator.next() + } + try await Task.sleep(nanoseconds: 1_000_000) + + task.cancel() + let result = await task.result + XCTAssertEqualWithoutAutoclosure(await delegate.events.prefix(1).collect(), [.didTerminate]) + + try withExtendedLifetime(new.source) { + XCTAssertNil(try result.get()) + } + } func testTaskCancel_whenStreaming_andNotSuspended() async throws { // We are registering our demand and sleeping a bit to make @@ -515,9 +544,39 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase { task.cancel() - let value = try await task.value + let result = await task.result - XCTAssertNil(value) + await XCTAssertThrowsError(try result.get()) { error in + XCTAssertTrue(error is CancellationError, "unexpected error \(error)") + } + } + + @available(*, deprecated, message: "tests the deprecated custom generic failure type") + func testTaskCancel_whenStreaming_andTaskIsAlreadyCancelled_withCustomErrorType() async throws { + struct CustomError: Error {} + let backPressureStrategy = MockNIOElementStreamBackPressureStrategy() + let delegate = MockNIOBackPressuredStreamSourceDelegate() + let new = NIOThrowingAsyncSequenceProducer.makeSequence( + elementType: Int.self, + failureType: CustomError.self, + backPressureStrategy: backPressureStrategy, + delegate: delegate + ) + let sequence = new.sequence + let task: Task = Task { + // We are sleeping here to allow some time for us to cancel the task. + // Once the Task is cancelled we will call `next()` + try? await Task.sleep(nanoseconds: 1_000_000) + let iterator = sequence.makeAsyncIterator() + return try await iterator.next() + } + + task.cancel() + + let result = await task.result + try withExtendedLifetime(new.source) { + XCTAssertNil(try result.get()) + } } // MARK: - Next