Skip to content

Simplify implementation of Dispatch-async bridge #486

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 18 additions & 94 deletions Sources/SWBUtil/Dispatch+Async.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,114 +57,38 @@ extension DispatchFD {
}
}
}
}

extension AsyncThrowingStream where Element == UInt8, Failure == any Error {
/// Returns an async stream which reads bytes from the specified file descriptor. Unlike `FileHandle.bytes`, it does not block the caller.
@available(macOS, deprecated: 15.0, message: "Use the AsyncSequence-returning overload.")
@available(iOS, deprecated: 18.0, message: "Use the AsyncSequence-returning overload.")
@available(tvOS, deprecated: 18.0, message: "Use the AsyncSequence-returning overload.")
@available(watchOS, deprecated: 11.0, message: "Use the AsyncSequence-returning overload.")
@available(visionOS, deprecated: 2.0, message: "Use the AsyncSequence-returning overload.")
public static func _dataStream(reading fileDescriptor: DispatchFD, on queue: SWBQueue) -> AsyncThrowingStream<Element, any Error> {
AsyncThrowingStream { continuation in
let newFD: DispatchFD
do {
newFD = try fileDescriptor._duplicate()
} catch {
continuation.finish(throwing: error)
return
}

let io = SWBDispatchIO.stream(fileDescriptor: newFD, queue: queue) { error in
do {
try newFD._close()
if error != 0 {
continuation.finish(throwing: POSIXError(error, context: "dataStream(reading: \(fileDescriptor))#1"))
}
} catch {
continuation.finish(throwing: error)
}
}
io.setLimit(lowWater: 0)
io.setLimit(highWater: 4096)

continuation.onTermination = { termination in
if case .cancelled = termination {
io.close(flags: .stop)
} else {
io.close()
}
}

io.read(offset: 0, length: .max, queue: queue) { done, data, error in
guard error == 0 else {
continuation.finish(throwing: POSIXError(error, context: "dataStream(reading: \(fileDescriptor))#2"))
return
}

let data = data ?? .empty
for element in data {
continuation.yield(element)
}

if done {
continuation.finish()
public func _dataStream() -> AsyncThrowingStream<SWBDispatchData, any Error> {
AsyncThrowingStream<SWBDispatchData, any Error> {
while !Task.isCancelled {
let chunk = try await readChunk(upToLength: 4096)
if chunk.isEmpty {
return nil
}
return chunk
}
throw CancellationError()
}
}
}

@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *)
extension AsyncSequence where Element == UInt8, Failure == any Error {
/// Returns an async stream which reads bytes from the specified file descriptor. Unlike `FileHandle.bytes`, it does not block the caller.
public static func dataStream(reading fileDescriptor: DispatchFD, on queue: SWBQueue) -> any AsyncSequence<Element, any Error> {
AsyncThrowingStream<SWBDispatchData, any Error> { continuation in
let newFD: DispatchFD
do {
newFD = try fileDescriptor._duplicate()
} catch {
continuation.finish(throwing: error)
return
}

let io = SWBDispatchIO.stream(fileDescriptor: newFD, queue: queue) { error in
do {
try newFD._close()
if error != 0 {
let context = "dataStream(reading: \(fileDescriptor) \"\(Result { try fileDescriptor._filePath() })\")#1"
continuation.finish(throwing: POSIXError(error, context: context))
}
} catch {
continuation.finish(throwing: error)
}
}
io.setLimit(lowWater: 0)
io.setLimit(highWater: 4096)

continuation.onTermination = { termination in
if case .cancelled = termination {
io.close(flags: .stop)
} else {
io.close()
}
}

io.read(offset: 0, length: .max, queue: queue) { done, data, error in
guard error == 0 else {
let context = "dataStream(reading: \(fileDescriptor) \"\(Result { try fileDescriptor._filePath() })\")#2"
continuation.finish(throwing: POSIXError(error, context: context))
return
}

let data = data ?? .empty
continuation.yield(data)

if done {
continuation.finish()
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *)
public func dataStream() -> some AsyncSequence<SWBDispatchData, any Error> {
AsyncThrowingStream<SWBDispatchData, any Error> {
while !Task.isCancelled {
let chunk = try await readChunk(upToLength: 4096)
if chunk.isEmpty {
return nil
}
return chunk
}
}.flattened
throw CancellationError()
}
}
}
8 changes: 4 additions & 4 deletions Sources/SWBUtil/FileHandle+Async.swift
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ extension FileHandle {
@available(tvOS, deprecated: 18.0, message: "Use the AsyncSequence-returning overload.")
@available(watchOS, deprecated: 11.0, message: "Use the AsyncSequence-returning overload.")
@available(visionOS, deprecated: 2.0, message: "Use the AsyncSequence-returning overload.")
public func _bytes(on queue: SWBQueue) -> AsyncThrowingStream<UInt8, any Error> {
._dataStream(reading: DispatchFD(fileHandle: self), on: queue)
public func _bytes() -> AsyncThrowingStream<SWBDispatchData, any Error> {
DispatchFD(fileHandle: self)._dataStream()
}

/// Replacement for `bytes` which uses DispatchIO to avoid blocking the caller.
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *)
public func bytes(on queue: SWBQueue) -> any AsyncSequence<UInt8, any Error> {
AsyncThrowingStream.dataStream(reading: DispatchFD(fileHandle: self), on: queue)
public func bytes() -> some AsyncSequence<SWBDispatchData, any Error> {
DispatchFD(fileHandle: self).dataStream()
}
}
12 changes: 12 additions & 0 deletions Sources/SWBUtil/Misc+Async.swift
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ extension AsyncSequence {
}
}

extension AsyncSequence where Element: RandomAccessCollection {
@inlinable
public func collect() async rethrows -> [Element.Element] {
var items = [Element.Element]()
var it = makeAsyncIterator()
while let e = try await it.next() {
items.append(contentsOf: e)
}
return items
}
}

extension TaskGroup where Element == Void {
/// Concurrency-friendly replacement for `DispatchQueue.concurrentPerform(iterations:execute:)`.
public static func concurrentPerform(iterations: Int, maximumParallelism: Int, execute work: @Sendable @escaping (Int) async -> Element) async {
Expand Down
8 changes: 4 additions & 4 deletions Sources/SWBUtil/Process+Async.swift
Original file line number Diff line number Diff line change
Expand Up @@ -35,20 +35,20 @@ extension Process {
@available(tvOS, deprecated: 18.0, message: "Use the AsyncSequence-returning overload.")
@available(watchOS, deprecated: 11.0, message: "Use the AsyncSequence-returning overload.")
@available(visionOS, deprecated: 2.0, message: "Use the AsyncSequence-returning overload.")
public func _makeStream(for keyPath: ReferenceWritableKeyPath<Process, Pipe?>, using pipe: Pipe) -> AsyncThrowingStream<UInt8, any Error> {
public func _makeStream(for keyPath: ReferenceWritableKeyPath<Process, Pipe?>, using pipe: Pipe) -> AsyncThrowingStream<SWBDispatchData, any Error> {
precondition(!isRunning) // the pipe setters will raise `NSInvalidArgumentException` anyways
self[keyPath: keyPath] = pipe
return pipe.fileHandleForReading._bytes(on: .global())
return pipe.fileHandleForReading._bytes()
}

/// Returns an ``AsyncStream`` configured to read the standard output or error stream of the process.
///
/// - note: This method will mutate the `standardOutput` or `standardError` property of the Process object, replacing any existing `Pipe` or `FileHandle` which may be set. It must be called before the process is started.
@available(macOS 15.0, iOS 18.0, tvOS 18.0, watchOS 11.0, visionOS 2.0, *)
public func makeStream(for keyPath: ReferenceWritableKeyPath<Process, Pipe?>, using pipe: Pipe) -> any AsyncSequence<UInt8, any Error> {
public func makeStream(for keyPath: ReferenceWritableKeyPath<Process, Pipe?>, using pipe: Pipe) -> some AsyncSequence<SWBDispatchData, any Error> {
precondition(!isRunning) // the pipe setters will raise `NSInvalidArgumentException` anyways
self[keyPath: keyPath] = pipe
return pipe.fileHandleForReading.bytes(on: .global())
return pipe.fileHandleForReading.bytes()
}
}

Expand Down
4 changes: 2 additions & 2 deletions Sources/SWBUtil/Process.swift
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ extension Process {
let (exitStatus, output) = try await _getOutput(url: url, arguments: arguments, currentDirectoryURL: currentDirectoryURL, environment: environment, interruptible: interruptible) { process in
process.standardOutputPipe = pipe
process.standardErrorPipe = pipe
return pipe.fileHandleForReading.bytes(on: .global())
return pipe.fileHandleForReading.bytes()
} collect: { stream in
try await stream.collect()
}
Expand All @@ -115,7 +115,7 @@ extension Process {
let (exitStatus, output) = try await _getOutput(url: url, arguments: arguments, currentDirectoryURL: currentDirectoryURL, environment: environment, interruptible: interruptible) { process in
process.standardOutputPipe = pipe
process.standardErrorPipe = pipe
return pipe.fileHandleForReading._bytes(on: .global())
return pipe.fileHandleForReading._bytes()
} collect: { stream in
try await stream.collect()
}
Expand Down
27 changes: 0 additions & 27 deletions Sources/SWBUtil/SWBDispatch.swift
Original file line number Diff line number Diff line change
Expand Up @@ -49,33 +49,6 @@ public struct DispatchFD {
rawValue = fileHandle.fileDescriptor
#endif
}

internal func _duplicate() throws -> DispatchFD {
#if os(Windows)
return self
#else
return try DispatchFD(fileDescriptor: FileDescriptor(rawValue: rawValue).duplicate())
#endif
}

internal func _close() throws {
#if !os(Windows)
try FileDescriptor(rawValue: rawValue).close()
#endif
}

// Only exists to help debug a rare concurrency issue where the file descriptor goes invalid
internal func _filePath() throws -> String {
#if canImport(Darwin)
var buffer = [CChar](repeating: 0, count: Int(MAXPATHLEN))
if fcntl(rawValue, F_GETPATH, &buffer) == -1 {
throw POSIXError(errno, "fcntl", String(rawValue), "F_GETPATH")
}
return String(cString: buffer)
#else
return String()
#endif
}
}

// @unchecked: rdar://130051790 (DispatchData should be Sendable)
Expand Down
32 changes: 16 additions & 16 deletions Tests/SWBUtilTests/FileHandleTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,21 @@ import SystemPackage
let fh = FileHandle(fileDescriptor: fd.rawValue, closeOnDealloc: false)
try await fd.closeAfter {
if #available(macOS 15, iOS 18, tvOS 18, watchOS 11, visionOS 2, *) {
var it = fh.bytes(on: .global()).makeAsyncIterator()
var it = fh.bytes().makeAsyncIterator()
var bytesOfFile: [UInt8] = []
await #expect(throws: Never.self) {
while let byte = try await it.next() {
bytesOfFile.append(byte)
while let chunk = try await it.next() {
bytesOfFile.append(contentsOf: chunk)
}
}
#expect(bytesOfFile.count == 1448)
#expect(plist.bytes == bytesOfFile)
} else {
var it = fh._bytes(on: .global()).makeAsyncIterator()
var it = fh._bytes().makeAsyncIterator()
var bytesOfFile: [UInt8] = []
await #expect(throws: Never.self) {
while let byte = try await it.next() {
bytesOfFile.append(byte)
while let chunk = try await it.next() {
bytesOfFile.append(contentsOf: chunk)
}
}
#expect(bytesOfFile.count == 1448)
Expand All @@ -72,15 +72,15 @@ import SystemPackage
let fh = FileHandle(fileDescriptor: fd.rawValue, closeOnDealloc: false)

if #available(macOS 15, iOS 18, tvOS 18, watchOS 11, visionOS 2, *) {
var it = fh.bytes(on: .global()).makeAsyncIterator()
var it = fh.bytes().makeAsyncIterator()
try fd.close()

await #expect(throws: (any Error).self) {
while let _ = try await it.next() {
}
}
} else {
var it = fh._bytes(on: .global()).makeAsyncIterator()
var it = fh._bytes().makeAsyncIterator()
try fd.close()

await #expect(throws: (any Error).self) {
Expand All @@ -99,21 +99,21 @@ import SystemPackage
try await fd.closeAfter {
let fh = FileHandle(fileDescriptor: fd.rawValue, closeOnDealloc: false)
if #available(macOS 15, iOS 18, tvOS 18, watchOS 11, visionOS 2, *) {
var it = fh.bytes(on: .global()).makeAsyncIterator()
var it = fh.bytes().makeAsyncIterator()
var bytes: [UInt8] = []
while let byte = try await it.next() {
bytes.append(byte)
if bytes.count == 100 {
while let chunk = try await it.next() {
bytes.append(contentsOf: chunk)
if bytes.count >= 100 {
condition.signal()
throw CancellationError()
}
}
} else {
var it = fh._bytes(on: .global()).makeAsyncIterator()
var it = fh._bytes().makeAsyncIterator()
var bytes: [UInt8] = []
while let byte = try await it.next() {
bytes.append(byte)
if bytes.count == 100 {
while let chunk = try await it.next() {
bytes.append(contentsOf: chunk)
if bytes.count >= 100 {
condition.signal()
throw CancellationError()
}
Expand Down
14 changes: 10 additions & 4 deletions Tests/SwiftBuildTests/ConsoleCommands/CLIConnection.swift
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ final class CLIConnection {
private let monitorHandle: FileHandle
private let temporaryDirectory: NamedTemporaryDirectory
private let exitPromise: Promise<Processes.ExitStatus, any Error>
private let outputStream: AsyncThrowingStream<UInt8, any Error>
private var outputStreamIterator: AsyncCLIConnectionResponseSequence<AsyncThrowingStream<UInt8, any Error>>.AsyncIterator
private let outputStream: AsyncThrowingStream<SWBDispatchData, any Error>
private var outputStreamIterator: AsyncCLIConnectionResponseSequence<AsyncFlatteningSequence<AsyncThrowingStream<SWBDispatchData, any Error>>>.AsyncIterator

static var swiftbuildToolSearchPaths: [URL] {
var searchPaths: [URL] = []
Expand Down Expand Up @@ -138,8 +138,8 @@ final class CLIConnection {
// Close the session handle, so the FD will close once the service stops.
try sessionHandle.close()

outputStream = monitorHandle._bytes(on: .global())
outputStreamIterator = outputStream.cliResponses.makeAsyncIterator()
outputStream = monitorHandle._bytes()
outputStreamIterator = outputStream.flattened.cliResponses.makeAsyncIterator()
#endif
}

Expand Down Expand Up @@ -253,6 +253,9 @@ public struct AsyncCLIConnectionResponseSequence<Base: AsyncSequence>: AsyncSequ
// BSDs send EOF, Linux raises EIO...
#if os(Linux) || os(Android)
if error.code == EIO {
if reply.isEmpty {
return nil
}
break
}
#endif
Expand Down Expand Up @@ -282,6 +285,9 @@ public struct AsyncCLIConnectionResponseSequence<Base: AsyncSequence>: AsyncSequ
// BSDs send EOF, Linux raises EIO...
#if os(Linux) || os(Android)
if error.code == EIO {
if reply.isEmpty {
return nil
}
break
}
#endif
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ fileprivate struct ServiceConsoleTests {
let standardOutput = task._makeStream(for: \.standardOutputPipe, using: outputPipe)
let promise: Promise<Processes.ExitStatus, any Error> = try task.launch()

let data = try await standardOutput.reduce(into: [], { $0.append($1) })
let data = try await standardOutput.reduce(into: [], { $0.append(contentsOf: $1) })
let output = String(decoding: data, as: UTF8.self)

// Verify there were no errors.
Expand Down
Loading