Skip to content

Commit

Permalink
Land NIOAsyncChannel as SPI (#2397)
Browse files Browse the repository at this point in the history
* Land `NIOAsyncChannel` as SPI

# Motivation

We want to provide bridges from NIO `Channel`s to Swift Concurrency. In previous PRs, we already landed the building blocks namely `NIOAsyncSequenceProducer` and `NIOAsyncWriter`. These two types are highly performant bridges between synchronous and asynchronous code that respect back-pressure.
The next step is to build convenience methods that wrap a `Channel` with these two types.

# Modification
This PR adds a new type called `NIOAsyncChannel` that is capable of wrapping a `Channel`. This is done by adding two handlers to the channel pipeline that are bridging to the `NIOAsyncSequenceProducer` and `NIOAsyncWriter`.
The new `NIOAsyncChannel` type exposes three properties. The underlying `Channel`, a `NIOAsyncChannelInboundStream` and a `NIOAsyncChannelOutboundWriter`. Using these three types the user a able to read/write into the channel using `async` methods.

Importantly, we are landing all of this behind the `@_spi(AsyncChannel`. This allows us to merge PRs while we are still working on the remaining parts such as protocol negotiation.

# Result
We have the first part necessary for our async bridges. Follow up PRs will include the following things:
1.  Bootstrap support
2. Protocol negotiation support
3. Example with documentation

* Add AsyncSequence bridge to NIOAsyncChannelOutboundWriter

* Code review

* Prefix temporary spi public method

* Rename writeAndFlush to write
  • Loading branch information
FranzBusch authored Apr 6, 2023
1 parent 75cea45 commit e7e83d6
Show file tree
Hide file tree
Showing 9 changed files with 1,418 additions and 1 deletion.
2 changes: 1 addition & 1 deletion Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ var targets: [PackageDescription.Target] = [
.executableTarget(name: "NIOAsyncAwaitDemo",
dependencies: ["NIOPosix", "NIOCore", "NIOHTTP1"]),
.testTarget(name: "NIOCoreTests",
dependencies: ["NIOCore", "NIOEmbedded", "NIOFoundationCompat"]),
dependencies: ["NIOCore", "NIOEmbedded", "NIOFoundationCompat", swiftAtomics]),
.testTarget(name: "NIOEmbeddedTests",
dependencies: ["NIOConcurrencyHelpers", "NIOCore", "NIOEmbedded"]),
.testTarget(name: "NIOPosixTests",
Expand Down
133 changes: 133 additions & 0 deletions Sources/NIOCore/AsyncChannel/AsyncChannel.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2022-2023 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
#if swift(>=5.6)
/// Wraps a NIO ``Channel`` object into a form suitable for use in Swift Concurrency.
///
/// ``NIOAsyncChannel`` abstracts the notion of a NIO ``Channel`` into something that
/// can safely be used in a structured concurrency context. In particular, this exposes
/// the following functionality:
///
/// - reads are presented as an `AsyncSequence`
/// - writes can be written to with async functions on a writer, providing backpressure
/// - channels can be closed seamlessly
///
/// This type does not replace the full complexity of NIO's ``Channel``. In particular, it
/// does not expose the following functionality:
///
/// - user events
/// - traditional NIO backpressure such as writability signals and the ``Channel/read()`` call
///
/// Users are encouraged to separate their ``ChannelHandler``s into those that implement
/// protocol-specific logic (such as parsers and encoders) and those that implement business
/// logic. Protocol-specific logic should be implemented as a ``ChannelHandler``, while business
/// logic should use ``NIOAsyncChannel`` to consume and produce data to the network.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@_spi(AsyncChannel)
public final class NIOAsyncChannel<Inbound: Sendable, Outbound: Sendable>: Sendable {
/// The underlying channel being wrapped by this ``NIOAsyncChannel``.
@_spi(AsyncChannel)
public let channel: Channel
/// The stream of inbound messages.
@_spi(AsyncChannel)
public let inboundStream: NIOAsyncChannelInboundStream<Inbound>
/// The writer for writing outbound messages.
@_spi(AsyncChannel)
public let outboundWriter: NIOAsyncChannelOutboundWriter<Outbound>

/// Initializes a new ``NIOAsyncChannel`` wrapping a ``Channel``.
///
/// - Important: This **must** be called on the channel's event loop otherwise this init will crash. This is necessary because
/// we must install the handlers before any other event in the pipeline happens otherwise we might drop reads.
///
/// - Parameters:
/// - channel: The ``Channel`` to wrap.
/// - backpressureStrategy: The backpressure strategy of the ``NIOAsyncChannel/inboundStream``.
/// - isOutboundHalfClosureEnabled: If outbound half closure should be enabled. Outbound half closure is triggered once
/// the ``NIOAsyncChannelWriter`` is either finished or deinitialized.
/// - inboundType: The ``NIOAsyncChannel/inboundStream`` message's type.
/// - outboundType: The ``NIOAsyncChannel/outboundWriter`` message's type.
@inlinable
@_spi(AsyncChannel)
public init(
synchronouslyWrapping channel: Channel,
backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
isOutboundHalfClosureEnabled: Bool = true,
inboundType: Inbound.Type = Inbound.self,
outboundType: Outbound.Type = Outbound.self
) throws {
channel.eventLoop.preconditionInEventLoop()
self.channel = channel
(self.inboundStream, self.outboundWriter) = try channel._syncAddAsyncHandlers(
backpressureStrategy: backpressureStrategy,
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled
)
}

/// Initializes a new ``NIOAsyncChannel`` wrapping a ``Channel`` where the outbound type is `Never`.
///
/// This initializer will finish the ``NIOAsyncChannel/outboundWriter`` immediately.
///
/// - Important: This **must** be called on the channel's event loop otherwise this init will crash. This is necessary because
/// we must install the handlers before any other event in the pipeline happens otherwise we might drop reads.
///
/// - Parameters:
/// - channel: The ``Channel`` to wrap.
/// - backpressureStrategy: The backpressure strategy of the ``NIOAsyncChannel/inboundStream``.
/// - isOutboundHalfClosureEnabled: If outbound half closure should be enabled. Outbound half closure is triggered once
/// the ``NIOAsyncChannelWriter`` is either finished or deinitialized.
/// - inboundType: The ``NIOAsyncChannel/inboundStream`` message's type.
@inlinable
@_spi(AsyncChannel)
public init(
synchronouslyWrapping channel: Channel,
backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark? = nil,
isOutboundHalfClosureEnabled: Bool = true,
inboundType: Inbound.Type = Inbound.self
) throws where Outbound == Never {
channel.eventLoop.preconditionInEventLoop()
self.channel = channel
(self.inboundStream, self.outboundWriter) = try channel._syncAddAsyncHandlers(
backpressureStrategy: backpressureStrategy,
isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled
)

self.outboundWriter.finish()
}
}

extension Channel {
// TODO: We need to remove the public and spi here once we make the AsyncChannel methods public
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@inlinable
@_spi(AsyncChannel)
public func _syncAddAsyncHandlers<Inbound: Sendable, Outbound: Sendable>(
backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
isOutboundHalfClosureEnabled: Bool
) throws -> (NIOAsyncChannelInboundStream<Inbound>, NIOAsyncChannelOutboundWriter<Outbound>) {
self.eventLoop.assertInEventLoop()

let closeRatchet = CloseRatchet(isOutboundHalfClosureEnabled: isOutboundHalfClosureEnabled)
let inboundStream = try NIOAsyncChannelInboundStream<Inbound>(
channel: self,
backpressureStrategy: backpressureStrategy,
closeRatchet: closeRatchet
)
let writer = try NIOAsyncChannelOutboundWriter<Outbound>(
channel: self,
closeRatchet: closeRatchet
)
return (inboundStream, writer)
}
}
#endif
90 changes: 90 additions & 0 deletions Sources/NIOCore/AsyncChannel/AsyncChannelInboundStream.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
//===----------------------------------------------------------------------===//
//
// This source file is part of the SwiftNIO open source project
//
// Copyright (c) 2022-2023 Apple Inc. and the SwiftNIO project authors
// Licensed under Apache License v2.0
//
// See LICENSE.txt for license information
// See CONTRIBUTORS.txt for the list of SwiftNIO project authors
//
// SPDX-License-Identifier: Apache-2.0
//
//===----------------------------------------------------------------------===//
#if swift(>=5.6)
/// The inbound message asynchronous sequence of a ``NIOAsyncChannel``.
///
/// This is a unicast async sequence that allows a single iterator to be created.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@_spi(AsyncChannel)
public struct NIOAsyncChannelInboundStream<Inbound: Sendable>: Sendable {
@usableFromInline
typealias Producer = NIOThrowingAsyncSequenceProducer<Inbound, Error, NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark, NIOAsyncChannelInboundStreamChannelHandler<Inbound>.Delegate>

/// The underlying async sequence.
@usableFromInline let _producer: Producer

@inlinable
init(
channel: Channel,
backpressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark?,
closeRatchet: CloseRatchet
) throws {
channel.eventLoop.preconditionInEventLoop()
let handler = NIOAsyncChannelInboundStreamChannelHandler<Inbound>(
eventLoop: channel.eventLoop,
closeRatchet: closeRatchet
)
let strategy: NIOAsyncSequenceProducerBackPressureStrategies.HighLowWatermark

if let userProvided = backpressureStrategy {
strategy = userProvided
} else {
// Default strategy. These numbers are fairly arbitrary, but they line up with the default value of
// maxMessagesPerRead.
strategy = .init(lowWatermark: 2, highWatermark: 10)
}

let sequence = Producer.makeSequence(
backPressureStrategy: strategy,
delegate: NIOAsyncChannelInboundStreamChannelHandler<Inbound>.Delegate(handler: handler)
)
handler.source = sequence.source
try channel.pipeline.syncOperations.addHandler(handler)
self._producer = sequence.sequence
}
}

@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
extension NIOAsyncChannelInboundStream: AsyncSequence {
@_spi(AsyncChannel)
public typealias Element = Inbound

@_spi(AsyncChannel)
public struct AsyncIterator: AsyncIteratorProtocol {
@usableFromInline var _iterator: Producer.AsyncIterator

@inlinable
init(_ iterator: Producer.AsyncIterator) {
self._iterator = iterator
}

@inlinable @_spi(AsyncChannel)
public mutating func next() async throws -> Element? {
return try await self._iterator.next()
}
}

@inlinable
@_spi(AsyncChannel)
public func makeAsyncIterator() -> AsyncIterator {
return AsyncIterator(self._producer.makeAsyncIterator())
}
}

/// The ``NIOAsyncChannelInboundStream/AsyncIterator`` MUST NOT be shared across `Task`s. With marking this as
/// unavailable we are explicitly declaring this.
@available(macOS 10.15, iOS 13, tvOS 13, watchOS 6, *)
@available(*, unavailable)
extension NIOAsyncChannelInboundStream.AsyncIterator: Sendable {}
#endif
Loading

0 comments on commit e7e83d6

Please sign in to comment.