Skip to content

Commit

Permalink
Add ConcurrentPriorityQueue
Browse files Browse the repository at this point in the history
  • Loading branch information
Petr Prokop committed Mar 6, 2021
1 parent f31172c commit a37e739
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 9 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ if index < concurrentArray.count {
```

## Priority queue
**SCC** provides classical (non-concurrent) priority queue
**SCC** provides both classical and concurrent priority queues

```swift
var priorityQueue = PriorityQueue<Int>(<)
Expand All @@ -64,3 +64,4 @@ while priorityQueue.count > 0 {

As you can see `PriorityQueue<Int>(<)` constructs min-queue, with `PriorityQueue<Int>(>)` you can get max-queue.
If you need to reserve capacity right away, use `PriorityQueue<Int>(capacity: 1024, comparator: <)`.
`ConcurrentPriorityQueue<Int>(<)` creates a thread-safe version, with a very similar interface.
8 changes: 8 additions & 0 deletions SwiftConcurrentCollections.xcodeproj/project.pbxproj
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
2F897F0823E8692800B522AF /* ConcurrentDictionary.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2F897F0723E8692800B522AF /* ConcurrentDictionary.swift */; };
2F897F0C23E86A8900B522AF /* ConcurrentArray.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2F897F0B23E86A8900B522AF /* ConcurrentArray.swift */; };
2F897F0E23E8757400B522AF /* ConcurrentArrayTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = 2F897F0D23E8757400B522AF /* ConcurrentArrayTests.swift */; };
D2ED2DF125F375C100BA4B77 /* ConcurrentPriorityQueue.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2ED2DF025F375C100BA4B77 /* ConcurrentPriorityQueue.swift */; };
D2ED2DF525F3789400BA4B77 /* ConcurrentPriorityQueueTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2ED2DF425F3789400BA4B77 /* ConcurrentPriorityQueueTests.swift */; };
D2FD7F7925713C750014F8E8 /* PriorityQueue.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2FD7F7825713C750014F8E8 /* PriorityQueue.swift */; };
D2FD7F7D257147470014F8E8 /* PriorityQueueTests.swift in Sources */ = {isa = PBXBuildFile; fileRef = D2FD7F7C257147470014F8E8 /* PriorityQueueTests.swift */; };
/* End PBXBuildFile section */
Expand Down Expand Up @@ -44,6 +46,8 @@
2F897F0B23E86A8900B522AF /* ConcurrentArray.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ConcurrentArray.swift; sourceTree = "<group>"; };
2F897F0D23E8757400B522AF /* ConcurrentArrayTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ConcurrentArrayTests.swift; sourceTree = "<group>"; };
2F897F0F23E87A1800B522AF /* README.md */ = {isa = PBXFileReference; lastKnownFileType = net.daringfireball.markdown; path = README.md; sourceTree = "<group>"; };
D2ED2DF025F375C100BA4B77 /* ConcurrentPriorityQueue.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ConcurrentPriorityQueue.swift; sourceTree = "<group>"; };
D2ED2DF425F3789400BA4B77 /* ConcurrentPriorityQueueTests.swift */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.swift; path = ConcurrentPriorityQueueTests.swift; sourceTree = "<group>"; };
D2FD7F7825713C750014F8E8 /* PriorityQueue.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PriorityQueue.swift; sourceTree = "<group>"; };
D2FD7F7C257147470014F8E8 /* PriorityQueueTests.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = PriorityQueueTests.swift; sourceTree = "<group>"; };
/* End PBXFileReference section */
Expand Down Expand Up @@ -97,6 +101,7 @@
2F897EF123E8683000B522AF /* Info.plist */,
2F7CD5F623F051C00060F6BD /* RWLock.swift */,
D2FD7F7825713C750014F8E8 /* PriorityQueue.swift */,
D2ED2DF025F375C100BA4B77 /* ConcurrentPriorityQueue.swift */,
);
path = SwiftConcurrentCollections;
sourceTree = "<group>";
Expand All @@ -106,6 +111,7 @@
children = (
2F897EFB23E8683000B522AF /* ConcurrentDictionaryTests.swift */,
2F897F0D23E8757400B522AF /* ConcurrentArrayTests.swift */,
D2ED2DF425F3789400BA4B77 /* ConcurrentPriorityQueueTests.swift */,
D2FD7F7C257147470014F8E8 /* PriorityQueueTests.swift */,
2F897EFD23E8683000B522AF /* Info.plist */,
);
Expand Down Expand Up @@ -227,6 +233,7 @@
2F897F0823E8692800B522AF /* ConcurrentDictionary.swift in Sources */,
2F7CD5F923F055610060F6BD /* GCDConcurrentDictionary.swift in Sources */,
D2FD7F7925713C750014F8E8 /* PriorityQueue.swift in Sources */,
D2ED2DF125F375C100BA4B77 /* ConcurrentPriorityQueue.swift in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;
};
Expand All @@ -236,6 +243,7 @@
files = (
2F897F0E23E8757400B522AF /* ConcurrentArrayTests.swift in Sources */,
2F897EFC23E8683000B522AF /* ConcurrentDictionaryTests.swift in Sources */,
D2ED2DF525F3789400BA4B77 /* ConcurrentPriorityQueueTests.swift in Sources */,
D2FD7F7D257147470014F8E8 /* PriorityQueueTests.swift in Sources */,
);
runOnlyForDeploymentPostprocessing = 0;
Expand Down
82 changes: 82 additions & 0 deletions SwiftConcurrentCollections/ConcurrentPriorityQueue.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import Foundation

