Skip to content

Commit 14584a9

Browse files
committed
Move stream creation and configure to platform specific files
1 parent 7324ac4 commit 14584a9

8 files changed

+173
-89
lines changed

Sources/Subprocess/AsyncBufferSequence.swift

+13-53
Original file line numberDiff line numberDiff line change
@@ -33,15 +33,15 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
3333
private var buffer: [UInt8]
3434
private var currentPosition: Int
3535
private var finished: Bool
36-
private var streamIterator: AsyncThrowingStream<StreamStatus, Swift.Error>.AsyncIterator
36+
private var streamIterator: AsyncThrowingStream<TrackedPlatformDiskIO.StreamStatus, Swift.Error>.AsyncIterator
3737

3838
internal init(diskIO: TrackedPlatformDiskIO, bufferSize: Int) {
3939
self.diskIO = diskIO
4040
self.bufferSize = bufferSize
4141
self.buffer = []
4242
self.currentPosition = 0
4343
self.finished = false
44-
self.streamIterator = Self.createDataStream(with: diskIO.dispatchIO, bufferSize: bufferSize).makeAsyncIterator()
44+
self.streamIterator = diskIO.readDataStream(upToLength: bufferSize).makeAsyncIterator()
4545
}
4646

4747
public mutating func next() async throws -> SequenceOutput.Buffer? {
@@ -51,66 +51,18 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
5151
return data
5252

5353
case .endOfStream(let data):
54-
streamIterator = Self.createDataStream(with: diskIO.dispatchIO, bufferSize: bufferSize).makeAsyncIterator()
54+
streamIterator = diskIO.readDataStream(upToLength: bufferSize).makeAsyncIterator()
5555
return data
5656

5757
case .endOfFile:
58-
try self.diskIO.safelyClose()
58+
// try self.diskIO.safelyClose()
5959
return nil
6060
}
6161
} else {
62-
try self.diskIO.safelyClose()
62+
// try self.diskIO.safelyClose()
6363
return nil
6464
}
6565
}
66-
67-
private enum StreamStatus {
68-
case data(SequenceOutput.Buffer)
69-
case endOfStream(SequenceOutput.Buffer)
70-
case endOfFile
71-
}
72-
73-
private static func createDataStream(with dispatchIO: DispatchIO, bufferSize: Int) -> AsyncThrowingStream<StreamStatus, Swift.Error> {
74-
return AsyncThrowingStream<StreamStatus, Swift.Error> { continuation in
75-
dispatchIO.read(
76-
offset: 0,
77-
length: bufferSize,
78-
queue: .global()
79-
) { done, data, error in
80-
if error != 0 {
81-
continuation.finish(throwing: SubprocessError(
82-
code: .init(.failedToReadFromSubprocess),
83-
underlyingError: .init(rawValue: error)
84-
))
85-
return
86-
}
87-
88-
// Treat empty data and nil as the same
89-
let buffer = data.map { $0.isEmpty ? nil : $0 } ?? nil
90-
let status: StreamStatus
91-
92-
switch (buffer, done) {
93-
case (.some(let data), false):
94-
status = .data(SequenceOutput.Buffer(data: data))
95-
96-
case (.some(let data), true):
97-
status = .endOfStream(SequenceOutput.Buffer(data: data))
98-
99-
case (nil, false):
100-
return
101-
102-
case (nil, true):
103-
status = .endOfFile
104-
}
105-
106-
continuation.yield(status)
107-
108-
if done {
109-
continuation.finish()
110-
}
111-
}
112-
}
113-
}
11466
}
11567

11668
private let diskIO: TrackedPlatformDiskIO
@@ -126,6 +78,14 @@ public struct AsyncBufferSequence: AsyncSequence, Sendable {
12678
}
12779
}
12880

81+
extension TrackedPlatformDiskIO {
82+
internal enum StreamStatus {
83+
case data(SequenceOutput.Buffer)
84+
case endOfStream(SequenceOutput.Buffer)
85+
case endOfFile
86+
}
87+
}
88+
12989
// MARK: - Page Size
13090
import _SubprocessCShims
13191

