diff --git a/Sources/PenguinGraphs/AdjacencyList.swift b/Sources/PenguinGraphs/AdjacencyList.swift
index e3ad5c16..020c814e 100644
--- a/Sources/PenguinGraphs/AdjacencyList.swift
+++ b/Sources/PenguinGraphs/AdjacencyList.swift
@@ -376,7 +376,7 @@ extension AdjacencyList: ParallelGraph {
 
     // TODO: Separate them out to be on different cache lines to avoid false sharing!
     // A per-thread array of global states, where each thread index gets its own.
-    var globalStates: [GlobalState?] = Array(repeating: nil, count: threadPool.parallelism + 1)
+    var globalStates: [GlobalState?] = Array(repeating: nil, count: threadPool.maxParallelism + 1)
     try globalStates.withUnsafeMutableBufferPointer { globalStates in
       try storage.withUnsafeMutableBufferPointer { vertices in
diff --git a/Sources/PenguinGraphs/VertexParallelProtocols.swift b/Sources/PenguinGraphs/VertexParallelProtocols.swift
index cc87124b..9379be8d 100644
--- a/Sources/PenguinGraphs/VertexParallelProtocols.swift
+++ b/Sources/PenguinGraphs/VertexParallelProtocols.swift
@@ -296,7 +296,7 @@ public class PerThreadMailboxes<
   ///
   /// This initializer helps the type inference algorithm along.
   public convenience init(for graph: __shared SequentialGraph, sending messageType: Message.Type) where SequentialGraph.ParallelProjection == Graph {
-    self.init(vertexCount: graph.vertexCount, threadCount: ComputeThreadPools.parallelism)
+    self.init(vertexCount: graph.vertexCount, threadCount: ComputeThreadPools.maxParallelism)
   }
 }
diff --git a/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift b/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift
index 9fe54f58..9a8e7d94 100644
--- a/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift
+++ b/Sources/PenguinParallel/NonblockingThreadPool/NonBlockingThreadPool.swift
@@ -289,7 +289,49 @@ public class NonBlockingThreadPool: ComputeThr
     if let e = err { throw e }
   }
 
-  /// Shuts down the thread pool.
+  /// Executes `fn`, optionally in parallel, spanning the range `0..<n`.
+  public func parallelFor(n: Int, _ fn: VectorizedParallelForBody) {
+    let grainSize = n / maxParallelism
+
+    func executeParallelFor(_ start: Int, _ end: Int) {
+      if start + grainSize >= end {
+        fn(start, end, n)
+      } else {
+        // Divide into 2 & recurse.
+        let rangeSize = end - start
+        let midPoint = start + (rangeSize / 2)
+        self.join({ executeParallelFor(start, midPoint) }, { executeParallelFor(midPoint, end)})
+      }
+    }
+
+    executeParallelFor(0, n)
+  }
+
+  /// Executes `fn`, optionally in parallel, spanning the range `0..<n`.
+  public func parallelFor(n: Int, _ fn: ThrowingVectorizedParallelForBody) throws {
+    let grainSize = n / maxParallelism
+
+    func executeParallelFor(_ start: Int, _ end: Int) throws {
+      if start + grainSize >= end {
+        try fn(start, end, n)
+      } else {
+        // Divide into 2 & recurse.
+        let rangeSize = end - start
+        let midPoint = start + (rangeSize / 2)
+        try self.join({ try executeParallelFor(start, midPoint) }, { try executeParallelFor(midPoint, end) })
+      }
+    }
+
+    try executeParallelFor(0, n)
+  }
+
+  /// Requests that all threads in the threadpool exit and cleans up their associated resources.
+  ///
+  /// This function returns only once all threads have exited and their resources have been
+  /// deallocated.
+  ///
+  /// Note: if a work item was submitted to the threadpool that never completes (i.e. has an
+  /// infinite loop), this function will never return.
   public func shutDown() {
     cancelled = true
     condition.notify(all: true)
@@ -300,7 +342,7 @@ public class NonBlockingThreadPool: ComputeThr
     threads.removeAll()  // Remove threads that have been shut down.
   }
 
-  public var parallelism: Int { totalThreadCount }
+  public var maxParallelism: Int { totalThreadCount }
 
   public var currentThreadIndex: Int? {
     perThreadKey.localValue?.threadId
diff --git a/Sources/PenguinParallel/Parallel/Array+ParallelSequence.swift b/Sources/PenguinParallel/Parallel/Array+ParallelSequence.swift
index cc9f7ba9..62eb6e9b 100644
--- a/Sources/PenguinParallel/Parallel/Array+ParallelSequence.swift
+++ b/Sources/PenguinParallel/Parallel/Array+ParallelSequence.swift
@@ -39,7 +39,7 @@ extension Array where Element: Numeric {
       return buffer_psum(
         pool,  // TODO: Take defaulted-arg & thread local to allow for composition!
         buff,
-        computeRecursiveDepth(procCount: pool.parallelism) + 2)  // Sub-divide into quarters-per-processor in case of uneven scheduling.
+        computeRecursiveDepth(procCount: pool.maxParallelism) + 2)  // Sub-divide into quarters-per-processor in case of uneven scheduling.
     }
   }
 }