/// Thread-safe priority queue wrapper
/// - Important: Note that this is a `class`, i.e. reference (not value) type
public final class ConcurrentPriorityQueue<Element> {

private var container: PriorityQueue<Element>
private let rwlock = RWLock()

public var count: Int {
rwlock.readLock()
defer {
rwlock.unlock()
}
return container.count
}

public init(capacity: Int, complexComparator: @escaping PriorityQueue<Element>.ComplexComparator) {
self.container = PriorityQueue(capacity: capacity, complexComparator: complexComparator)
}

public init(capacity: Int, comparator: @escaping PriorityQueue<Element>.Comparator) {
self.container = PriorityQueue(capacity: capacity, comparator: comparator)
}

/// Creates `ConcurrentPriorityQueue` with default capacity
/// - Parameter comparator: heap property will hold if `comparator(parent(i), i)` is `true`
/// e.g. `<` will create a minimum-queue and `>` - maximum-queue
public init(_ comparator: @escaping PriorityQueue<Element>.Comparator) {
self.container = PriorityQueue(comparator)
}

public func insert(_ value: Element) {
rwlock.writeLock()
container.insert(value)
rwlock.unlock()
}

/// Remove top element from the queue
public func pop() -> Element {
rwlock.writeLock()
defer {
rwlock.unlock()
}

return container.pop()
}

/// Get top element from the queue
public func peek() -> Element {
rwlock.readLock()
defer {
rwlock.unlock()
}

return container.peek()
}

/// Get top element from the queue, returns `nil`if queue is empty
public func safePeek() -> Element? {
rwlock.readLock()
defer {
rwlock.unlock()
}
return container.count > 0
? container.peek()
: nil
}

/// Remove top element from the queue, returns `nil`if queue is empty
public func safePop() -> Element? {
rwlock.writeLock()
defer {
rwlock.unlock()
}

return container.count > 0
? container.pop()
: nil
}

}
16 changes: 8 additions & 8 deletions SwiftConcurrentCollections/PriorityQueue.swift
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ import Foundation

private let defaultCapacity = 16