Sources/Subprocess/Execution.swift

+4-18
Original file line numberDiff line numberDiff line change
@@ -110,15 +110,8 @@ extension Execution where Output == SequenceOutput {
110110
fatalError("The standard output has already been consumed")
111111
}
112112

113-
if let lowWater = output.lowWater {
114-
readFd.dispatchIO.setLimit(lowWater: lowWater)
115-
}
116-
117-
if let highWater = output.highWater {
118-
readFd.dispatchIO.setLimit(highWater: highWater)
119-
}
120-
121-
return AsyncBufferSequence(diskIO: readFd, bufferSize: output.bufferSize)
113+
// TODO: Make buffer size and option
114+
return AsyncBufferSequence(diskIO: readFd, bufferSize: readBufferSize)
122115
}
123116
}
124117

@@ -142,15 +135,8 @@ extension Execution where Error == SequenceOutput {
142135
fatalError("The standard output has already been consumed")
143136
}
144137

145-
if let lowWater = error.lowWater {
146-
readFd.dispatchIO.setLimit(lowWater: lowWater)
147-
}
148-
149-
if let highWater = error.highWater {
150-
readFd.dispatchIO.setLimit(highWater: highWater)
151-
}
152-
153-
return AsyncBufferSequence(diskIO: readFd, bufferSize: error.bufferSize)
138+
// TODO: Make buffer size and option
139+
return AsyncBufferSequence(diskIO: readFd, bufferSize: readBufferSize)
154140
}
155141
}
156142

Sources/Subprocess/IO/Output.swift

