Skip to content

Commit

Permalink
Merge pull request #93 from glessard/devel
Browse files Browse the repository at this point in the history
Improve queue handling
  • Loading branch information
glessard authored Dec 2, 2020
2 parents 8142f2f + 5846bf2 commit 43546df
Show file tree
Hide file tree
Showing 11 changed files with 86 additions and 58 deletions.
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ jobs:
env: SWIFT=5.2.4

- os: osx
osx_image: xcode12.1
env: SWIFT=5.3
osx_image: xcode12.2
env: SWIFT=5.3.1

- os: linux
env: SWIFT=5.0.3
Expand All @@ -30,7 +30,7 @@ jobs:
env: SWIFT=5.2.5

- os: linux
env: SWIFT=5.3
env: SWIFT=5.3.1

before_install:
- clang --version
Expand Down
8 changes: 4 additions & 4 deletions Source/deferred/deferred-combine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public func combine<Success, Failure, S>(qos: DispatchQoS = .current,
deferreds: S) -> Deferred<[Success], Failure>
where Failure: Error, S: Sequence, S.Element == Deferred<Success, Failure>
{
let queue = DispatchQueue(label: "reduce-sequence", qos: qos)
let queue = DispatchQueue(label: "reduce-deferred", qos: qos, target: .global(qos: qos.qosClass))
return combine(queue: queue, deferreds: deferreds)
}

Expand Down Expand Up @@ -98,7 +98,7 @@ public func reduce<S, T, F, U>(qos: DispatchQoS,
combine: @escaping (_ accumulated: U, _ element: T) -> U) -> Deferred<U, F>
where S: Sequence, S.Element == Deferred<T, F>
{
let queue = DispatchQueue(label: "reduce-sequence", qos: qos)
let queue = DispatchQueue(label: "reduce-deferred", qos: qos, target: .global(qos: qos.qosClass))
return reduce(queue: queue, deferreds: deferreds, initial: initial, combine: combine)
}

