Skip to content

Commit

Permalink
Throw CancellationError instead of returning nil during early can…
Browse files Browse the repository at this point in the history
…cellation. (#2401)

### Motivation:
Follow up PR for #2399

We currently still return `nil` if the current `Task` is canceled before the first call to `NIOThrowingAsyncSequenceProducer.AsyncIterator.next()` but it should throw `CancellationError` too.

In addition, the generic `Failure` type turns out to be a problem. Just throwing a `CancellationError` without checking that `Failure` type is `any Swift.Error` or `CancellationError` introduced a type safety violation as we throw an unrelated type.

### Modifications:

- throw `CancellationError` on eager cancellation
-  deprecates the generic `Failure` type of `NIOThrowingAsyncSequenceProducer`. It now must always be `any Swift.Error`. For backward compatibility we will still return nil if `Failure` is not `any Swift.Error` or `CancellationError`.

### Result:

`CancellationError` is now correctly thrown instead of returning `nil` on eager cancelation. Generic `Failure` type is deprecated.
  • Loading branch information
dnadoba authored Apr 11, 2023
1 parent a7c36a7 commit e0cc6dd
Show file tree
Hide file tree
Showing 4 changed files with 143 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,8 @@ public struct NIOAsyncSequenceProducer<
backPressureStrategy: Strategy,
delegate: Delegate
) -> NewSequence {
let newSequence = NIOThrowingAsyncSequenceProducer.makeSequence(
let newSequence = NIOThrowingAsyncSequenceProducer.makeNonThrowingSequence(
elementType: Element.self,
failureType: Never.self,
backPressureStrategy: backPressureStrategy,
delegate: delegate
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public struct NIOThrowingAsyncSequenceProducer<
/// - backPressureStrategy: The back-pressure strategy of the sequence.
/// - delegate: The delegate of the sequence
/// - Returns: A ``NIOThrowingAsyncSequenceProducer/Source`` and a ``NIOThrowingAsyncSequenceProducer``.
@available(*, deprecated, message: "Support for a generic Failure type is deprecated. Failure type must be `any Swift.Error`.")
@inlinable
public static func makeSequence(
elementType: Element.Type = Element.self,
Expand All @@ -113,6 +114,51 @@ public struct NIOThrowingAsyncSequenceProducer<

return .init(source: source, sequence: sequence)
}

/// Initializes a new ``NIOThrowingAsyncSequenceProducer`` and a ``NIOThrowingAsyncSequenceProducer/Source``.
///
/// - Important: This method returns a struct containing a ``NIOThrowingAsyncSequenceProducer/Source`` and
/// a ``NIOThrowingAsyncSequenceProducer``. The source MUST be held by the caller and
/// used to signal new elements or finish. The sequence MUST be passed to the actual consumer and MUST NOT be held by the
/// caller. This is due to the fact that deiniting the sequence is used as part of a trigger to terminate the underlying source.
///
/// - Parameters:
/// - elementType: The element type of the sequence.
/// - failureType: The failure type of the sequence. Must be `Swift.Error`
/// - backPressureStrategy: The back-pressure strategy of the sequence.
/// - delegate: The delegate of the sequence
/// - Returns: A ``NIOThrowingAsyncSequenceProducer/Source`` and a ``NIOThrowingAsyncSequenceProducer``.
@inlinable
public static func makeSequence(
elementType: Element.Type = Element.self,
failureType: Failure.Type = Error.self,
backPressureStrategy: Strategy,
delegate: Delegate
) -> NewSequence where Failure == Error {
let sequence = Self(
backPressureStrategy: backPressureStrategy,
delegate: delegate
)
let source = Source(storage: sequence._storage)

return .init(source: source, sequence: sequence)
}

/// only used internally by``NIOAsyncSequenceProducer`` to reuse most of the code
@inlinable
internal static func makeNonThrowingSequence(
elementType: Element.Type = Element.self,
backPressureStrategy: Strategy,
delegate: Delegate
) -> NewSequence where Failure == Never {
let sequence = Self(
backPressureStrategy: backPressureStrategy,
delegate: delegate
)
let source = Source(storage: sequence._storage)

return .init(source: source, sequence: sequence)
}

@inlinable
/* private */ internal init(
Expand Down Expand Up @@ -499,7 +545,20 @@ extension NIOThrowingAsyncSequenceProducer {
return delegate

case .resumeContinuationWithCancellationErrorAndCallDidTerminate(let continuation):
continuation.resume(throwing: CancellationError())
// We have deprecated the generic Failure type in the public API and Failure should
// now be `Swift.Error`. However, if users have not migrated to the new API they could
// still use a custom generic Error type and this cast might fail.
// In addition, we use `NIOThrowingAsyncSequenceProducer` in the implementation of the
// non-throwing variant `NIOAsyncSequenceProducer` where `Failure` will be `Never` and
// this cast will fail as well.
// Everything is marked @inlinable and the Failure type is known at compile time,
// therefore this cast should be optimised away in release build.
if let failure = CancellationError() as? Failure {
continuation.resume(throwing: failure)
} else {
continuation.resume(returning: nil)
}

let delegate = self._delegate
self._delegate = nil

Expand Down Expand Up @@ -880,9 +939,26 @@ extension NIOThrowingAsyncSequenceProducer {
switch self._state {
case .initial(_, let iteratorInitialized):
// This can happen if the `Task` that calls `next()` is already cancelled.
self._state = .finished(iteratorInitialized: iteratorInitialized)

return .callDidTerminate

// We have deprecated the generic Failure type in the public API and Failure should
// now be `Swift.Error`. However, if users have not migrated to the new API they could
// still use a custom generic Error type and this cast might fail.
// In addition, we use `NIOThrowingAsyncSequenceProducer` in the implementation of the
// non-throwing variant `NIOAsyncSequenceProducer` where `Failure` will be `Never` and
// this cast will fail as well.
// Everything is marked @inlinable and the Failure type is known at compile time,
// therefore this cast should be optimised away in release build.
if let failure = CancellationError() as? Failure {
self._state = .sourceFinished(
buffer: .init(),
iteratorInitialized: iteratorInitialized,
failure: failure
)
} else {
self._state = .finished(iteratorInitialized: iteratorInitialized)
}

return .none

case .streaming(_, _, .some(let continuation), _, let iteratorInitialized):
// We have an outstanding continuation that needs to resumed
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import Atomics

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
final class NIOAsyncSequenceProducerBenchmark: AsyncBenchmark, NIOAsyncSequenceProducerDelegate, @unchecked Sendable {
fileprivate typealias SequenceProducer = NIOThrowingAsyncSequenceProducer<Int, Never, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, NIOAsyncSequenceProducerBenchmark>
fileprivate typealias SequenceProducer = NIOThrowingAsyncSequenceProducer<Int, Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, NIOAsyncSequenceProducerBenchmark>

private let iterations: Int
private var iterator: SequenceProducer.AsyncIterator!
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,35 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {
XCTAssertTrue(error is CancellationError)
}
}

@available(*, deprecated, message: "tests the deprecated custom generic failure type")
func testTaskCancel_whenStreaming_andSuspended_withCustomErrorType() async throws {
struct CustomError: Error {}
// We are registering our demand and sleeping a bit to make
// sure our task runs when the demand is registered
let backPressureStrategy = MockNIOElementStreamBackPressureStrategy()
let delegate = MockNIOBackPressuredStreamSourceDelegate()
let new = NIOThrowingAsyncSequenceProducer.makeSequence(
elementType: Int.self,
failureType: CustomError.self,
backPressureStrategy: backPressureStrategy,
delegate: delegate
)
let sequence = new.sequence
let task: Task<Int?, Error> = Task {
let iterator = sequence.makeAsyncIterator()
return try await iterator.next()
}
try await Task.sleep(nanoseconds: 1_000_000)

task.cancel()
let result = await task.result
XCTAssertEqualWithoutAutoclosure(await delegate.events.prefix(1).collect(), [.didTerminate])

try withExtendedLifetime(new.source) {
XCTAssertNil(try result.get())
}
}

func testTaskCancel_whenStreaming_andNotSuspended() async throws {
// We are registering our demand and sleeping a bit to make
Expand Down Expand Up @@ -515,9 +544,39 @@ final class NIOThrowingAsyncSequenceProducerTests: XCTestCase {

task.cancel()

let value = try await task.value
let result = await task.result

XCTAssertNil(value)
await XCTAssertThrowsError(try result.get()) { error in
XCTAssertTrue(error is CancellationError, "unexpected error \(error)")
}
}

@available(*, deprecated, message: "tests the deprecated custom generic failure type")
func testTaskCancel_whenStreaming_andTaskIsAlreadyCancelled_withCustomErrorType() async throws {
struct CustomError: Error {}
let backPressureStrategy = MockNIOElementStreamBackPressureStrategy()
let delegate = MockNIOBackPressuredStreamSourceDelegate()
let new = NIOThrowingAsyncSequenceProducer.makeSequence(
elementType: Int.self,
failureType: CustomError.self,
backPressureStrategy: backPressureStrategy,
delegate: delegate
)
let sequence = new.sequence
let task: Task<Int?, Error> = Task {
// We are sleeping here to allow some time for us to cancel the task.
// Once the Task is cancelled we will call `next()`
try? await Task.sleep(nanoseconds: 1_000_000)
let iterator = sequence.makeAsyncIterator()
return try await iterator.next()
}

task.cancel()

let result = await task.result
try withExtendedLifetime(new.source) {
XCTAssertNil(try result.get())
}
}

// MARK: - Next
Expand Down

0 comments on commit e0cc6dd

Please sign in to comment.