-
Notifications
You must be signed in to change notification settings - Fork 0
/
ThreadSafeQueue.h
190 lines (168 loc) · 5.57 KB
/
ThreadSafeQueue.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
#ifndef THREADSAFEQUEUE_H
#define THREADSAFEQUEUE_H
#include <mutex>
#include <queue>
#include <thread>
/**
* ThreadSafeQueue is a warpper around the std::queue class. This wrapper adds a mutex to make it thread safe
*
* This class is functionally the same as std::queue, except the pop() function both removes the first element of the queue, and returns that element
*
* @tparam C the type of data the queue stores
*/
template <class C>
class ThreadSafeQueue {
public:
/**
* Default constructor.
*
* Initializes sleepLockMilliseconds, the number of miliseconds a thread should sleep after encountering a conflict
*
* @tparam C the type of data the queue stores.
*/
ThreadSafeQueue();
/**
* Constructor.
*
* Initializes sleepLockMilliseconds, the number of miliseconds a thread should sleep after encountering a conflict.
* If the parameter passed is equal or less 0, sleepLockMiliseconds defaults to 1.
*
* @tparam C the type of data the queue stores.
* @param sLockMiliseconds the number of miliseconds a thread should sleep after encountering a conflict
*/
ThreadSafeQueue(int sLockMilliseconds);
/**
* Thread safe wrapper around queue.pop().
*
* Unlike the STD counterpart, pop() both removes the first element and returns it.
*
* @tparam C the type of data the queue stores.
* @return C the item at the front of the queue.
*/
C safePop();
/**
* Thread safe wrapper around queue.pop().
*
* @tparam C the type of data the queue stores.
* @param[out] output the object popped from the queue.
* @return true if successfully popped, false otherwise.
*/
bool safePop(C* output);
/**
* Thread safe wrapper around queue.push().
*
* @tparam C the type of data the queue stores.
* @param data the object to be pushed to the queue.
*/
void push(C data);
/**
* Thread safe wrapper around queue.empty().
*
* @tparam C the type of data the queue stores.
* @return true if the queue is empty, false otherwise.
*/
bool empty();
/**
* Thread safe wrapper around queue.size().
*
* @tparam C the type of data the queue stores.
* @return the size of the queue.
*/
int size();
/**
* Thread safe wrapper around queue's copy constructor.
*
* @tparam C the type of data the queue stores.
* @return a reference to the local variable.
*/
ThreadSafeQueue<C>& operator=(const ThreadSafeQueue<C>& copy);
private:
std::queue<C> queue;
std::mutex mu;
std::chrono::milliseconds sleepLockMilliseconds;
};
template<class C>
inline ThreadSafeQueue<C>::ThreadSafeQueue() {
sleepLockMilliseconds = std::chrono::milliseconds(1);
}
template<class C>
inline ThreadSafeQueue<C>::ThreadSafeQueue(int sLockMilliseconds) {
if(sLockMilliseconds > 0)
sleepLockMilliseconds = std::chrono::milliseconds(sLockMilliseconds);
else
sleepLockMilliseconds = std::chrono::milliseconds(1);
}
template <class C>
inline C ThreadSafeQueue<C>::safePop() {
std::unique_lock<std::mutex> lock(mu, std::try_to_lock);
while(!lock.owns_lock()) {
std::this_thread::sleep_for(sleepLockMilliseconds);
lock.try_lock();
}
C temp;
if(!queue.empty()) {
temp = queue.front();
queue.pop();
}
return temp;
}
template <class C>
inline bool ThreadSafeQueue<C>::safePop(C* output) {
std::unique_lock<std::mutex> lock(mu, std::try_to_lock);
while(!lock.owns_lock()) {
std::this_thread::sleep_for(sleepLockMilliseconds);
lock.try_lock();
}
if(!queue.empty()) {
*output = queue.front();
queue.pop();
return true;
}
return false;
}
template <class C>
inline void ThreadSafeQueue<C>::push(C data) {
std::unique_lock<std::mutex> lock(mu, std::try_to_lock);
while(!lock.owns_lock()) {
std::this_thread::sleep_for(sleepLockMilliseconds);
lock.try_lock();
}
queue.push(data);
}
template <class C>
inline bool ThreadSafeQueue<C>::empty() {
std::unique_lock<std::mutex> lock(mu, std::try_to_lock);
while(!lock.owns_lock()) {
std::this_thread::sleep_for(sleepLockMilliseconds);
lock.try_lock();
}
bool temp = queue.empty();
return temp;
}
template <class C>
inline int ThreadSafeQueue<C>::size() {
std::unique_lock<std::mutex> lock(mu, std::try_to_lock);
while(!lock.owns_lock()) {
std::this_thread::sleep_for(sleepLockMilliseconds);
lock.try_lock();
}
int temp = queue.size();
return temp;
}
template<class C>
inline ThreadSafeQueue<C>& ThreadSafeQueue<C>::operator=(const ThreadSafeQueue<C>& copy) {
std::unique_lock<std::mutex> lock(mu, std::try_to_lock);
while(!lock.owns_lock()) {
std::this_thread::sleep_for(sleepLockMilliseconds);
lock.try_lock();
}
std::queue<C> empty;
std::swap(queue, empty);
std::queue<C> temp = copy.queue;
while(temp.front) {
queue.insert(temp.front());
temp.pop();
}
return *this;
}
#endif