Expand Down Expand Up @@ -159,8 +159,8 @@ public func reduce<S, T, F, U>(queue: DispatchQueue,
{
return Deferred<U, F>(queue: queue) {
resolver in
// We execute `Sequence.reduce` asynchronously because
// nothing prevents S from blocking on `Sequence.next()`
// We execute `Sequence.reduce` on a concurrent thread
// because nothing prevents S from blocking on `Sequence.next()`
DispatchQueue.global(qos: queue.qos.qosClass).async {
let r = deferreds.reduce(Deferred<U, F>(queue: queue, value: initial)) {
(accumulator, deferred) in
Expand Down
32 changes: 16 additions & 16 deletions Source/deferred/deferred-extras.swift
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,9 @@ extension Deferred
/// - parameter serially: whether the notifications should be dispatched on a serial queue; defaults to `true`
/// - returns: a new `Deferred` whose notifications will execute at QoS `qos`

public func enqueuing(at qos: DispatchQoS, serially: Bool = true) -> Deferred
public func enqueuing(at qos: DispatchQoS) -> Deferred
{
let queue = DispatchQueue(label: "deferred", qos: qos, attributes: serially ? [] : .concurrent)
let queue = DispatchQueue(label: "\(self.queue.label)-enqueuing", qos: qos, target: self.queue)
return enqueuing(on: queue)
}
}
Expand Down Expand Up @@ -139,7 +139,7 @@ extension Deferred
public func map<Other>(qos: DispatchQoS,
transform: @escaping (_ value: Success) -> Other) -> Deferred<Other, Failure>
{
let queue = DispatchQueue(label: "deferred-map", qos: qos)
let queue = DispatchQueue(label: "\(self.queue.label)-map", qos: qos, target: self.queue)
return map(queue: queue, transform: transform)
}

Expand Down Expand Up @@ -174,7 +174,7 @@ extension Deferred
public func tryMap<Other>(qos: DispatchQoS,
transform: @escaping (_ value: Success) throws -> Other) -> Deferred<Other, Error>
{
let queue = DispatchQueue(label: "deferred-trymap", qos: qos)
let queue = DispatchQueue(label: "\(self.queue.label)-trymap", qos: qos, target: self.queue)
return tryMap(queue: queue, transform: transform)
}

Expand Down Expand Up @@ -209,7 +209,7 @@ extension Deferred
public func mapError<OtherFailure>(qos: DispatchQoS,
transform: @escaping (_ error: Failure) -> OtherFailure) -> Deferred<Success, OtherFailure>
{
let queue = DispatchQueue(label: "deferred-maperror", qos: qos)
let queue = DispatchQueue(label: "\(self.queue.label)-maperror", qos: qos, target: self.queue)
return mapError(queue: queue, transform: transform)
}

Expand Down Expand Up @@ -297,7 +297,7 @@ extension Deferred
public func flatMap<Other>(qos: DispatchQoS,
transform: @escaping (_ value: Success) -> Deferred<Other, Failure>) -> Deferred<Other, Failure>
{
let queue = DispatchQueue(label: "deferred-flatmap", qos: qos)
let queue = DispatchQueue(label: "\(self.queue.label)-flatmap", qos: qos, target: self.queue)
return flatMap(queue: queue, transform: transform)
}

Expand Down Expand Up @@ -347,7 +347,7 @@ extension Deferred
public func tryFlatMap<Other>(qos: DispatchQoS,
transform: @escaping (_ value: Success) throws -> Deferred<Other, Error>) -> Deferred<Other, Error>
{
let queue = DispatchQueue(label: "deferred-flatmap", qos: qos)
let queue = DispatchQueue(label: "\(self.queue.label)-flatmap", qos: qos, target: self.queue)
return tryFlatMap(queue: queue, transform: transform)
}

Expand Down Expand Up @@ -398,7 +398,7 @@ extension Deferred
public func flatMapError<OtherFailure>(qos: DispatchQoS,
transform: @escaping(_ error: Failure) -> Deferred<Success, OtherFailure>) -> Deferred<Success, OtherFailure>
{
let queue = DispatchQueue(label: "deferred-flatmaperror", qos: qos)
let queue = DispatchQueue(label: "\(self.queue)-flatmaperror", qos: qos, target: self.queue)
return flatMapError(queue: queue, transform: transform)
}
}
Expand Down Expand Up @@ -565,7 +565,7 @@ extension Deferred
public func recover(qos: DispatchQoS,
transform: @escaping (_ error: Failure) -> Deferred) -> Deferred
{
let queue = DispatchQueue(label: "deferred-recover", qos: qos)
let queue = DispatchQueue(label: "\(self.queue.label)-recover", qos: qos, target: self.queue)
return recover(queue: queue, transform: transform)
}

Expand All @@ -580,7 +580,7 @@ extension Deferred
public static func Retrying(_ attempts: Int, qos: DispatchQoS = .current,
task: @escaping () -> Deferred) -> Deferred
{
let queue = DispatchQueue(label: "retrying", qos: qos)
let queue = DispatchQueue(label: "deferred-retry", qos: qos, target: .global(qos: qos.qosClass))
return Deferred.Retrying(attempts, queue: queue, task: task)
}

Expand Down Expand Up @@ -655,7 +655,7 @@ extension Deferred where Failure == Error
public func recover(qos: DispatchQoS,
transform: @escaping (_ error: Error) throws -> Success) -> Deferred
{
let queue = DispatchQueue(label: "deferred-recover", qos: qos)
let queue = DispatchQueue(label: "\(self.queue.label)-recover", qos: qos, target: self.queue)
return recover(queue: queue, transform: transform)
}

Expand All @@ -670,7 +670,7 @@ extension Deferred where Failure == Error
public static func Retrying(_ attempts: Int, qos: DispatchQoS = .current,
task: @escaping () throws -> Success) -> Deferred
{
let queue = DispatchQueue(label: "deferred", qos: qos)
let queue = DispatchQueue(label: "deferred-retry", qos: qos, target: .global(qos: qos.qosClass))
return Deferred.Retrying(attempts, queue: queue, task: task)
}

Expand Down Expand Up @@ -763,7 +763,7 @@ extension Deferred
public func apply<Other>(qos: DispatchQoS,
transform: Deferred<(_ value: Success) -> Other, Never>) -> Deferred<Other, Failure>
{
let queue = DispatchQueue(label: "deferred-apply", qos: qos)
let queue = DispatchQueue(label: "\(self.queue.label)-apply", qos: qos, target: self.queue)
return apply(queue: queue, transform: transform)
}
}
Expand Down Expand Up @@ -803,7 +803,7 @@ extension Deferred
public func validate(qos: DispatchQoS,
predicate: @escaping (_ value: Success) -> Bool, message: String = "") -> Deferred<Success, Error>
{
let queue = DispatchQueue(label: "deferred", qos: qos)
let queue = DispatchQueue(label: "\(self.queue.label)-validate", qos: qos, target: self.queue)
return validate(queue: queue, predicate: predicate, message: message)
}

Expand Down Expand Up @@ -838,7 +838,7 @@ extension Deferred
public func validate(qos: DispatchQoS,
predicate: @escaping (_ value: Success) throws -> Void) -> Deferred<Success, Error>
{
let queue = DispatchQueue(label: "deferred", qos: qos)
let queue = DispatchQueue(label: "\(self.queue.label)-validate", qos: qos, target: self.queue)
return validate(queue: queue, predicate: predicate)
}
}
Expand Down Expand Up @@ -870,7 +870,7 @@ extension Optional

public func deferred(qos: DispatchQoS = .current) -> Deferred<Wrapped, Invalidation>
{
let queue = DispatchQueue(label: "deferred", qos: qos)
let queue = DispatchQueue(label: "deferred", qos: qos, target: .global(qos: qos.qosClass))
return self.deferred(queue: queue)
}
}
17 changes: 10 additions & 7 deletions Source/deferred/deferred-first.swift
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public func firstValue<Success, Failure, C: Collection>(_ deferreds: C, qos: Dis
cancelOthers: Bool = false) -> Deferred<Success, Failure>?
where C.Element: Deferred<Success, Failure>
{
let queue = DispatchQueue(label: "first-collection", qos: qos)
let queue = DispatchQueue(label: "first-deferred", qos: qos, target: .global(qos: qos.qosClass))
return firstValue(deferreds, queue: queue, cancelOthers: cancelOthers)
}

