diff --git a/Sources/ClientRuntime/Networking/Http/URLSession/FoundationStreamBridge.swift b/Sources/ClientRuntime/Networking/Http/URLSession/FoundationStreamBridge.swift index 508b25613..05739fab3 100644 --- a/Sources/ClientRuntime/Networking/Http/URLSession/FoundationStreamBridge.swift +++ b/Sources/ClientRuntime/Networking/Http/URLSession/FoundationStreamBridge.swift @@ -41,8 +41,10 @@ class FoundationStreamBridge: NSObject, StreamDelegate { let boundStreamBufferSize: Int /// A buffer to hold data that has been read from the `ReadableStream` but not yet written to the - /// Foundation `OutputStream`. At most, it will contain `bridgeBufferSize` bytes. - private var buffer: Data + /// Foundation `OutputStream`. + /// + /// Access the buffer only from the stream bridge queue. At most, at any time it will contain `bridgeBufferSize` bytes. + private var _buffer: Data /// The `ReadableStream` that will serve as the input to this bridge. /// The bridge will read bytes from this stream and dump them to the Foundation stream @@ -68,28 +70,6 @@ class FoundationStreamBridge: NSObject, StreamDelegate { /// Server address private let serverAddress: String - /// Actor used to ensure writes are performed in series, one at a time. - private actor WriteCoordinator { - var task: Task? - - /// Creates a new concurrent Task that executes the passed block, ensuring that the previous Task - /// finishes before this task starts. - /// - /// Acts as a sort of "serial queue" of Swift concurrency tasks. - /// - Parameter block: The code to be performed in this task. - func perform(_ block: @escaping @Sendable () async throws -> Void) async throws { - let task = Task { [task] in - _ = await task?.result - try await block() - } - self.task = task - _ = try await task.value - } - } - - /// Actor used to enforce the order of multiple concurrent stream writes. - private let writeCoordinator = WriteCoordinator() - /// A serial `DispatchQueue` to run the stream operations for the Foundation `OutputStream`. /// /// Operations performed on the queue include: @@ -132,7 +112,7 @@ class FoundationStreamBridge: NSObject, StreamDelegate { ) { self.bridgeBufferSize = bridgeBufferSize self.boundStreamBufferSize = boundStreamBufferSize ?? bridgeBufferSize - self.buffer = Data(capacity: bridgeBufferSize) + self._buffer = Data(capacity: bridgeBufferSize) self.readableStream = readableStream self.logger = logger self.telemetry = telemetry @@ -140,20 +120,27 @@ class FoundationStreamBridge: NSObject, StreamDelegate { (inputStream, outputStream) = Self.makeStreams(boundStreamBufferSize: self.boundStreamBufferSize, queue: queue) } - func replaceStreams(completion: @escaping (InputStream?) -> Void) async { - // Close the current output stream, since it will accept no more data and is about to - // be replaced. - await close() + deinit { + _streamTask?.cancel() + } + + func replaceStreams(completion: @escaping (InputStream?) -> Void) { + + queue.async { [self] in + // Close the current output stream, since it will accept no more data and is about to + // be replaced. + _close() - // Replace the bound stream pair with new bound streams. - (inputStream, outputStream) = Self.makeStreams(boundStreamBufferSize: boundStreamBufferSize, queue: queue) + // Replace the bound stream pair with new bound streams. + (inputStream, outputStream) = Self.makeStreams(boundStreamBufferSize: boundStreamBufferSize, queue: queue) - // Call the completion block. When this method is called from `urlSession(_:task:needNewBodyStream:)`, - // the completion block will be that method's completion handler. - completion(inputStream) + // Call the completion block. When this method is called from `urlSession(_:task:needNewBodyStream:)`, + // the completion block will be that method's completion handler. + completion(inputStream) - // Re-open the `OutputStream` for writing. - await open() + // Re-open the `OutputStream` for writing. + _open() + } } private static func makeStreams(boundStreamBufferSize: Int, queue: DispatchQueue) -> (InputStream, OutputStream) { @@ -186,149 +173,171 @@ class FoundationStreamBridge: NSObject, StreamDelegate { /// Open the output stream and schedule this bridge to receive stream delegate callbacks. /// - /// Stream operations are performed on the stream's queue. - /// Stream opening is completed before asynchronous return to the caller. - func open() async { - await withCheckedContinuation { continuation in - queue.async { - self.outputStream.delegate = self - self.outputStream.open() - continuation.resume() - } - } + /// Stream operations are performed on the bridge's queue using a synchronous task, so that the stream has been opened + /// before returning. Do not call this method from the bridge's queue, or a deadlock will occur. + func open() { + queue.sync { _open() } + } + + private func _open() { + self.outputStream.delegate = self + self.outputStream.open() } /// Close the output stream and unschedule this bridge from receiving stream delegate callbacks. /// - /// Stream operations are performed on the stream's queue. - /// Stream closing is completed before asynchronous return to the caller. - func close() async { - await withCheckedContinuation { continuation in - queue.async { - self.outputStream.close() - self.outputStream.delegate = nil - continuation.resume() - } - } + /// Stream operations are performed on the bridge's queue using a synchronous task, so that the stream has been closed + /// before returning. Do not call this method from the bridge's queue, or a deadlock will occur. + func close() { + _streamTask?.cancel() + queue.sync { _close() } + } + + private func _close() { + self.outputStream.close() + self.outputStream.delegate = nil } // MARK: - Writing to bridge - /// Writes buffered data to the output stream. - /// If the buffer is empty, the `ReadableStream` will be read first to replenish the buffer. - /// - /// If the buffer is empty and the readable stream is closed, there is no more data to bridge, and the output stream is closed. - private func writeToOutput() async throws { - - // Perform the write on the `WriteCoordinator` to ensure that writes happen in-order - // and one at a time. - // - // Note that it is safe to access `buffer` and `readableStreamIsClosed` instance vars - // from inside the block passed to `perform()` because this is the only place - // these instance vars are accessed, and the code in the `perform()` block runs - // in series with any other calls to `perform()`. - try await writeCoordinator.perform { [self] in - - // If there is no data in the buffer and the `ReadableStream` is still open, - // attempt to read the stream. Otherwise, skip reading the `ReadableStream` and - // write what's in the buffer immediately. - if !readableStreamIsClosed && buffer.isEmpty { - if let newData = try await readableStream.readAsync(upToCount: bridgeBufferSize - buffer.count) { - buffer.append(newData) + private var _streamTask: Task? + private var _spaceAvailableState = SpaceAvailableState.noSpaceAvailable + + enum SpaceAvailableState { + case noSpaceAvailable + case hasSpaceAvailable + case awaitingSpaceAvailable(CheckedContinuation) + } + + private func _startFeed() { + guard _streamTask == nil else { return } + _streamTask = Task { + var readableStreamIsOpen = true + var bufferCount = 0 + + while bufferCount > 0 || readableStreamIsOpen { + var readableStreamData: Data? + if readableStreamIsOpen { + let availableBufferSize = self.boundStreamBufferSize - bufferCount + do { + readableStreamData = try await readableStream.readAsync(upToCount: availableBufferSize) + } catch { + logger.error("Readable stream error received: \(error)") + readableStreamIsOpen = false + } + } + if let readableStreamData { + await waitForSpaceAvailable() + bufferCount = await writeToBufferAndFlush(readableStreamData) } else { - readableStreamIsClosed = true + readableStreamIsOpen = false + bufferCount = await writeToBufferAndFlush(Data()) } } - - // Write the previously buffered data and/or newly read data, if any, to the Foundation `OutputStream`. - // Capture the error from the stream write, if any. - var streamError: Error? - if !buffer.isEmpty { - streamError = await writeToOutputStream() + await withCheckedContinuation { continuation in + self.close() + continuation.resume() } + } + } - // If the readable stream has closed and there is no data in the buffer, - // there is nothing left to forward to the output stream, so close it. - if readableStreamIsClosed && buffer.isEmpty { - await close() + private func writeToBufferAndFlush(_ data: Data) async -> Int { + await withCheckedContinuation { continuation in + queue.async { + self._buffer.append(data) + if !self._buffer.isEmpty { + self._writeToOutputStream() + } + continuation.resume(returning: self._buffer.count) } - - // If the output stream write produced an error, throw it now, else just return. - if let streamError { throw streamError } } } - /// Using the output stream's callback queue, write the buffered data to the Foundation `OutputStream`. - /// - /// After writing, remove the written data from the buffer. - /// - Returns: The error resulting from the write to the Foundation `OutputStream`, or `nil` if no error occurred. - private func writeToOutputStream() async -> Error? { - - // Suspend the caller while the write is performed on the Foundation `OutputStream`'s queue. + private func waitForSpaceAvailable() async { await withCheckedContinuation { continuation in - - // Perform the write to the Foundation `OutputStream` on its queue. - queue.async { [self] in - - // Write to the output stream. It may not accept all data, so get the number of bytes - // it accepted in `writeCount`. - var writeCount = 0 - buffer.withUnsafeBytes { bufferPtr in - guard let bytePtr = bufferPtr.bindMemory(to: UInt8.self).baseAddress else { return } - writeCount = outputStream.write(bytePtr, maxLength: buffer.count) + queue.async { + switch self._spaceAvailableState { + case .noSpaceAvailable: + self._spaceAvailableState = .awaitingSpaceAvailable(continuation) + case .hasSpaceAvailable: + self._spaceAvailableState = .noSpaceAvailable + continuation.resume() + case .awaitingSpaceAvailable: + fatalError() } + } + } + } - // `writeCount` will be a positive number if bytes were written. - // Remove the written bytes from the front of the buffer. - if writeCount > 0 { - logger.debug("FoundationStreamBridge: wrote \(writeCount) bytes to request body") - buffer.removeFirst(writeCount) - // TICK - smithy.client.http.bytes_sent - var attributes = Attributes() - attributes.set( - key: HttpMetricsAttributesKeys.serverAddress, - value: serverAddress) - telemetry.bytesSent.add( - value: writeCount, - attributes: attributes, - context: telemetry.contextManager.current()) - } + /// Write the buffered data to the Foundation `OutputStream` and write metrics. + /// + /// Call this method only from the bridge queue. + private func _writeToOutputStream() { + // Write to the output stream. It may not accept all data, so get the number of bytes + // it accepted in `writeCount`. + var writeCount = 0 + _buffer.withUnsafeBytes { bufferPtr in + guard let bytePtr = bufferPtr.bindMemory(to: UInt8.self).baseAddress else { return } + writeCount = outputStream.write(bytePtr, maxLength: _buffer.count) + } - // Resume the caller now that the write is complete, returning the stream error, if any. - continuation.resume(returning: outputStream.streamError) - } + // `writeCount` will be a positive number if bytes were written. + // Remove the written bytes from the front of the buffer. + if writeCount > 0 { + logger.debug("FoundationStreamBridge: wrote \(writeCount) bytes to request body") + _buffer.removeFirst(writeCount) + // TICK - smithy.client.http.bytes_sent + var attributes = Attributes() + attributes.set( + key: HttpMetricsAttributesKeys.serverAddress, + value: serverAddress) + telemetry.bytesSent.add( + value: writeCount, + attributes: attributes, + context: telemetry.contextManager.current()) + } + } + + private func _updateHasSpaceAvailable() { + switch _spaceAvailableState { + case .noSpaceAvailable: + _spaceAvailableState = .hasSpaceAvailable + case .hasSpaceAvailable: + break + case .awaitingSpaceAvailable(let continuation): + _spaceAvailableState = .noSpaceAvailable + continuation.resume() } } // MARK: - StreamDelegate protocol - /// The stream places this callback when an event happens. - /// /// The `FoundationStreamBridge` sets itself as the delegate of the Foundation `OutputStream` whenever the - /// `OutputStream` is open. Stream callbacks will be delivered on the GCD serial queue. + /// `OutputStream` is open. Stream callbacks will be delivered on the stream bridge's GCD serial queue by calling this method. /// - /// `.hasSpaceAvailable` is the only event where the `FoundationStreamBridge` takes action; in response to - /// this event, the `FoundationStreamBridge` will write data to the `OutputStream`. + /// The `.openCompleted` event starts a `Task` to perform the data transfer the first time it is called; that `Task` will continue + /// until the transfer is complete, even if the streams have to be replaced for the transfer. /// - /// This method is implemented for the Foundation `StreamDelegate` protocol. + /// The `.hasSpaceAvailable` event updates the write coordinator so that data transfer will start or resume. /// - Parameters: /// - aStream: The stream which experienced the event. /// - eventCode: A code describing the type of event that happened. @objc func stream(_ aStream: Foundation.Stream, handle eventCode: Foundation.Stream.Event) { switch eventCode { case .openCompleted: - break + _startFeed() case .hasBytesAvailable: + // not used by OutputStream break case .hasSpaceAvailable: - // Since space is available, try and read from the ReadableStream and - // transfer the data to the Foundation stream pair. - // Use a `Task` to perform the operation within Swift concurrency. - Task { try await writeToOutput() } + // Inform the write coordinator that there is space available + // on the output stream so that writing of data may be resumed at the proper time. + _updateHasSpaceAvailable() case .errorOccurred: - logger.info("FoundationStreamBridge: .errorOccurred event") - logger.info("FoundationStreamBridge: Stream error: \(aStream.streamError.debugDescription)") + logger.error("FoundationStreamBridge: .errorOccurred event") + logger.error("FoundationStreamBridge: Stream error: \(aStream.streamError.debugDescription)") case .endEncountered: + // not used by OutputStream break default: logger.info("FoundationStreamBridge: Other stream event occurred: \(eventCode)") diff --git a/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionConfiguration+HTTPClientConfiguration.swift b/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionConfiguration+HTTPClientConfiguration.swift index 7f5daa22b..d3aff6e50 100644 --- a/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionConfiguration+HTTPClientConfiguration.swift +++ b/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionConfiguration+HTTPClientConfiguration.swift @@ -17,7 +17,8 @@ extension URLSessionConfiguration { config.httpShouldSetCookies = false config.httpCookieAcceptPolicy = .never config.httpCookieStorage = nil - config.tlsMaximumSupportedProtocolVersion = .TLSv12 + config.tlsMaximumSupportedProtocolVersion = .TLSv13 + config.httpMaximumConnectionsPerHost = 24 return config } } diff --git a/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionHTTPClient.swift b/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionHTTPClient.swift index 6f49023f2..959b7b01b 100644 --- a/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionHTTPClient.swift +++ b/Sources/ClientRuntime/Networking/Http/URLSession/URLSessionHTTPClient.swift @@ -314,7 +314,7 @@ public final class URLSessionHTTPClient: HTTPClient { ) { storage.modify(task) { connection in guard let streamBridge = connection.streamBridge else { completionHandler(nil); return } - Task { await streamBridge.replaceStreams(completion: completionHandler) } + streamBridge.replaceStreams(completion: completionHandler) } } @@ -377,12 +377,10 @@ public final class URLSessionHTTPClient: HTTPClient { } // Close the stream bridge so that its resources are deallocated - Task { - await connection.streamBridge?.close() + connection.streamBridge?.close() - // Task is complete & no longer needed. Remove it from storage. - storage.remove(task) - } + // Task is complete & no longer needed. Remove it from storage. + storage.remove(task) } } } @@ -485,7 +483,19 @@ public final class URLSessionHTTPClient: HTTPClient { // Convert the HTTP request body into a URLSession `Body`. switch request.body { case .data(let data): - body = .data(data) +// body = .data(data) + // Create a stream bridge that streams data from a SDK stream to a Foundation InputStream + // that URLSession can stream its request body from. + // Allow 16kb of in-memory buffer for request body streaming + let bridge = FoundationStreamBridge( + readableStream: BufferedStream(data: data, isClosed: true), + bridgeBufferSize: 16_384, + logger: logger, + telemetry: telemetry, + serverAddress: URLSessionHTTPClient.makeServerAddress(request: request) + ) + streamBridge = bridge + body = .stream(bridge) case .stream(let stream): // Create a stream bridge that streams data from a SDK stream to a Foundation InputStream // that URLSession can stream its request body from. @@ -563,9 +573,7 @@ public final class URLSessionHTTPClient: HTTPClient { logger.debug("URLRequest(\(httpMethod) \(url)) started") logBodyDescription(body) dataTask.resume() - Task { [streamBridge] in - await streamBridge?.open() - } + streamBridge?.open() } catch { continuation.resume(throwing: error) } diff --git a/Tests/ClientRuntimeTests/NetworkingTests/URLSession/FoundationStreamBridgeTests.swift b/Tests/ClientRuntimeTests/NetworkingTests/URLSession/FoundationStreamBridgeTests.swift index 2f141b4b9..f59620dd0 100644 --- a/Tests/ClientRuntimeTests/NetworkingTests/URLSession/FoundationStreamBridgeTests.swift +++ b/Tests/ClientRuntimeTests/NetworkingTests/URLSession/FoundationStreamBridgeTests.swift @@ -71,7 +71,7 @@ class FoundationStreamBridgeTests: XCTestCase { boundStreamBufferSize: boundStreamBufferSize, logger: TestLogger(), telemetry: HttpTelemetry(httpScope: "FoundationStreamBridgeTests")) - await subject.open() + subject.open() // This will hold the data that is bridged from the ReadableStream to the Foundation InputStream var bridgedData = Data() @@ -90,7 +90,7 @@ class FoundationStreamBridgeTests: XCTestCase { } } // Once the subject is exhausted, all data should have been bridged and the subject may be closed - await subject.close() + subject.close() // Close the inputStream as well subject.inputStream.close()