diff --git a/Sources/PenguinParallel/ThreadPool.swift b/Sources/PenguinParallel/ThreadPool.swift
index bc5af71f..27fae382 100644
--- a/Sources/PenguinParallel/ThreadPool.swift
+++ b/Sources/PenguinParallel/ThreadPool.swift
@@ -60,16 +60,56 @@ public protocol ComputeThreadPool {
   /// This is the throwing overload
   func join(_ a: () throws -> Void, _ b: () throws -> Void) throws
 
+  /// A function to be invoked in parallel a specified number of times by `parallelFor`.
+  ///
+  /// - Parameter currentInvocationIndex: the index of the invocation executing
+  ///   in the current thread.
+  /// - Parameter requestedInvocationCount: the number of parallel invocations requested.
+  typealias ParallelForBody
+    = (_ currentInvocationIndex: Int, _ requestedInvocationCount: Int) -> Void
+
   /// A function that can be executed in parallel.
   ///
-  /// The first argument is the index of the copy, and the second argument is the total number of
-  /// copies being executed.
-  typealias ParallelForFunc = (Int, Int) throws -> Void
+  /// - Parameter currentInvocationIndex: the index of the invocation executing
+  ///   in the current thread.
+  /// - Parameter requestedInvocationCount: the number of parallel invocations requested.
+  typealias ThrowingParallelForBody
+    = (_ currentInvocationIndex: Int, _ requestedInvocationCount: Int) throws -> Void
+
+  /// A vectorized function that can be executed in parallel.
+  ///
+  /// The first argument is the start index for the vectorized operation, and the second argument
+  /// corresponds to the end of the range. The third argument contains the total size of the range.
+  typealias VectorizedParallelForBody = (Int, Int, Int) -> Void
+
+  /// A vectorized function that can be executed in parallel.
+  ///
+  /// The first argument is the start index for the vectorized operation, and the second argument
+  /// corresponds to the end of the range. The third argument contains the total size of the range.
+  typealias ThrowingVectorizedParallelForBody = (Int, Int, Int) throws -> Void
 
   /// Returns after executing `fn` `n` times.
   ///
   /// - Parameter n: The total times to execute `fn`.
-  func parallelFor(n: Int, _ fn: ParallelForFunc) rethrows
+  func parallelFor(n: Int, _ fn: ParallelForBody)
+
+  /// Returns after executing `fn` an unspecified number of times, guaranteeing that `fn` has been
+  /// called with parameters that perfectly cover of the range `0..<n`.
+  func parallelFor(n: Int, _ fn: VectorizedParallelForBody)
+
+  /// Returns after executing `fn` an unspecified number of times, guaranteeing that `fn` has been
+  /// called with parameters that perfectly cover of the range `0..<n`.
+  func parallelFor(n: Int, _ fn: ThrowingVectorizedParallelForBody) throws
 
-  /// The amount of parallelism available in this thread pool.
-  var parallelism: Int { get }
+  /// The maximum amount of parallelism possible within this thread pool.
+  var maxParallelism: Int { get }
 
 extension ComputeThreadPool {
-  /// Implement the non-throwing variation in terms of the throwing one.
-  public func join(_ a: () -> Void, _ b: () -> Void) {
-    withoutActuallyEscaping(a) { a in
-      let throwing: () throws -> Void = a
-      try! join(throwing, b)
-    }
-  }
-}
-/// Holds a parallel for function; this is used to avoid extra refcount overheads on the function
-/// itself.
-fileprivate struct ParallelForFunctionHolder {
-  var fn: ComputeThreadPool.ParallelForFunc
-}
-
-/// Uses `ComputeThreadPool.join` to execute `fn` in parallel.
-fileprivate func runParallelFor<C: ComputeThreadPool>(
-  pool: C,
-  start: Int,
-  end: Int,
-  total: Int,
-  fn: UnsafePointer<ParallelForFunctionHolder>
-) throws {
-  if start + 1 == end {
-    try fn.pointee.fn(start, total)
-  } else {
-    assert(end > start)
-    let distance = end - start
-    let midpoint = start + (distance / 2)
-    try pool.join(
-      { try runParallelFor(pool: pool, start: start, end: midpoint, total: total, fn: fn) },
-      { try runParallelFor(pool: pool, start: midpoint, end: end, total: total, fn: fn) })
-  }
-}
-
-extension ComputeThreadPool {
-  public func parallelFor(n: Int, _ fn: ParallelForFunc) rethrows {
-    try withoutActuallyEscaping(fn) { fn in
-      var holder = ParallelForFunctionHolder(fn: fn)
-      try withUnsafePointer(to: &holder) { holder in
-        try runParallelFor(pool: self, start: 0, end: n, total: n, fn: holder)
+  /// Implements `parallelFor(n:_:)` (scalar) in terms of `parallelFor(n:_:)` (vectorized).
+  public func parallelFor(n: Int, _ fn: ParallelForBody) {
+    parallelFor(n: n) { start, end, total in
+      for i in start..<end {
+        fn(i, total)
+      }
+    }
+  }
-      }
-    }
-  }
-}
-
-public protocol TypedComputeThreadPool: ComputeThreadPool {
-  /// Submit a task to be executed on the threadpool.
-  func dispatch(_ fn: (Self) -> Void)
-
-  /// Run two tasks (optionally) in parallel.
-  ///
-  /// Fork-join parallelism allows for efficient work-stealing parallelism. The two non-escaping
-  /// functions will have finished executing before `pJoin` returns. The first function will execute on
-  /// the local thread immediately, and the second function will execute on another thread if resources
-  /// are available, or on the local thread if there are not available other resources.
-  func join(_ a: (Self) -> Void, _ b: (Self) -> Void)
-
-  /// Run two throwing tasks (optionally) in parallel; if one task throws, it is unspecified
-  /// whether the second task is even started.
-  ///
-  /// This is the throwing overloaded variation.
-  func join(_ a: (Self) throws -> Void, _ b: (Self) throws -> Void) throws
-}
-
-extension TypedComputeThreadPool {
-  /// Implement the non-throwing variation in terms of the throwing one.
-  public func join(_ a: (Self) -> Void, _ b: (Self) -> Void) {
-    withoutActuallyEscaping(a) { a in
-      let throwing: (Self) throws -> Void = a
-      // Implement the non-throwing in terms of the throwing implementation.
-      try! join(throwing, b)
+  /// Implements `parallelFor(n:_:)` (scalar) in terms of `parallelFor(n:_:)` (vectorized).
+  public func parallelFor(n: Int, _ fn: ThrowingParallelForBody) throws {
+    try parallelFor(n: n) { start, end, total in
+      for i in start..<end {
+        try fn(i, total)
+      }
+    }
+  }
+}
-    }
-  }
-}
-
-extension TypedComputeThreadPool {
-  public func dispatch(_ fn: @escaping () -> Void) {
-    dispatch { _ in fn() }
-  }
-
-  public func join(_ a: () -> Void, _ b: () -> Void) {
-    join({ _ in a() }, { _ in b() })
-  }
-
-  public func join(_ a: () throws -> Void, _ b: () throws -> Void) throws {
-    try join({ _ in try a() }, { _ in try b() })
-  }
-}
-
 /// A `ComputeThreadPool` that executes everything immediately on the current thread.
 ///
 /// This threadpool implementation is useful for testing correctness, as well as avoiding context
 /// switches when a computation is designed to be parallelized at a coarser level.
