Skip to content

Commit

Permalink
Merge pull request #1022 from GarthSnyder/dispatcher-cleanup
Browse files Browse the repository at this point in the history
Dispatchers cleanup
  • Loading branch information
mxcl authored Mar 19, 2019
2 parents f785e0e + 4c7b721 commit 97bb3c0
Show file tree
Hide file tree
Showing 9 changed files with 74 additions and 62 deletions.
4 changes: 2 additions & 2 deletions Sources/Dispatcher.swift
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public struct DispatchQueueDispatcher: Dispatcher {
let qos: DispatchQoS?
let flags: DispatchWorkItemFlags?

init(queue: DispatchQueue, group: DispatchGroup? = nil, qos: DispatchQoS? = nil, flags: DispatchWorkItemFlags? = nil) {
public init(queue: DispatchQueue, group: DispatchGroup? = nil, qos: DispatchQoS? = nil, flags: DispatchWorkItemFlags? = nil) {
self.queue = queue
self.group = group
self.qos = qos
Expand All @@ -40,7 +40,7 @@ public struct DispatchQueueDispatcher: Dispatcher {
}

// Avoid having to hard-code any particular defaults for qos or flags
public extension DispatchQueue {
internal extension DispatchQueue {
final func asyncD(group: DispatchGroup? = nil, qos: DispatchQoS? = nil, flags: DispatchWorkItemFlags? = nil, execute body: @escaping () -> Void) {
switch (qos, flags) {
case (nil, nil):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ import Foundation
/// A PromiseKit Dispatcher that allows no more than X simultaneous
/// executions at once.

class ConcurrencyLimitedDispatcher: Dispatcher {
public class ConcurrencyLimitedDispatcher: Dispatcher {

let queue: Dispatcher
let serializer: DispatchQueue = DispatchQueue(label: "CLD serializer")

private let semaphore: DispatchSemaphore
let semaphore: DispatchSemaphore

/// A `PromiseKit` `Dispatcher` that allows no more than X simultaneous
/// executions at once.
Expand All @@ -23,6 +23,10 @@ class ConcurrencyLimitedDispatcher: Dispatcher {
semaphore = DispatchSemaphore(value: limit)
}

public convenience init(limit: Int, queue: DispatchQueue) {
self.init(limit: limit, queue: queue as Dispatcher)
}

public func dispatch(_ body: @escaping () -> Void) {
serializer.async {
self.semaphore.wait()
Expand Down
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,16 @@ public class RateLimitedDispatcher: RateLimitedDispatcherBase {
/// - perInterval: The length of the reference interval, in seconds.
/// - queue: The DispatchQueue or Dispatcher on which to perform executions. May be serial or concurrent.

override init(maxDispatches: Int, perInterval interval: TimeInterval, queue: Dispatcher = DispatchQueue.global()) {
override public init(maxDispatches: Int, perInterval interval: TimeInterval, queue: Dispatcher = DispatchQueue.global()) {
latestAccrual = DispatchTime.now()
super.init(maxDispatches: maxDispatches, perInterval: interval, queue: queue)
tokensInBucket = Double(maxDispatches)
}

public convenience init(maxDispatches: Int, perInterval interval: TimeInterval, queue: DispatchQueue) {
self.init(maxDispatches: maxDispatches, perInterval: interval, queue: queue as Dispatcher)
}

override func dispatchFromQueue() {

guard undispatched.count > 0 else { return }
Expand Down Expand Up @@ -97,7 +101,7 @@ public class RateLimitedDispatcher: RateLimitedDispatcherBase {

}

internal extension DispatchTime {
extension DispatchTime {
static func -(a: DispatchTime, b: DispatchTime) -> TimeInterval {
let delta = a.uptimeNanoseconds - b.uptimeNanoseconds
return TimeInterval(delta) / 1_000_000_000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@ public class RateLimitedDispatcherBase: Dispatcher {
let interval: TimeInterval
let queue: Dispatcher

internal let serializer = DispatchQueue(label: "RLD serializer")
let serializer = DispatchQueue(label: "RLD serializer")

internal var nDispatched = 0
internal var undispatched = Queue<() -> Void>()
var nDispatched = 0
var undispatched = Queue<() -> Void>()

internal var cleanupNonce: Int64 = 0
internal var cleanupWorkItem: DispatchWorkItem? { willSet { cleanupWorkItem?.cancel() }}
var cleanupNonce: Int64 = 0
var cleanupWorkItem: DispatchWorkItem? { willSet { cleanupWorkItem?.cancel() }}

public init(maxDispatches: Int, perInterval interval: TimeInterval, queue: Dispatcher = DispatchQueue.global()) {
self.maxDispatches = maxDispatches
Expand All @@ -27,26 +27,26 @@ public class RateLimitedDispatcherBase: Dispatcher {
}
}

internal func dispatchFromQueue() {
func dispatchFromQueue() {
fatalError("Subclass responsibility")
}

internal func recordActualStart() {
func recordActualStart() {
nDispatched -= 1
dispatchFromQueue()
if nDispatched == 0 && undispatched.isEmpty {
scheduleCleanup()
}
}

internal func scheduleCleanup() {
func scheduleCleanup() {
cleanupWorkItem = DispatchWorkItem { [ weak self, nonce = self.cleanupNonce ] in
self?.cleanup(nonce)
}
serializer.asyncAfter(deadline: DispatchTime.now() + interval, execute: cleanupWorkItem!)
}

internal func cleanup(_ nonce: Int64) {
func cleanup(_ nonce: Int64) {
// Calls to cleanup() have to go through the serializer queue, so by by the time
// we get here, more activity may have occurred. Ergo, verify nonce.
guard nonce == cleanupNonce else { return }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,16 @@ public class StrictRateLimitedDispatcher: RateLimitedDispatcherBase {
/// - perInterval: The length of the reference interval, in seconds.
/// - queue: The DispatchQueue or Dispatcher on which to perform executions. May be serial or concurrent.

override init(maxDispatches: Int, perInterval interval: TimeInterval, queue: Dispatcher = DispatchQueue.global()) {
override public init(maxDispatches: Int, perInterval interval: TimeInterval, queue: Dispatcher = DispatchQueue.global()) {
startTimeHistory = Queue<DispatchTime>(maxDepth: maxDispatches)
immediateDispatchesAvailable = maxDispatches
super.init(maxDispatches: maxDispatches, perInterval: interval, queue: queue)
}

public convenience init(maxDispatches: Int, perInterval interval: TimeInterval, queue: DispatchQueue) {
self.init(maxDispatches: maxDispatches, perInterval: interval, queue: queue as Dispatcher)
}

override func dispatchFromQueue() {

cleanupNonce += 1
Expand Down
47 changes: 31 additions & 16 deletions Tests/Core/DispatcherTypeTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,15 @@ import Dispatch
@testable import PromiseKit
import XCTest

class DispatcherTestBase: XCTestCase {
class DispatcherTypeTests: XCTestCase {

struct ScenarioParameters {
let hiatusLikelihoods: [Double]
let noDelayLikelihoods: [Double]
let intervals: [Double]
let dispatches: [Int]
}

let standardParams = ScenarioParameters(
hiatusLikelihoods: [ 0.3 ],
noDelayLikelihoods: [ 0.75 ],
Expand Down Expand Up @@ -91,7 +91,7 @@ class DispatcherTestBase: XCTestCase {
}
}

let nHiatuses = delays.count { $0 > avgSlice }
let nHiatuses = delays.lazy.filter { $0 > avgSlice }.count

scenarios.append(RateLimitScenario(maxDispatches: maxDispatches, interval: interval,
hiatusLikelihood: hiatusLikelihoodPerInterval, nHiatuses: nHiatuses,
Expand Down Expand Up @@ -144,10 +144,6 @@ class DispatcherTestBase: XCTestCase {
}
return most
}

}

class RateLimitTests: DispatcherTestBase {

func testRateLimitedDispatcher() {
for scenario in scenarios {
Expand All @@ -166,10 +162,6 @@ class RateLimitTests: DispatcherTestBase {
}
}

}

class StrictRateLimitTests: DispatcherTestBase {

func testStrictRateLimitedDispatcher() {
for scenario in scenarios {
printScenarioDetails(scenario)
Expand All @@ -190,10 +182,6 @@ class StrictRateLimitTests: DispatcherTestBase {
}
}

}

class ConcurrencyLimitTests: DispatcherTestBase {

func testConcurrencyLimitedDispatcher() {

for scenario in scenarios {
Expand Down Expand Up @@ -233,8 +221,35 @@ class ConcurrencyLimitTests: DispatcherTestBase {
}
}

}
// These aren't really "tests" per se; they just exercise all the various init types
// to verify that none of them produce ambiguity warnings or recurse indefinitely,
// and that DispatchQueue members are accessible.

func testRateLimitedDispatcherInit() {
XCTAssertNotNil(RateLimitedDispatcher(maxDispatches: 1, perInterval: 1))
XCTAssertNotNil(RateLimitedDispatcher(maxDispatches: 1, perInterval: 1, queue: DispatchQueue.main))
XCTAssertNotNil(RateLimitedDispatcher(maxDispatches: 1, perInterval: 1, queue: CurrentThreadDispatcher()))
XCTAssertNotNil(RateLimitedDispatcher(maxDispatches: 1, perInterval: 1, queue: .main))
XCTAssertNotNil(RateLimitedDispatcher(maxDispatches: 1, perInterval: 1, queue: .global(qos: .background)))
}

func testStrictRateLimitedDispatcherInit() {
XCTAssertNotNil(StrictRateLimitedDispatcher(maxDispatches: 1, perInterval: 1))
XCTAssertNotNil(StrictRateLimitedDispatcher(maxDispatches: 1, perInterval: 1, queue: DispatchQueue.main))
XCTAssertNotNil(StrictRateLimitedDispatcher(maxDispatches: 1, perInterval: 1, queue: CurrentThreadDispatcher()))
XCTAssertNotNil(StrictRateLimitedDispatcher(maxDispatches: 1, perInterval: 1, queue: .main))
XCTAssertNotNil(StrictRateLimitedDispatcher(maxDispatches: 1, perInterval: 1, queue: .global(qos: .background)))
}

func testConcurrencyLimitedDispatcherInit() {
XCTAssertNotNil(ConcurrencyLimitedDispatcher(limit: 1))
XCTAssertNotNil(ConcurrencyLimitedDispatcher(limit: 1, queue: DispatchQueue.main))
XCTAssertNotNil(ConcurrencyLimitedDispatcher(limit: 1, queue: CurrentThreadDispatcher()))
XCTAssertNotNil(ConcurrencyLimitedDispatcher(limit: 1, queue: .main))
XCTAssertNotNil(ConcurrencyLimitedDispatcher(limit: 1, queue: .global(qos: .background)))
}

}

// Reproducible, seedable RNG

Expand Down
45 changes: 15 additions & 30 deletions Tests/Core/XCTestManifests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -50,15 +50,6 @@ extension CatchableTests {
]
}

extension ConcurrencyLimitTests {
// DO NOT MODIFY: This is autogenerated, use:
// `swift test --generate-linuxmain`
// to regenerate.
static let __allTests__ConcurrencyLimitTests = [
("testConcurrencyLimitedDispatcher", testConcurrencyLimitedDispatcher),
]
}

extension DispatcherTests {
// DO NOT MODIFY: This is autogenerated, use:
// `swift test --generate-linuxmain`
Expand All @@ -75,6 +66,20 @@ extension DispatcherTests {
]
}

extension DispatcherTypeTests {
// DO NOT MODIFY: This is autogenerated, use:
// `swift test --generate-linuxmain`
// to regenerate.
static let __allTests__DispatcherTypeTests = [
("testConcurrencyLimitedDispatcher", testConcurrencyLimitedDispatcher),
("testConcurrencyLimitedDispatcherInit", testConcurrencyLimitedDispatcherInit),
("testRateLimitedDispatcher", testRateLimitedDispatcher),
("testRateLimitedDispatcherInit", testRateLimitedDispatcherInit),
("testStrictRateLimitedDispatcher", testStrictRateLimitedDispatcher),
("testStrictRateLimitedDispatcherInit", testStrictRateLimitedDispatcherInit),
]
}

extension GuaranteeTests {
// DO NOT MODIFY: This is autogenerated, use:
// `swift test --generate-linuxmain`
Expand Down Expand Up @@ -181,15 +186,6 @@ extension RaceTests {
]
}

extension RateLimitTests {
// DO NOT MODIFY: This is autogenerated, use:
// `swift test --generate-linuxmain`
// to regenerate.
static let __allTests__RateLimitTests = [
("testRateLimitedDispatcher", testRateLimitedDispatcher),
]
}

extension RegressionTests {
// DO NOT MODIFY: This is autogenerated, use:
// `swift test --generate-linuxmain`
Expand All @@ -210,15 +206,6 @@ extension StressTests {
]
}

extension StrictRateLimitTests {
// DO NOT MODIFY: This is autogenerated, use:
// `swift test --generate-linuxmain`
// to regenerate.
static let __allTests__StrictRateLimitTests = [
("testStrictRateLimitedDispatcher", testStrictRateLimitedDispatcher),
]
}

extension ThenableTests {
// DO NOT MODIFY: This is autogenerated, use:
// `swift test --generate-linuxmain`
Expand Down Expand Up @@ -310,8 +297,8 @@ public func __allTests() -> [XCTestCaseEntry] {
testCase(AfterTests.__allTests__AfterTests),
testCase(CancellationTests.__allTests__CancellationTests),
testCase(CatchableTests.__allTests__CatchableTests),
testCase(ConcurrencyLimitTests.__allTests__ConcurrencyLimitTests),
testCase(DispatcherTests.__allTests__DispatcherTests),
testCase(DispatcherTypeTests.__allTests__DispatcherTypeTests),
testCase(GuaranteeTests.__allTests__GuaranteeTests),
testCase(HangTests.__allTests__HangTests),
testCase(JoinTests.__allTests__JoinTests),
Expand All @@ -320,10 +307,8 @@ public func __allTests() -> [XCTestCaseEntry] {
testCase(PMKErrorTests.__allTests__PMKErrorTests),
testCase(PromiseTests.__allTests__PromiseTests),
testCase(RaceTests.__allTests__RaceTests),
testCase(RateLimitTests.__allTests__RateLimitTests),
testCase(RegressionTests.__allTests__RegressionTests),
testCase(StressTests.__allTests__StressTests),
testCase(StrictRateLimitTests.__allTests__StrictRateLimitTests),
testCase(ThenableTests.__allTests__ThenableTests),
testCase(WhenConcurrentTestCase_Swift.__allTests__WhenConcurrentTestCase_Swift),
testCase(WhenTests.__allTests__WhenTests),
Expand Down

0 comments on commit 97bb3c0

Please sign in to comment.