+2-12
Original file line numberDiff line numberDiff line change
@@ -210,15 +210,8 @@ public struct BytesOutput: OutputProtocol {
210210
#endif
211211
public struct SequenceOutput: OutputProtocol {
212212
public typealias OutputType = Void
213-
internal let lowWater: Int?
214-
internal let highWater: Int?
215-
internal let bufferSize: Int
216-
217-
internal init(lowWater: Int? = nil, highWater: Int? = nil, bufferSize: Int = readBufferSize) {
218-
self.lowWater = lowWater
219-
self.highWater = highWater
220-
self.bufferSize = bufferSize
221-
}
213+
214+
internal init() {}
222215
}
223216

224217
#if SubprocessSpan
@@ -291,9 +284,6 @@ extension OutputProtocol where Self == SequenceOutput {
291284
/// to the `.standardOutput` (or `.standardError`) property
292285
/// of `Execution` as `AsyncSequence<Data>`.
293286
public static var sequence: Self { .init() }
294-
public static func sequence(lowWater: Int? = nil, highWater: Int? = nil, bufferSize: Int? = nil) -> Self {
295-
.init(lowWater: lowWater, highWater: highWater, bufferSize: bufferSize ?? readBufferSize)
296-
}
297287
}
298288

299289
// MARK: - Span Default Implementations

Sources/Subprocess/Platforms/Subprocess+Darwin.swift

+16-2
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,8 @@ public struct PlatformOptions: Sendable {
5757
/// Creates a session and sets the process group ID
5858
/// i.e. Detach from the terminal.
5959
public var createSession: Bool = false
60+
public var outputOptions: StreamOptions = .init()
61+
public var errorOptions: StreamOptions = .init()
6062
/// An ordered list of steps in order to tear down the child
6163
/// process in case the parent task is cancelled before
6264
/// the child proces terminates.
@@ -126,6 +128,18 @@ extension PlatformOptions {
126128
#endif
127129
}
128130

131+
extension PlatformOptions {
132+
public struct StreamOptions: Sendable {
133+
let lowWater: Int?
134+
let highWater: Int?
135+
136+
init(lowWater: Int? = nil, highWater: Int? = nil) {
137+
self.lowWater = lowWater
138+
self.highWater = highWater
139+
}
140+
}
141+
}
142+
129143
extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible {
130144
internal func description(withIndent indent: Int) -> String {
131145
let indent = String(repeating: " ", count: indent * 4)
@@ -355,8 +369,8 @@ extension Configuration {
355369
output: output,
356370
error: error,
357371
inputPipe: inputPipe.createInputPipe(),
358-
outputPipe: outputPipe.createOutputPipe(),
359-
errorPipe: errorPipe.createOutputPipe()
372+
outputPipe: outputPipe.createOutputPipe(with: platformOptions.outputOptions),
373+
errorPipe: errorPipe.createOutputPipe(with: platformOptions.errorOptions)
360374
)
361375
}
362376

Sources/Subprocess/Platforms/Subprocess+Linux.swift

+16-2
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,8 @@ extension Configuration {
127127
output: output,
128128
error: error,
129129
inputPipe: inputPipe.createInputPipe(),
130-
outputPipe: outputPipe.createOutputPipe(),
131-
errorPipe: errorPipe.createOutputPipe()
130+
outputPipe: outputPipe.createOutputPipe(with: platformOptions.outputOptions),
131+
errorPipe: errorPipe.createOutputPipe(with: platformOptions.errorOptions)
132132
)
133133
}
134134

@@ -176,6 +176,8 @@ public struct PlatformOptions: Sendable {
176176
// Creates a session and sets the process group ID
177177
// i.e. Detach from the terminal.
178178
public var createSession: Bool = false
179+
public var outputOptions: StreamOptions = .init()
180+
public var errorOptions: StreamOptions = .init()
179181
/// An ordered list of steps in order to tear down the child
180182
/// process in case the parent task is cancelled before
181183
/// the child proces terminates.
@@ -197,6 +199,18 @@ public struct PlatformOptions: Sendable {
197199
public init() {}
198200
}
199201

202+
extension PlatformOptions {
203+
public struct StreamOptions: Sendable {
204+
let lowWater: Int?
205+
let highWater: Int?
206+
207+
init(lowWater: Int? = nil, highWater: Int? = nil) {
208+
self.lowWater = lowWater
209+
self.highWater = highWater
210+
}
211+
}
212+
}
213+
200214
extension PlatformOptions: CustomStringConvertible, CustomDebugStringConvertible {
201215
internal func description(withIndent indent: Int) -> String {
202216
let indent = String(repeating: " ", count: indent * 4)

Sources/Subprocess/Platforms/Subprocess+Unix.swift

+53-1
Original file line numberDiff line numberDiff line change
@@ -412,6 +412,7 @@ extension CreatedPipe {
412412
}
413413
}
414414
)
415+
415416
writeEnd = .init(
416417
dispatchIO,
417418
closeWhenDone: writeFileDescriptor.closeWhenDone
@@ -423,7 +424,7 @@ extension CreatedPipe {
423424
)
424425
}
425426

426-
internal func createOutputPipe() -> OutputPipe {
427+
internal func createOutputPipe(with options: PlatformOptions.StreamOptions) -> OutputPipe {
427428
var readEnd: TrackedPlatformDiskIO? = nil
428429
if let readFileDescriptor = self.readFileDescriptor {
429430
let dispatchIO: DispatchIO = DispatchIO(
@@ -437,6 +438,15 @@ extension CreatedPipe {
437438
}
438439
}
439440
)
441+
442+
if let lowWater = options.lowWater {
443+
dispatchIO.setLimit(lowWater: lowWater)
444+
}
445+
446+
if let highWater = options.highWater {
447+
dispatchIO.setLimit(highWater: highWater)
448+
}
449+
440450
readEnd = .init(
441451
dispatchIO,
442452
closeWhenDone: readFileDescriptor.closeWhenDone
@@ -451,6 +461,48 @@ extension CreatedPipe {
451461

452462
// MARK: - TrackedDispatchIO extensions
453463
extension TrackedDispatchIO {
464+
internal func readDataStream(upToLength maxLength: Int) -> AsyncThrowingStream<StreamStatus, Swift.Error> {
465+
return AsyncThrowingStream<StreamStatus, Swift.Error> { continuation in
466+
self.dispatchIO.read(
467+
offset: 0,
468+
length: maxLength,
469+
queue: .global()
470+
) { done, data, error in
471+
if error != 0 {
472+
continuation.finish(throwing: SubprocessError(
473+
code: .init(.failedToReadFromSubprocess),
474+
underlyingError: .init(rawValue: error)
475+
))
476+
return
477+
}
478+
479+
// Treat empty data and nil as the same
480+
let buffer = data.map { $0.isEmpty ? nil : $0 } ?? nil
481+
let status: StreamStatus
482+
483+
switch (buffer, done) {
484+
case (.some(let data), false):
485+
status = .data(SequenceOutput.Buffer(data: data))
486+
487+
case (.some(let data), true):
488+
status = .endOfStream(SequenceOutput.Buffer(data: data))
489+
490+
case (nil, false):
491+
return
492+
493+
case (nil, true):
494+
status = .endOfFile
495+
}
496+
497+
continuation.yield(status)
498+
499+
if done {
500+
continuation.finish()
501+
}
502+
}
503+
}
504+
}
505+
454506
#if SubprocessSpan
455507
@available(SubprocessSpan, *)
456508
#endif

Sources/Subprocess/Platforms/Subprocess+Windows.swift

+64
Original file line numberDiff line numberDiff line change
@@ -1043,6 +1043,70 @@ extension CreatedPipe {
10431043
}
10441044

10451045
extension TrackedFileDescriptor {
1046+
internal func readDataStream(upToLength maxLength: Int) -> AsyncThrowingStream<StreamStatus, Swift.Error> {
1047+
return AsyncThrowingStream<StreamStatus, Swift.Error> { continuation in
1048+
do {
1049+
var totalBytesRead: Int = 0
1050+
1051+
while totalBytesRead < maxLength {
1052+
let values = try [UInt8](
1053+
unsafeUninitializedCapacity: maxLength
1054+
) { buffer, initializedCount in
1055+
guard let baseAddress = buffer.baseAddress else {
1056+
initializedCount = 0
1057+
return
1058+
}
1059+
1060+
var bytesRead: DWORD = 0
1061+
let readSucceed = ReadFile(
1062+
self.fileDescriptor.platformDescriptor,
1063+
UnsafeMutableRawPointer(mutating: baseAddress),
1064+
DWORD(maxLength - totalBytesRead),
1065+
&bytesRead,
1066+
nil
1067+
)
1068+
1069+
if !readSucceed {
1070+
// Windows throws ERROR_BROKEN_PIPE when the pipe is closed
1071+
let error = GetLastError()
1072+
if error == ERROR_BROKEN_PIPE {
1073+
// We are done reading
1074+
initializedCount = 0
1075+
} else {
1076+
initializedCount = 0
1077+
throw SubprocessError(
1078+
code: .init(.failedToReadFromSubprocess),
1079+
underlyingError: .init(rawValue: error)
1080+
)
1081+
}
1082+
} else {
1083+
// We successfully read the current round
1084+
initializedCount += Int(bytesRead)
1085+
}
1086+
}
1087+
1088+
if values.count > 0 {
1089+
totalBytesRead += values.count
1090+
1091+
if totalBytesRead >= maxLength {
1092+
continuation.yield(.endOfStream(SequenceOutput.Buffer(data: values)))
1093+
continuation.finish()
1094+
return
1095+
} else {
1096+
continuation.yield(.data(SequenceOutput.Buffer(data: values)))
1097+
}
1098+
} else {
1099+
continuation.yield(.endOfFile)
1100+
continuation.finish()
1101+
return
1102+
}
1103+
}
1104+
} catch {
1105+
continuation.finish(throwing: error)
1106+
}
1107+
}
1108+
}
1109+
10461110
internal func readChunk(upToLength maxLength: Int) async throws -> SequenceOutput.Buffer? {
10471111
return try await withCheckedThrowingContinuation { continuation in
10481112
self.readUntilEOF(

0 commit comments

Comments
 (0)