-public struct InlineComputeThreadPool: TypedComputeThreadPool {
+public struct InlineComputeThreadPool: ComputeThreadPool {
   /// Initializes `self`.
   public init() {}
 
-  /// The amount of parallelism available in this thread pool.
-  public var parallelism: Int { 1 }
+  /// The maximum number of concurrent threads of execution supported by this thread pool.
+  public var maxParallelism: Int { 1 }
 
   /// The index of the current thread.
   public var currentThreadIndex: Int? { 0 }
@@ -202,14 +168,32 @@ public struct InlineComputeThreadPool: TypedComputeThreadPool {
   /// Dispatch `fn` to be run at some point in the future (immediately).
   ///
   /// Note: this implementation just executes `fn` immediately.
-  public func dispatch(_ fn: (Self) -> Void) {
-    fn(self)
+  public func dispatch(_ fn: () -> Void) {
+    fn()
+  }
+
+  /// Executes `a` and `b` optionally in parallel, and returns when both are complete.
+  ///
+  /// Note: this implementation simply executes them serially.
+  public func join(_ a: () -> Void, _ b: () -> Void) {
+    a()
+    b()
+  }
+
+  /// Executes `a` and `b` optionally in parallel, and returns when both are complete.
+  ///
+  /// Note: this implementation simply executes them serially.
+  public func join(_ a: () throws -> Void, _ b: () throws -> Void) throws {
+    try a()
+    try b()
+  }
+
+  public func parallelFor(n: Int, _ fn: VectorizedParallelForBody) {
+    fn(0, n, n)
   }
 
-  /// Executes `a` and `b` and returns when both are complete.
-  public func join(_ a: (Self) throws -> Void, _ b: (Self) throws -> Void) throws {
-    try a(self)
-    try b(self)
+  public func parallelFor(n: Int, _ fn: ThrowingVectorizedParallelForBody) throws {
+    try fn(0, n, n)
   }
 }
@@ -267,8 +251,8 @@ extension ComputeThreadPools {
   }
 
   /// The amount of parallelism provided by the current thread-local compute pool.
-  public static var parallelism: Int {
-    local.parallelism
+  public static var maxParallelism: Int {
+    local.maxParallelism
   }
 
   /// Sets `pool` to `local` for the duration of `body`.
diff --git a/Sources/PenguinParallelWithFoundation/PJoin.swift b/Sources/PenguinParallelWithFoundation/PJoin.swift
deleted file mode 100644
index c6adcecc..00000000
--- a/Sources/PenguinParallelWithFoundation/PJoin.swift
+++ /dev/null
@@ -1,328 +0,0 @@
-// Copyright 2020 Penguin Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-import Foundation
-
-#if os(macOS) || os(iOS) || os(tvOS) || os(watchOS)
-  import Darwin
-#else
-  import Glibc
-#endif
-
-/// A Naive ThreadPool.
-///
-/// It has well-known performance problems, but is used as a reference implementation to: (1) test
-/// correctness of alternate implementations, and (2) to allow higher levels of abstraction to be
-/// developed (and tested) in parallel with an efficient implementation of `ThreadPool`.
-final public class NaiveThreadPool: TypedComputeThreadPool {
-  init(workerCount: Int) {
-    workers.reserveCapacity(workerCount)
-    for i in 0..<workerCount {
-      let worker = Worker(name: "Worker \(i)", index: i, allContexts: contexts)
-      worker.start()
-      workers.append(worker)
-    }
-  }
-
-  public func dispatch(_ fn: (NaiveThreadPool) -> Void) {
-    // TODO: Implement me!
-    fatalError("SORRY NOT YET IMPLEMENTED!")
-  }
-
-  public func join(_ a: (NaiveThreadPool) throws -> Void, _ b: (NaiveThreadPool) throws -> Void) throws {
-    // TODO: Avoid extra closure construction!
-    try withoutActuallyEscaping({ try b(self) }) { b in
-      var item = WorkItem(op: b)
-      var aError: Error? = nil
-      contexts.addLocal(item: &item)
-      defer {
-        let tmp = contexts.popLocal()
-        assert(tmp == &item, "Popped something other than item!")
-      }
-      do {
-        try a(self)
-      } catch {
-        if item.tryTake() {
-          throw error
-        } else {
-          // Another thread is executing item; can't return!
-          aError = error
-        }
-      }
-      if item.tryTake() {
-        item.execute()
-      }
-      // In case it was stolen by a background thread, steal other work.
-      while !item.isFinished {
-        // Prefer local work over remote work.
-        if let work = contexts.lookForWorkLocal() {
-          work.pointee.execute()
-          continue
-        }
-        // Look for local work.
-        let ctxs = contexts.allContexts()
-        for ctx in ctxs {
-          if let work = ctx.lookForWork() {
-            work.pointee.execute()
-            break
-          }
-        }
-      }
-      if let error = aError {
-        throw error
-      }
-      if let error = item.error {
-        throw error
-      }
-    }
-  }
-
-  private func allContexts() -> AllContexts {
-    return contexts
-  }
-
-  private var contexts = AllContexts()
-  private var workers = [Worker]()
-
-  private final class Worker: Thread {
-
-    init(name: String, index: Int, allContexts: AllContexts) {
-      self.allContexts = allContexts
-      self.index = index
-      super.init()
-      self.name = name
-    }
-
-    override final func main() {
-      // Touch the thread-local context to create it & add it to the AllContext's list.
-      _ = Context.local(index: index, allContexts: allContexts)
-      var foundWork = false  // If we're finding work, don't wait on the condition variable.
-
-      // Loop, looking for work.
-      while true {
-        let ctxs: [Context]
-        if !foundWork {
-          ctxs = allContexts.wait()
-        } else {
-          ctxs = allContexts.allContexts()
-        }
-
-        foundWork = false
-        for ctx in ctxs {
-          if let item = ctx.lookForWork() {
-            item.pointee.execute()
-            foundWork = true
-          }
-        }
-      }
-      // TODO: have a good way for a clean shutdown.
-    }
-
-    let allContexts: AllContexts
-    let index: Int
-  }
-
-  // Note: this cheap-and-dirty implementation is nowhere close to optimal!
-  private final class AllContexts {
-    func append(_ context: Context) {
-      cond.lock()
-      defer { cond.unlock() }
-      cond.signal()
-      contexts.append(context)
-    }
-
-    func addLocal(item: UnsafeMutablePointer<WorkItem>) {
-      Context.local(index: -1, allContexts: self).add(item)
-      notify()
-    }
-
-    func lookForWorkLocal() -> UnsafeMutablePointer<WorkItem>? {
-      Context.local(index: -1, allContexts: self).lookForWorkLocal()
-    }
-
-    func popLocal() -> UnsafeMutablePointer<WorkItem>? {
-      Context.local(index: -1, allContexts: self).popLast()
-    }
-
-    func allContexts() -> [Context] {
-      cond.lock()
-      defer { cond.unlock() }
-      return contexts
-    }
-
-    func wait() -> [Context] {
-      cond.lock()
-      defer { cond.unlock() }
-      cond.wait()
-      return contexts
-    }
-
-    func notify() {
-      cond.signal()
-    }
-
-    func broadcast() {
-      cond.broadcast()
-    }
-
-    // TODO: Keep track of contexts that might have useful things in order to avoid
-    // doing unnecessary work when looking for work if this turns out to be a
-    // performance problem.
-    private var contexts = [Context]()
-    private let cond = NSCondition()
-  }
-
-  /// Thread-local contexts
-  private final class Context {
-    private var lock = NSLock()
-    private var workItems = [UnsafeMutablePointer<WorkItem>]()
-    let index: Int
-
-    init(index: Int) {
-      self.index = index
-    }
-
-    // Adds a workitem to the list.
-    //
-    // Note: the caller must notify potential AllContext's waiters. (This should not be directly called, and
-    // only called within `AllContexts.addLocal`.)
-    func add(_ item: UnsafeMutablePointer<WorkItem>) {
-      lock.lock()
-      defer { lock.unlock() }
-      workItems.append(item)
-    }
-
-    // This should also only be called when
-    func popLast() -> UnsafeMutablePointer<WorkItem>? {
-      lock.lock()
-      defer { lock.unlock() }
-      return workItems.popLast()
-    }
-
-    func lookForWork() -> UnsafeMutablePointer<WorkItem>? {
-      lock.lock()
-      defer { lock.unlock() }
-      for elem in workItems {
-        if elem.pointee.tryTake() {
-          return elem
-        }
-      }
-      return nil
-    }
-
-    func lookForWorkLocal() -> UnsafeMutablePointer<WorkItem>? {
-      lock.lock()
-      defer { lock.unlock() }
-      for elem in workItems.reversed() {
-        if elem.pointee.tryTake() {
-          return elem
-        }
-      }
-      return nil
-    }
-
-    /// The data key for the singleton `Context` in the current thread.
-    ///
-    /// TODO: figure out what to do vis-a-vis multiple thread pools? Maybe re-structure to avoid using
-    /// threadlocal variables, and instead create a map keyed by thread id?
-    static let key: pthread_key_t = {
-      var key = pthread_key_t()
-      pthread_key_create(&key) { obj in
-        #if !(os(macOS) || os(iOS) || os(watchOS) || os(tvOS))
-          let obj = obj!
-        #endif
-        Unmanaged<Context>.fromOpaque(obj).release()
-      }
-      return key
-    }()
-
-    /// The thread-local singleton.
-    static func local(index: Int, allContexts: AllContexts) -> Context {
-      if let address = pthread_getspecific(key) {
-        return Unmanaged<Context>.fromOpaque(address).takeUnretainedValue()
-      }
-      let context = Context(index: index)
-      allContexts.append(context)
-      pthread_setspecific(key, Unmanaged.passRetained(context).toOpaque())
-      return context
-    }
-  }
-
-  public static let global = NaiveThreadPool()
-}
-
-private struct WorkItem {
-  enum State {
-    case pre
-    case ongoing
-    case finished
-  }
-  var op: () throws -> Void  // guarded by lock.
-  var error: Error?  // guarded by lock.
-  var state: State = .pre
-  let lock = NSLock()  // TODO: use atomic operations on State. (No need for a lock.)
-
-  /// Returns true if this thread should execute op, false otherwise.
-  mutating func tryTake() -> Bool {
-    lock.lock()
-    defer { lock.unlock() }
-    if state == .pre {
-      state = .ongoing
-      return true
-    }
-    return false
-  }
-
-  mutating func execute() {
-    do {
-      try op()
-    } catch {
-      self.error = error
-    }
-    markFinished()
-  }
-
-  mutating func markFinished() {
-    lock.lock()
-    defer { lock.unlock() }
-    assert(state == .ongoing)
-    state = .finished
-  }
-
-  var isFinished: Bool {
-    lock.lock()
-    defer { lock.unlock() }
-    return state == .finished
-  }
-}
diff --git a/Tests/PenguinGraphTests/VertexParallelTests.swift b/Tests/PenguinGraphTests/VertexParallelTests.swift
index e5972859..7c28d0df 100644
--- a/Tests/PenguinGraphTests/VertexParallelTests.swift
+++ b/Tests/PenguinGraphTests/VertexParallelTests.swift
@@ -219,18 +219,18 @@ final class VertexParallelTests: XCTestCase {
   }
 
   func testPerThreadMailboxesShortestPathsUserThread() {
-    let testPool = TestSequentialThreadPool(parallelism: 10)
+    let testPool = TestSequentialThreadPool(maxParallelism: 10)
     runParallelMailboxesTest(testPool)
   }
 
   func testPerThreadMailboxesShortestPathsPoolThread() {
-    var testPool = TestSequentialThreadPool(parallelism: 10)
+    var testPool = TestSequentialThreadPool(maxParallelism: 10)
     testPool.currentThreadIndex = 3
     runParallelMailboxesTest(testPool)
   }
 
   func testPerThreadMailboxesWonkyMessagePatterns() {
-    var testPool = TestSequentialThreadPool(parallelism: 10)
+    var testPool = TestSequentialThreadPool(maxParallelism: 10)
     let g = makeDistanceGraph()
     let vIds = g.vertices
     ComputeThreadPools.withPool(testPool) {
@@ -271,8 +271,10 @@ final class VertexParallelTests: XCTestCase {
   }
 
   func testPerThreadMailboxesMultiThreaded() {
-    ComputeThreadPools.withPool(NaiveThreadPool.global) {
-      XCTAssert(ComputeThreadPools.parallelism > 1)
+    // TODO: Don't create a new thread pool in the test.
+    let pool = PosixNonBlockingThreadPool(name: "per-thread-mailboxes-multi-threaded")
+    ComputeThreadPools.withPool(pool) {
+      XCTAssert(ComputeThreadPools.maxParallelism > 1)
       var g = makeDistanceGraph()
       let vIds = g.vertices
       var mailboxes = PerThreadMailboxes(
@@ -308,7 +310,7 @@ final class VertexParallelTests: XCTestCase {
   }
 
   func testPerThreadMailboxesDelivery() {
-    var testPool = TestSequentialThreadPool(parallelism: 10, currentThreadIndex: 3)
+    var testPool = TestSequentialThreadPool(maxParallelism: 10, currentThreadIndex: 3)
     let mailboxes = PerThreadMailboxes(vertexCount: 5, threadCount: 10)
     ComputeThreadPools.withPool(testPool) {
@@ -501,19 +503,32 @@ fileprivate struct TestMessage: Equatable, MergeableMessage {
 
 /// A test thread pool that doesn't have parallelism, but makes it easy to pretend as if multiple
 /// threads are sequentially performing operations.
-fileprivate struct TestSequentialThreadPool: TypedComputeThreadPool {
+fileprivate struct TestSequentialThreadPool: ComputeThreadPool {
   /// The amount of parallelism to simulate in this thread pool.
-  public let parallelism: Int
+  public let maxParallelism: Int
 
   /// Set this to define the thread this simulation should be running on.
   public var currentThreadIndex: Int? = nil
 
-  public func dispatch(_ fn: (Self) -> Void) {
-    fn(self)
+  public func dispatch(_ fn: @escaping () -> Void) {
+    fn()
   }
 
-  public func join(_ a: (Self) throws -> Void, _ b: (Self) throws -> Void) throws {
-    try a(self)
-    try b(self)
+  public func join(_ a: () throws -> Void, _ b: () throws -> Void) throws {
+    try a()
+    try b()
+  }
+
+  public func join(_ a: () -> Void, _ b: () -> Void) {
+    a()
+    b()
+  }
+
+  public func parallelFor(n: Int, _ fn: VectorizedParallelForBody) {
+    fn(0, n, n)
+  }
+
+  public func parallelFor(n: Int, _ fn: ThrowingVectorizedParallelForBody) throws {
+    try fn(0, n, n)
   }
 }
diff --git a/Tests/PenguinParallelTests/NaiveThreadPoolTests.swift b/Tests/PenguinParallelTests/NaiveThreadPoolTests.swift
deleted file mode 100644
index fd796ade..00000000
--- a/Tests/PenguinParallelTests/NaiveThreadPoolTests.swift
+++ /dev/null
@@ -1,102 +0,0 @@
-// Copyright 2020 Penguin Authors
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//    http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-import Foundation
-import PenguinParallelWithFoundation
-import XCTest
-
-final class NaiveThreadPoolTests: XCTestCase {
-
-  func testThrowingJoin() throws {
-    do {
-      try NaiveThreadPool.global.join({ _ in }, { _ in throw TestError() })
-      XCTFail("Should have thrown!")
-    } catch is TestError {}  // Pass
-
-    do {
-      try NaiveThreadPool.global.join({ _ in throw TestError() }, { _ in })
-      XCTFail("Should have thrown!")
-    } catch is TestError {}  // Pass
-  }
-
-  func testThrowingParallelFor() throws {
-    do {
-      try NaiveThreadPool.global.parallelFor(n: 100) { (i, _) in
-        if i == 57 {
-          throw TestError()
-        }
-      }
-      XCTFail("Should have thrown!")
-    } catch is TestError {
-      // Pass!
-      return
-    }
-  }
-
-  func testThreadIndex() {
-    let threadCount = NaiveThreadPool.global.parallelism
-    // Run `threadCount` tasks, and block on `condition` until all of them have consumed the
-    // thread pool resources. Check the thread index on each one to ensure we've seen them all.
-    // The main thread waits on `doneCondition` which is notified when `threadsSeenCount`
-    // returns to 0.
-    let condition = NSCondition()
-
-    // Because the thread pool is intentionally racy (for performance reasons), and assumes work
-    // does not block the thread pool, we intentionally do not block indefinitely upon
-    // `condition`, and instead try a few times to ensure we appropriately satisfy the test
-    // conditions.
-    for attempt in 0..<10 {
-      var threadsSeenCount = 0  // guarded by condition
-      var seenMainThread = false  // guarded by condition
-      var seenThreadIds = Set<Int>()  // guarded by condition
-      var successfulRun = true  // guarded by condition
-      NaiveThreadPool.global.parallelFor(n: threadCount + 1) { (i, _) in
-        condition.lock()
-        if let threadId = NaiveThreadPool.global.currentThreadIndex {
-          threadsSeenCount += 1
-          seenThreadIds.insert(threadId)
-        } else {
-          // Either we've not seen the main thread, or this must be an unsuccessful run.
-          XCTAssert(
-            !seenMainThread || !successfulRun,
-            "\(attempt): Main thread: \(seenMainThread), successful: \(successfulRun)")
-          seenMainThread = true
-        }
-        if threadsSeenCount == threadCount {
-          condition.broadcast()  // Wake up all waiters.
-        } else {
-          if !condition.wait(until: Date() + 0.5) {  // Wait no more than 500 ms.
-            successfulRun = false
-          }
-        }
-        condition.unlock()
-      }
-      // Finished.
-      if !successfulRun { continue }  // try again.
-      XCTAssert(seenMainThread)
-      XCTAssertEqual(Set(0..