Expand Down Expand Up @@ -144,7 +144,7 @@ public func firstValue<Success, Failure, S: Sequence>(_ deferreds: S, qos: Dispa
cancelOthers: Bool = false) -> Deferred<Success, Failure>?
where S.Element: Deferred<Success, Failure>
{
let queue = DispatchQueue(label: "first-sequence", qos: qos)
let queue = DispatchQueue(label: "first-deferred", qos: qos, target: .global(qos: qos.qosClass))
return firstValue(deferreds, queue: queue, cancelOthers: cancelOthers)
}

Expand Down Expand Up @@ -222,7 +222,7 @@ public func firstResolved<Success, Failure, C>(_ deferreds: C, qos: DispatchQoS
cancelOthers: Bool = false) -> Deferred<Success, Failure>?
where C: Collection, C.Element: Deferred<Success, Failure>
{
let queue = DispatchQueue(label: "first-collection", qos: qos)
let queue = DispatchQueue(label: "first-deferred", qos: qos, target: .global(qos: qos.qosClass))
return firstResolved(deferreds, queue: queue, cancelOthers: cancelOthers)
}

Expand Down Expand Up @@ -274,7 +274,7 @@ public func firstResolved<Success, Failure, S: Sequence>(_ deferreds: S, qos: Di
cancelOthers: Bool = false) -> Deferred<Success, Failure>?
where S.Element: Deferred<Success, Failure>
{
let queue = DispatchQueue(label: "first-sequence", qos: qos)
let queue = DispatchQueue(label: "first-deferred", qos: qos, target: .global(qos: qos.qosClass))
return firstResolved(deferreds, queue: queue, cancelOthers: cancelOthers)
}