public struct PriorityQueue<V> {
public typealias ComplexComparator = (V, V) -> ComparisonResult
public typealias Comparator = (V, V) -> Bool
public struct PriorityQueue<Element> {
public typealias ComplexComparator = (Element, Element) -> ComparisonResult
public typealias Comparator = (Element, Element) -> Bool

private var container: [V]
private var container: [Element]
private var heapSize = 0
private let comparator: Comparator

Expand All @@ -34,7 +34,7 @@ public struct PriorityQueue<V> {
public init(capacity: Int, comparator: @escaping Comparator) {
self.comparator = comparator

container = Array<V>()
container = Array<Element>()
container.reserveCapacity(capacity)
}

Expand All @@ -45,7 +45,7 @@ public struct PriorityQueue<V> {
self.init(capacity: defaultCapacity, comparator: comparator)
}

mutating public func insert(_ value: V) {
mutating public func insert(_ value: Element) {
if heapSize == container.capacity {
let reserveSize = container.count > 0 ? container.count : defaultCapacity
container.reserveCapacity(container.count + reserveSize)
Expand All @@ -64,7 +64,7 @@ public struct PriorityQueue<V> {
}

/// Remove top element from the queue
mutating public func pop() -> V {
mutating public func pop() -> Element {
if heapSize <= 0 {
fatalError("Fatal error: trying to pop from an empty PriorityQueue")
}
Expand All @@ -83,7 +83,7 @@ public struct PriorityQueue<V> {
}

/// Get top element from the queue
public func peek() -> V {
public func peek() -> Element {
if heapSize <= 0 {
fatalError("Fatal error: trying to peek into an empty PriorityQueue")
}
Expand Down
78 changes: 78 additions & 0 deletions SwiftConcurrentCollectionsTests/ConcurrentPriorityQueueTests.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import XCTest
@testable import SwiftConcurrentCollections

class ConcurrentPriorityQueueTests: XCTestCase {

func testConcurrentReadingAndWriting() {
let startDate = Date().timeIntervalSince1970
let concurrentPriorityQueue = ConcurrentPriorityQueue<Int>(<)
// Unsafe version:
// var concurrentPriorityQueue = PriorityQueue<Int>(<)

let readingQueue = DispatchQueue(
label: "ConcurrentArrayTests.readingQueue",
qos: .userInteractive,
attributes: .concurrent
)

let writingQueue = DispatchQueue(
label: "ConcurrentArrayTests.writingQueue",
qos: .userInteractive,
attributes: .concurrent
)

// 100 seems to be small enough not to cause performance problems
for i in 0 ..< 100 {
let writeExpectation = expectation(description: "Write expectation")
let readExpectation = expectation(description: "Read expectation")

writingQueue.async {
concurrentPriorityQueue.insert(i)
writeExpectation.fulfill()
}

readingQueue.async {
let count = concurrentPriorityQueue.count
guard count > 0 else {
return
}

let value = concurrentPriorityQueue.peek()
print(value)
readExpectation.fulfill()
}
}

waitForExpectations(timeout: 10, handler: nil)
print("\(#function) took \(Date().timeIntervalSince1970 - startDate) seconds")
}

func testPop() {
let concurrentPriorityQueue = ConcurrentPriorityQueue<Int>(<)

concurrentPriorityQueue.insert(1)
XCTAssertEqual(concurrentPriorityQueue.count, 1)
XCTAssertEqual(concurrentPriorityQueue.peek(), 1)
XCTAssertEqual(concurrentPriorityQueue.pop(), 1)
XCTAssertEqual(concurrentPriorityQueue.count, 0)
}

func testSafePop() {
let concurrentPriorityQueue = ConcurrentPriorityQueue<Int>(<)

XCTAssertEqual(concurrentPriorityQueue.safePop(), nil)
concurrentPriorityQueue.insert(1)
XCTAssertEqual(concurrentPriorityQueue.safePop(), 1)
XCTAssertEqual(concurrentPriorityQueue.count, 0)
}

func testSafePeek() {
let concurrentPriorityQueue = ConcurrentPriorityQueue<Int>(<)

XCTAssertEqual(concurrentPriorityQueue.safePeek(), nil)
concurrentPriorityQueue.insert(1)
XCTAssertEqual(concurrentPriorityQueue.safePeek(), 1)
XCTAssertEqual(concurrentPriorityQueue.count, 1)
}

}

0 comments on commit a37e739

Please sign in to comment.