Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: URLSessionHTTPClient streams data up on single Task #849

Draft
wants to merge 18 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@ class FoundationStreamBridge: NSObject, StreamDelegate {
let boundStreamBufferSize: Int

/// A buffer to hold data that has been read from the `ReadableStream` but not yet written to the
/// Foundation `OutputStream`. At most, it will contain `bridgeBufferSize` bytes.
private var buffer: Data
/// Foundation `OutputStream`.
///
/// Access the buffer only from the stream bridge queue. At most, at any time it will contain `bridgeBufferSize` bytes.
private var _buffer: Data

/// The `ReadableStream` that will serve as the input to this bridge.
/// The bridge will read bytes from this stream and dump them to the Foundation stream
Expand All @@ -68,28 +70,6 @@ class FoundationStreamBridge: NSObject, StreamDelegate {
/// Server address
private let serverAddress: String

/// Actor used to ensure writes are performed in series, one at a time.
private actor WriteCoordinator {
var task: Task<Void, Error>?

/// Creates a new concurrent Task that executes the passed block, ensuring that the previous Task
/// finishes before this task starts.
///
/// Acts as a sort of "serial queue" of Swift concurrency tasks.
/// - Parameter block: The code to be performed in this task.
func perform(_ block: @escaping @Sendable () async throws -> Void) async throws {
let task = Task { [task] in
_ = await task?.result
try await block()
}
self.task = task
_ = try await task.value
}
}

/// Actor used to enforce the order of multiple concurrent stream writes.
private let writeCoordinator = WriteCoordinator()

/// A serial `DispatchQueue` to run the stream operations for the Foundation `OutputStream`.
///
/// Operations performed on the queue include:
Expand Down Expand Up @@ -132,28 +112,35 @@ class FoundationStreamBridge: NSObject, StreamDelegate {
) {
self.bridgeBufferSize = bridgeBufferSize
self.boundStreamBufferSize = boundStreamBufferSize ?? bridgeBufferSize
self.buffer = Data(capacity: bridgeBufferSize)
self._buffer = Data(capacity: bridgeBufferSize)
self.readableStream = readableStream
self.logger = logger
self.telemetry = telemetry
self.serverAddress = serverAddress
(inputStream, outputStream) = Self.makeStreams(boundStreamBufferSize: self.boundStreamBufferSize, queue: queue)
}

func replaceStreams(completion: @escaping (InputStream?) -> Void) async {
// Close the current output stream, since it will accept no more data and is about to
// be replaced.
await close()
deinit {
_streamTask?.cancel()
}

func replaceStreams(completion: @escaping (InputStream?) -> Void) {

queue.async { [self] in
// Close the current output stream, since it will accept no more data and is about to
// be replaced.
_close()

// Replace the bound stream pair with new bound streams.
(inputStream, outputStream) = Self.makeStreams(boundStreamBufferSize: boundStreamBufferSize, queue: queue)
// Replace the bound stream pair with new bound streams.
(inputStream, outputStream) = Self.makeStreams(boundStreamBufferSize: boundStreamBufferSize, queue: queue)

// Call the completion block. When this method is called from `urlSession(_:task:needNewBodyStream:)`,
// the completion block will be that method's completion handler.
completion(inputStream)
// Call the completion block. When this method is called from `urlSession(_:task:needNewBodyStream:)`,
// the completion block will be that method's completion handler.
completion(inputStream)

// Re-open the `OutputStream` for writing.
await open()
// Re-open the `OutputStream` for writing.
_open()
}
}

private static func makeStreams(boundStreamBufferSize: Int, queue: DispatchQueue) -> (InputStream, OutputStream) {
Expand Down Expand Up @@ -186,149 +173,171 @@ class FoundationStreamBridge: NSObject, StreamDelegate {

/// Open the output stream and schedule this bridge to receive stream delegate callbacks.
///
/// Stream operations are performed on the stream's queue.
/// Stream opening is completed before asynchronous return to the caller.
func open() async {
await withCheckedContinuation { continuation in
queue.async {
self.outputStream.delegate = self
self.outputStream.open()
continuation.resume()
}
}
/// Stream operations are performed on the bridge's queue using a synchronous task, so that the stream has been opened
/// before returning. Do not call this method from the bridge's queue, or a deadlock will occur.
func open() {
queue.sync { _open() }
}

private func _open() {
self.outputStream.delegate = self
self.outputStream.open()
}

/// Close the output stream and unschedule this bridge from receiving stream delegate callbacks.
///
/// Stream operations are performed on the stream's queue.
/// Stream closing is completed before asynchronous return to the caller.
func close() async {
await withCheckedContinuation { continuation in
queue.async {
self.outputStream.close()
self.outputStream.delegate = nil
continuation.resume()
}
}
/// Stream operations are performed on the bridge's queue using a synchronous task, so that the stream has been closed
/// before returning. Do not call this method from the bridge's queue, or a deadlock will occur.
func close() {
_streamTask?.cancel()
queue.sync { _close() }
}

private func _close() {
self.outputStream.close()
self.outputStream.delegate = nil
}

// MARK: - Writing to bridge

/// Writes buffered data to the output stream.
/// If the buffer is empty, the `ReadableStream` will be read first to replenish the buffer.
///
/// If the buffer is empty and the readable stream is closed, there is no more data to bridge, and the output stream is closed.
private func writeToOutput() async throws {

// Perform the write on the `WriteCoordinator` to ensure that writes happen in-order
// and one at a time.
//
// Note that it is safe to access `buffer` and `readableStreamIsClosed` instance vars
// from inside the block passed to `perform()` because this is the only place
// these instance vars are accessed, and the code in the `perform()` block runs
// in series with any other calls to `perform()`.
try await writeCoordinator.perform { [self] in

// If there is no data in the buffer and the `ReadableStream` is still open,
// attempt to read the stream. Otherwise, skip reading the `ReadableStream` and
// write what's in the buffer immediately.
if !readableStreamIsClosed && buffer.isEmpty {
if let newData = try await readableStream.readAsync(upToCount: bridgeBufferSize - buffer.count) {
buffer.append(newData)
private var _streamTask: Task<Void, Never>?
private var _spaceAvailableState = SpaceAvailableState.noSpaceAvailable

enum SpaceAvailableState {
case noSpaceAvailable
case hasSpaceAvailable
case awaitingSpaceAvailable(CheckedContinuation<Void, Never>)
}

private func _startFeed() {
guard _streamTask == nil else { return }
_streamTask = Task {
var readableStreamIsOpen = true
var bufferCount = 0

while bufferCount > 0 || readableStreamIsOpen {
var readableStreamData: Data?
if readableStreamIsOpen {
let availableBufferSize = self.boundStreamBufferSize - bufferCount
do {
readableStreamData = try await readableStream.readAsync(upToCount: availableBufferSize)
} catch {
logger.error("Readable stream error received: \(error)")
readableStreamIsOpen = false
}
}
if let readableStreamData {
await waitForSpaceAvailable()
bufferCount = await writeToBufferAndFlush(readableStreamData)
} else {
readableStreamIsClosed = true
readableStreamIsOpen = false
bufferCount = await writeToBufferAndFlush(Data())
}
}

// Write the previously buffered data and/or newly read data, if any, to the Foundation `OutputStream`.
// Capture the error from the stream write, if any.
var streamError: Error?
if !buffer.isEmpty {
streamError = await writeToOutputStream()
await withCheckedContinuation { continuation in
self.close()
continuation.resume()
}
}
}

// If the readable stream has closed and there is no data in the buffer,
// there is nothing left to forward to the output stream, so close it.
if readableStreamIsClosed && buffer.isEmpty {
await close()
private func writeToBufferAndFlush(_ data: Data) async -> Int {
await withCheckedContinuation { continuation in
queue.async {
self._buffer.append(data)
if !self._buffer.isEmpty {
self._writeToOutputStream()
}
continuation.resume(returning: self._buffer.count)
}

// If the output stream write produced an error, throw it now, else just return.
if let streamError { throw streamError }
}
}

/// Using the output stream's callback queue, write the buffered data to the Foundation `OutputStream`.
///
/// After writing, remove the written data from the buffer.
/// - Returns: The error resulting from the write to the Foundation `OutputStream`, or `nil` if no error occurred.
private func writeToOutputStream() async -> Error? {

// Suspend the caller while the write is performed on the Foundation `OutputStream`'s queue.
private func waitForSpaceAvailable() async {
await withCheckedContinuation { continuation in

// Perform the write to the Foundation `OutputStream` on its queue.
queue.async { [self] in

// Write to the output stream. It may not accept all data, so get the number of bytes
// it accepted in `writeCount`.
var writeCount = 0
buffer.withUnsafeBytes { bufferPtr in
guard let bytePtr = bufferPtr.bindMemory(to: UInt8.self).baseAddress else { return }
writeCount = outputStream.write(bytePtr, maxLength: buffer.count)
queue.async {
switch self._spaceAvailableState {
case .noSpaceAvailable:
self._spaceAvailableState = .awaitingSpaceAvailable(continuation)
case .hasSpaceAvailable:
self._spaceAvailableState = .noSpaceAvailable
continuation.resume()
case .awaitingSpaceAvailable:
fatalError()
}
}
}
}

// `writeCount` will be a positive number if bytes were written.
// Remove the written bytes from the front of the buffer.
if writeCount > 0 {
logger.debug("FoundationStreamBridge: wrote \(writeCount) bytes to request body")
buffer.removeFirst(writeCount)
// TICK - smithy.client.http.bytes_sent
var attributes = Attributes()
attributes.set(
key: HttpMetricsAttributesKeys.serverAddress,
value: serverAddress)
telemetry.bytesSent.add(
value: writeCount,
attributes: attributes,
context: telemetry.contextManager.current())
}
/// Write the buffered data to the Foundation `OutputStream` and write metrics.
///
/// Call this method only from the bridge queue.
private func _writeToOutputStream() {
// Write to the output stream. It may not accept all data, so get the number of bytes
// it accepted in `writeCount`.
var writeCount = 0
_buffer.withUnsafeBytes { bufferPtr in
guard let bytePtr = bufferPtr.bindMemory(to: UInt8.self).baseAddress else { return }
writeCount = outputStream.write(bytePtr, maxLength: _buffer.count)
}

// Resume the caller now that the write is complete, returning the stream error, if any.
continuation.resume(returning: outputStream.streamError)
}
// `writeCount` will be a positive number if bytes were written.
// Remove the written bytes from the front of the buffer.
if writeCount > 0 {
logger.debug("FoundationStreamBridge: wrote \(writeCount) bytes to request body")
_buffer.removeFirst(writeCount)
// TICK - smithy.client.http.bytes_sent
var attributes = Attributes()
attributes.set(
key: HttpMetricsAttributesKeys.serverAddress,
value: serverAddress)
telemetry.bytesSent.add(
value: writeCount,
attributes: attributes,
context: telemetry.contextManager.current())
}
}

private func _updateHasSpaceAvailable() {
switch _spaceAvailableState {
case .noSpaceAvailable:
_spaceAvailableState = .hasSpaceAvailable
case .hasSpaceAvailable:
break
case .awaitingSpaceAvailable(let continuation):
_spaceAvailableState = .noSpaceAvailable
continuation.resume()
}
}

// MARK: - StreamDelegate protocol

/// The stream places this callback when an event happens.
///
/// The `FoundationStreamBridge` sets itself as the delegate of the Foundation `OutputStream` whenever the
/// `OutputStream` is open. Stream callbacks will be delivered on the GCD serial queue.
/// `OutputStream` is open. Stream callbacks will be delivered on the stream bridge's GCD serial queue by calling this method.
///
/// `.hasSpaceAvailable` is the only event where the `FoundationStreamBridge` takes action; in response to
/// this event, the `FoundationStreamBridge` will write data to the `OutputStream`.
/// The `.openCompleted` event starts a `Task` to perform the data transfer the first time it is called; that `Task` will continue
/// until the transfer is complete, even if the streams have to be replaced for the transfer.
///
/// This method is implemented for the Foundation `StreamDelegate` protocol.
/// The `.hasSpaceAvailable` event updates the write coordinator so that data transfer will start or resume.
/// - Parameters:
/// - aStream: The stream which experienced the event.
/// - eventCode: A code describing the type of event that happened.
@objc func stream(_ aStream: Foundation.Stream, handle eventCode: Foundation.Stream.Event) {
switch eventCode {
case .openCompleted:
break
_startFeed()
case .hasBytesAvailable:
// not used by OutputStream
break
case .hasSpaceAvailable:
// Since space is available, try and read from the ReadableStream and
// transfer the data to the Foundation stream pair.
// Use a `Task` to perform the operation within Swift concurrency.
Task { try await writeToOutput() }
// Inform the write coordinator that there is space available
// on the output stream so that writing of data may be resumed at the proper time.
_updateHasSpaceAvailable()
case .errorOccurred:
logger.info("FoundationStreamBridge: .errorOccurred event")
logger.info("FoundationStreamBridge: Stream error: \(aStream.streamError.debugDescription)")
logger.error("FoundationStreamBridge: .errorOccurred event")
logger.error("FoundationStreamBridge: Stream error: \(aStream.streamError.debugDescription)")
case .endEncountered:
// not used by OutputStream
break
default:
logger.info("FoundationStreamBridge: Other stream event occurred: \(eventCode)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ extension URLSessionConfiguration {
config.httpShouldSetCookies = false
config.httpCookieAcceptPolicy = .never
config.httpCookieStorage = nil
config.tlsMaximumSupportedProtocolVersion = .TLSv12
config.tlsMaximumSupportedProtocolVersion = .TLSv13
config.httpMaximumConnectionsPerHost = 24
return config
}
}
Expand Down
Loading
Loading