Expand Down Expand Up @@ -460,7 +460,8 @@ public func firstValue<T1, F1, T2, F2>(_ d1: Deferred<T1, F1>,
-> (Deferred<T1, Error>, Deferred<T2, Error>)
{
// find which input first gets a value
let queue = DispatchQueue(label: "select", qos: .current)
let qos = DispatchQoS.current
let queue = DispatchQueue(label: "first-deferred", qos: qos, target: .global(qos: qos.qosClass))
let timeoutMessage = "too slow in \(#function)"
let selected = Deferred<ObjectIdentifier, Invalidation>(queue: queue) {
resolver in
Expand Down Expand Up @@ -497,7 +498,8 @@ public func firstValue<T1, F1, T2, F2, T3, F3>(_ d1: Deferred<T1, F1>,
-> (Deferred<T1, Error>, Deferred<T2, Error>, Deferred<T3, Error>)
{
// find which input first gets a value
let queue = DispatchQueue(label: "select", qos: .current)
let qos = DispatchQoS.current
let queue = DispatchQueue(label: "first-deferred", qos: qos, target: .global(qos: qos.qosClass))
let timeoutMessage = "too slow in \(#function)"
let selected = Deferred<ObjectIdentifier, Invalidation>() {
resolver in
Expand Down Expand Up @@ -538,7 +540,8 @@ public func firstValue<T1, F1, T2, F2, T3, F3, T4, F4>(_ d1: Deferred<T1, F1>,
-> (Deferred<T1, Error>, Deferred<T2, Error>, Deferred<T3, Error>, Deferred<T4, Error>)
{
// find which input first gets a value
let queue = DispatchQueue(label: "select", qos: .current)
let qos = DispatchQoS.current
let queue = DispatchQueue(label: "first-deferred", qos: qos, target: .global(qos: qos.qosClass))
let timeoutMessage = "too slow in \(#function)"
let selected = Deferred<ObjectIdentifier, Invalidation>() {
resolver in
Expand Down
16 changes: 12 additions & 4 deletions Source/deferred/deferred-parallelize.swift
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ extension Collection
public func deferredMap<Success>(qos: DispatchQoS = .current,
task: @escaping (_ element: Self.Element) throws -> Success) -> [Deferred<Success, Error>]
{
let queue = DispatchQueue(label: "deferred-map", qos: qos)
let queue = DispatchQueue(label: "parallel-deferred", qos: qos, attributes: .concurrent, target: .global(qos: qos.qosClass))
return deferredMap(queue: queue, task: task)
}

Expand All @@ -96,7 +96,7 @@ extension Collection
public func deferredMap<Success>(qos: DispatchQoS = .current,
task: @escaping (_ element: Self.Element) -> Success) -> [Deferred<Success, Never>]
{
let queue = DispatchQueue(label: "deferred-map", qos: qos)
let queue = DispatchQueue(label: "parallel-deferred", qos: qos, attributes: .concurrent, target: .global(qos: qos.qosClass))
return deferredMap(queue: queue, task: task)
}

Expand All @@ -111,7 +111,11 @@ extension Collection
task: @escaping (_ element: Self.Element) throws -> Success) -> [Deferred<Success, Error>]
{
let count = self.count
let pairs = (0..<count).map { _ in Deferred<Success, Error>.CreatePair(queue: queue) }
let pairs = (0..<count).map {
Deferred<Success, Error>.CreatePair(
queue: DispatchQueue(label: "\(queue.label)-\($0)", qos: queue.qos, target: queue)
)
}
let resolvers = pairs.map { $0.0 }

queue.async {
Expand Down Expand Up @@ -143,7 +147,11 @@ extension Collection
task: @escaping (_ element: Self.Element) -> Success) -> [Deferred<Success, Never>]
{
let count = self.count
let pairs = (0..<count).map { _ in Deferred<Success, Never>.CreatePair(queue: queue) }
let pairs = (0..<count).map {
Deferred<Success, Never>.CreatePair(
queue: DispatchQueue(label: "\(queue.label)-\($0)", qos: queue.qos, target: queue)
)
}
let resolvers = pairs.map { $0.0 }

queue.async {
Expand Down
6 changes: 3 additions & 3 deletions Source/deferred/deferred-timeout.swift
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ extension Deferred
}
else if deadline != .distantFuture
{
let queue = DispatchQueue(label: "timeout", qos: qos)
let queue = DispatchQueue.global(qos: qos.qosClass)
queue.asyncAfter(deadline: deadline) { [weak self] in self?.cancel(timedOut) }
}
return broadened
Expand All @@ -81,7 +81,7 @@ extension Deferred
}
else if deadline != .distantFuture
{
let queue = DispatchQueue(label: "timeout", qos: qos)
let queue = DispatchQueue.global(qos: qos.qosClass)
queue.asyncAfter(deadline: deadline) { [weak broadened] in broadened?.cancel(timedOut) }
}
return broadened
Expand Down Expand Up @@ -138,7 +138,7 @@ extension Deferred where Failure == Cancellation
}
else if deadline != .distantFuture
{
let queue = DispatchQueue(label: "timeout", qos: qos)
let queue = DispatchQueue.global(qos: qos.qosClass)
queue.asyncAfter(deadline: deadline) { [weak self] in self?.cancel(timedOut) }
}
return self
Expand Down
Loading

0 comments on commit 43546df

Please sign in to comment.