Skip to content

Commit 5dbbb7c

Browse files
hisundar authored and chiyoung committed
MB-19223: Switch to hrtime from timeval in Global Thread Pool
This has small improvements in memory and cpu usage. Also fixes several ThreadSanitizer races from unit tests - for example:

WARNING: ThreadSanitizer: data race (pid=21672)
  Write of size 8 at 0x7d140000e7b8 by main thread (mutexes: write M14972, write M14985):
    #0 memcpy <null> (engine_testapp+0x000000453040)
    #1 TaskQueue::_wake(SingleThreadedRCPtr<GlobalTask>&) /home/daver/repos/couchbase/server/ep-engine/src/taskqueue.cc:255 (ep.so+0x0000002577f6)
    #2 TaskQueue::wake(SingleThreadedRCPtr<GlobalTask>&) /home/daver/repos/couchbase/server/ep-engine/src/taskqueue.cc:282 (ep.so+0x000000257c73)
    #3 ExecutorPool::_wake(unsigned long) /home/daver/repos/couchbase/server/ep-engine/src/executorpool.cc:320 (ep.so+0x0000001acc76)
    #4 ExecutorPool::wake(unsigned long) /home/daver/repos/couchbase/server/ep-engine/src/executorpool.cc:328 (ep.so+0x0000001ace13)
    #5 Flusher::wait() /home/daver/repos/couchbase/server/ep-engine/src/flusher.cc:41 (ep.so+0x0000001cc4ff)
    #6 EventuallyPersistentStore::stopFlusher() /home/daver/repos/couchbase/server/ep-engine/src/ep.cc:402 (ep.so+0x0000000d54d5)
    #7 ~EventuallyPersistentStore /home/daver/repos/couchbase/server/ep-engine/src/ep.cc:364 (ep.so+0x0000000d49cb)
    #8 ~EventuallyPersistentEngine /home/daver/repos/couchbase/server/ep-engine/src/ep_engine.cc:5778 (ep.so+0x000000161043)
    #9 EvpDestroy(engine_interface*, bool) /home/daver/repos/couchbase/server/ep-engine/src/ep_engine.cc:143 (ep.so+0x000000135efa)
    #10 mock_destroy /home/daver/repos/couchbase/server/memcached/programs/engine_testapp/engine_testapp.c:61 (engine_testapp+0x0000004bb9d6)
    #11 destroy_engine /home/daver/repos/couchbase/server/memcached/programs/engine_testapp/engine_testapp.c:998 (engine_testapp+0x0000004bb646)
    #12 execute_test /home/daver/repos/couchbase/server/memcached/programs/engine_testapp/engine_testapp.c:1048 (engine_testapp+0x0000004baa11)
    #13 main /home/daver/repos/couchbase/server/memcached/programs/engine_testapp/engine_testapp.c:1296 (engine_testapp+0x0000004b8861)
  Previous read of size 8 at 0x7d140000e7b8 by thread T14:
    #0 ExecutorThread::run() /home/daver/repos/couchbase/server/ep-engine/src/executorthread.cc:106 (ep.so+0x0000001e3488)
    #1 launch_executor_thread(void*) /home/daver/repos/couchbase/server/ep-engine/src/executorthread.cc:34 (ep.so+0x0000001e2a5a)
    #2 platform_thread_wrap /home/daver/repos/couchbase/server/platform/src/cb_pthreads.c:19 (libplatform.so.0.1.0+0x0000000035dc)

Change-Id: I78fdddb832251fc062058c04f75f8d22c4c2f68d
Reviewed-on: http://review.couchbase.org/62912
Well-Formed: buildbot <[email protected]>
Reviewed-by: Will Gardner <[email protected]>
Tested-by: buildbot <[email protected]>
Reviewed-by: Chiyoung Seo <[email protected]>
1 parent 8cbe913 commit 5dbbb7c

File tree

10 files changed

+69
-102
lines changed

10 files changed

+69
-102
lines changed

src/access_scanner.cc

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,10 +157,18 @@ bool AccessScanner::run() {
157157
}
158158
}
159159
snooze(sleepTime);
160-
stats.alogTime.store(waketime.tv_sec);
160+
updateAlogTime(sleepTime);
161+
161162
return true;
162163
}
163164

165+
void AccessScanner::updateAlogTime(double sleepSecs) {
166+
struct timeval _waketime;
167+
gettimeofday(&_waketime, NULL);
168+
_waketime.tv_sec += sleepSecs;
169+
stats.alogTime.store(_waketime.tv_sec);
170+
}
171+
164172
std::string AccessScanner::getDescription() {
165173
return std::string("Generating access log");
166174
}

src/access_scanner.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,8 @@ class AccessScanner : public GlobalTask {
4747
AtomicValue<size_t> completedCount;
4848

4949
private:
50+
void updateAlogTime(double sleepSecs);
51+
5052
EventuallyPersistentStore &store;
5153
EPStats &stats;
5254
double sleepTime;

src/executorpool.cc

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -528,10 +528,7 @@ bool ExecutorPool::_stopTaskGroup(EventuallyPersistentEngine *e,
528528
}
529529
}
530530
if (unfinishedTask) {
531-
struct timeval waktime;
532-
gettimeofday(&waktime, NULL);
533-
advance_tv(waktime, MIN_SLEEP_TIME);
534-
tMutex.wait(waktime); // Wait till task gets cancelled
531+
tMutex.wait(MIN_SLEEP_TIME); // Wait till task gets cancelled
535532
}
536533
} while (unfinishedTask);
537534

@@ -692,13 +689,9 @@ static void addWorkerStats(const char *prefix, ExecutorThread *t,
692689
(gethrtime() - t->getTaskStart()) / 1000, add_stat, cookie);
693690
}
694691
snprintf(statname, sizeof(statname), "%s:waketime", prefix);
695-
uint64_t abstime = t->getWaketime().tv_sec*1000000 +
696-
t->getWaketime().tv_usec;
697-
add_casted_stat(statname, abstime, add_stat, cookie);
692+
add_casted_stat(statname, t->getWaketime(), add_stat, cookie);
698693
snprintf(statname, sizeof(statname), "%s:cur_time", prefix);
699-
abstime = t->getCurTime().tv_sec*1000000 +
700-
t->getCurTime().tv_usec;
701-
add_casted_stat(statname, abstime, add_stat, cookie);
694+
add_casted_stat(statname, t->getCurTime(), add_stat, cookie);
702695
}
703696

704697
void ExecutorPool::doWorkerStat(EventuallyPersistentEngine *engine,

src/executorthread.cc

Lines changed: 17 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -87,23 +87,19 @@ void ExecutorThread::run() {
8787

8888
// Measure scheduling overhead as difference between the time
8989
// that the task wanted to wake up and the current time
90-
gettimeofday(&now, NULL);
91-
struct timeval woketime = currentTask->waketime;
92-
uint64_t diffsec = now.tv_sec > woketime.tv_sec ?
93-
now.tv_sec - woketime.tv_sec : 0;
94-
uint64_t diffusec = now.tv_usec > woketime.tv_usec ?
95-
now.tv_usec - woketime.tv_usec : 0;
96-
90+
now = gethrtime();
91+
hrtime_t woketime = currentTask->waketime;
9792
engine->getEpStore()->logQTime(currentTask->getTypeId(),
98-
diffsec*1000000 + diffusec);
93+
now > woketime ? now - woketime
94+
: 0);
9995

100-
taskStart = gethrtime();
96+
taskStart = now;
10197
rel_time_t startReltime = ep_current_time();
10298
try {
10399
LOG(EXTENSION_LOG_DEBUG,
104-
"%s: Run task \"%s\" id %d waketime %d",
100+
"%s: Run task \"%s\" id %d",
105101
getName().c_str(), currentTask->getDescription().c_str(),
106-
currentTask->getId(), currentTask->waketime.tv_sec);
102+
currentTask->getId());
107103

108104
// Now Run the Task ....
109105
currentTask->setState(TASK_RUNNING, TASK_SNOOZED);
@@ -125,26 +121,26 @@ void ExecutorThread::run() {
125121
manager->doneWork(curTaskType);
126122
manager->cancel(currentTask->taskId, true);
127123
} else {
128-
struct timeval timetowake;
124+
hrtime_t new_waketime;
129125
// if a task has not set snooze, update its waketime to now
130126
// before rescheduling for more accurate timing histograms
131-
if (less_eq_tv(currentTask->waketime, now)) {
127+
if (currentTask->waketime <= now) {
132128
currentTask->waketime = now;
133129
}
134130
// release capacity back to TaskQueue ..
135131
manager->doneWork(curTaskType);
136-
timetowake = q->reschedule(currentTask, curTaskType);
132+
new_waketime = q->reschedule(currentTask, curTaskType);
137133
// record min waketime ...
138-
if (less_tv(timetowake, waketime)) {
139-
waketime = timetowake;
134+
if (new_waketime < waketime) {
135+
waketime = new_waketime;
140136
}
141-
LOG(EXTENSION_LOG_DEBUG,
142-
"%s: Reschedule a task \"%s\" id %d[%d %d |%d]",
137+
LOG(EXTENSION_LOG_DEBUG, "%s: Reschedule a task"
138+
" \"%s\" id %d[%llu %llu |%llu]",
143139
name.c_str(),
144140
currentTask->getDescription().c_str(),
145-
currentTask->getId(), timetowake.tv_sec,
146-
currentTask->waketime.tv_sec,
147-
waketime.tv_sec);
141+
currentTask->getId(), new_waketime,
142+
currentTask->waketime,
143+
waketime);
148144
}
149145
} catch (std::exception& e) {
150146
LOG(EXTENSION_LOG_WARNING,

src/executorthread.h

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ class ExecutorThread {
6464
state(EXECUTOR_CREATING), taskStart(0),
6565
currentTask(NULL), curTaskType(NO_TASK_TYPE),
6666
tasklog(TASK_LOG_SIZE), slowjobs(TASK_LOG_SIZE) {
67-
set_max_tv(waketime);
67+
waketime = hrtime_t(-1);
6868
}
6969

7070
~ExecutorThread() {
@@ -113,9 +113,9 @@ class ExecutorThread {
113113
return slowjobs.contents();
114114
}
115115

116-
struct timeval getWaketime(void) { return waketime; }
116+
const hrtime_t getWaketime(void) { return waketime; }
117117

118-
struct timeval getCurTime(void) { return now; }
118+
const hrtime_t getCurTime(void) { return now; }
119119

120120
private:
121121

@@ -125,8 +125,8 @@ class ExecutorThread {
125125
const std::string name;
126126
AtomicValue<executor_state_t> state;
127127

128-
struct timeval now; // record of current time
129-
struct timeval waketime; // set to the earliest
128+
hrtime_t now; // record of current time
129+
hrtime_t waketime; // set to the earliest
130130

131131
hrtime_t taskStart;
132132
ExTask currentTask;

src/syncobject.h

Lines changed: 4 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -40,43 +40,13 @@ class SyncObject : public Mutex {
4040
setHolder(true);
4141
}
4242

43-
void wait(const struct timeval &tv) {
44-
// Todo:
45-
// This logic is a bit weird, because normally we want to
46-
// sleep for a certain amount of time, but since we built
47-
// the stuff on pthreads and the API looked like it did we
48-
// used the absolute timers making us sleep to a certain
49-
// point in the future.. now we need to roll back that work
50-
// and do it again in the library API...
51-
// I believe we should rather try to modify our own code
52-
// to only do relative waits, and then have the native
53-
// calls do the either absolute or relative checks.
54-
//
55-
// There is no point of having an explicit return
56-
// value if it was a timeout or something else, because
57-
// you would have to evaluate the reason you waited anyway
58-
// (because one could have spurious wakeups etc)
59-
struct timeval now;
60-
gettimeofday(&now, NULL);
61-
62-
if (tv.tv_sec < now.tv_sec) {
63-
return ;
64-
}
65-
66-
uint64_t a = (now.tv_sec * 1000) + (now.tv_usec / 1000);
67-
uint64_t b = (tv.tv_sec * 1000) + (tv.tv_usec / 1000);
68-
69-
if (b < a) {
70-
// Already expired
71-
return ;
72-
}
73-
74-
cb_cond_timedwait(&cond, &mutex, (int)(b - a));
43+
void wait(const double secs) {
44+
cb_cond_timedwait(&cond, &mutex, (unsigned int)(secs * 1000.0));
7545
setHolder(true);
7646
}
7747

78-
void wait(const double secs) {
79-
cb_cond_timedwait(&cond, &mutex, (unsigned int)(secs * 1000.0));
48+
void wait(const hrtime_t nanoSecs) {
49+
cb_cond_timedwait(&cond, &mutex, (unsigned int)(nanoSecs/1000000));
8050
setHolder(true);
8151
}
8252

src/taskqueue.cc

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ void TaskQueue::_doWake_UNLOCKED(size_t &numToWake) {
6060
}
6161

6262
bool TaskQueue::_doSleep(ExecutorThread &t) {
63-
gettimeofday(&t.now, NULL);
64-
if (less_tv(t.now, t.waketime) && manager->trySleep(queueType)) {
63+
t.now = gethrtime();
64+
if (t.now < t.waketime && manager->trySleep(queueType)) {
6565
// Atomically switch from running to sleeping; iff we were previously
6666
// running.
6767
executor_state_t expected_state = EXECUTOR_RUNNING;
@@ -71,12 +71,12 @@ bool TaskQueue::_doSleep(ExecutorThread &t) {
7171
}
7272
sleepers++;
7373
// zzz....
74-
struct timeval waketime = t.now;
75-
advance_tv(waketime, MIN_SLEEP_TIME); // avoid sleeping more than this
76-
if (less_tv(waketime, t.waketime)) { // to prevent losing posts
77-
mutex.wait(waketime);
74+
hrtime_t snooze_nsecs = t.waketime - t.now;
75+
76+
if (snooze_nsecs > MIN_SLEEP_TIME * 1000000000) {
77+
mutex.wait(MIN_SLEEP_TIME);
7878
} else {
79-
mutex.wait(t.waketime);
79+
mutex.wait(snooze_nsecs);
8080
}
8181
// ... woke!
8282
sleepers--;
@@ -89,9 +89,9 @@ bool TaskQueue::_doSleep(ExecutorThread &t) {
8989
EXECUTOR_RUNNING)) {
9090
return false;
9191
}
92-
gettimeofday(&t.now, NULL);
92+
t.now = gethrtime();
9393
}
94-
set_max_tv(t.waketime);
94+
t.waketime = hrtime_t(-1);
9595
return true;
9696
}
9797

@@ -106,7 +106,7 @@ bool TaskQueue::_fetchNextTask(ExecutorThread &t, bool toSleep) {
106106
size_t numToWake = _moveReadyTasks(t.now);
107107

108108
if (!futureQueue.empty() && t.startIndex == queueType &&
109-
less_tv(futureQueue.top()->waketime, t.waketime)) {
109+
futureQueue.top()->waketime < t.waketime) {
110110
t.waketime = futureQueue.top()->waketime; // record earliest waketime
111111
}
112112

@@ -148,15 +148,15 @@ bool TaskQueue::fetchNextTask(ExecutorThread &thread, bool toSleep) {
148148
return rv;
149149
}
150150

151-
size_t TaskQueue::_moveReadyTasks(struct timeval tv) {
151+
size_t TaskQueue::_moveReadyTasks(hrtime_t tv) {
152152
if (!readyQueue.empty()) {
153153
return 0;
154154
}
155155

156156
size_t numReady = 0;
157157
while (!futureQueue.empty()) {
158158
ExTask tid = futureQueue.top();
159-
if (less_eq_tv(tid->waketime, tv)) {
159+
if (tid->waketime <= tv) {
160160
futureQueue.pop();
161161
readyQueue.push(tid);
162162
numReady++;
@@ -180,25 +180,25 @@ void TaskQueue::_checkPendingQueue(void) {
180180
}
181181
}
182182

183-
struct timeval TaskQueue::_reschedule(ExTask &task, task_type_t &curTaskType) {
184-
struct timeval waktime;
183+
hrtime_t TaskQueue::_reschedule(ExTask &task, task_type_t &curTaskType) {
184+
hrtime_t wakeTime;
185185
manager->doneWork(curTaskType);
186186

187187
LockHolder lh(mutex);
188188

189189
futureQueue.push(task);
190190
if (curTaskType == queueType) {
191-
waktime = futureQueue.top()->waketime;
191+
wakeTime = futureQueue.top()->waketime;
192192
} else {
193-
set_max_tv(waktime);
193+
wakeTime = hrtime_t(-1);
194194
}
195195

196-
return waktime;
196+
return wakeTime;
197197
}
198198

199-
struct timeval TaskQueue::reschedule(ExTask &task, task_type_t &curTaskType) {
199+
hrtime_t TaskQueue::reschedule(ExTask &task, task_type_t &curTaskType) {
200200
EventuallyPersistentEngine *epe = ObjectRegistry::onSwitchThread(NULL, true);
201-
struct timeval rv = _reschedule(task, curTaskType);
201+
hrtime_t rv = _reschedule(task, curTaskType);
202202
ObjectRegistry::onSwitchThread(epe);
203203
return rv;
204204
}
@@ -227,9 +227,8 @@ void TaskQueue::schedule(ExTask &task) {
227227
}
228228

229229
void TaskQueue::_wake(ExTask &task) {
230-
struct timeval now;
231230
size_t numReady = 0;
232-
gettimeofday(&now, NULL);
231+
const hrtime_t now = gethrtime();
233232

234233
LockHolder lh(mutex);
235234
LOG(EXTENSION_LOG_DEBUG, "%s: Wake a task \"%s\" id %d", name.c_str(),
@@ -261,7 +260,7 @@ void TaskQueue::_wake(ExTask &task) {
261260

262261
while (!notReady.empty()) {
263262
ExTask tid = notReady.front();
264-
if (less_eq_tv(tid->waketime, now) || tid->isdead()) {
263+
if (tid->waketime <= now || tid->isdead()) {
265264
readyQueue.push(tid);
266265
numReady++;
267266
} else {

src/taskqueue.h

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class TaskQueue {
3535

3636
void schedule(ExTask &task);
3737

38-
struct timeval reschedule(ExTask &task, task_type_t &curTaskType);
38+
hrtime_t reschedule(ExTask &task, task_type_t &curTaskType);
3939

4040
void checkPendingQueue(void);
4141

@@ -53,13 +53,13 @@ class TaskQueue {
5353

5454
private:
5555
void _schedule(ExTask &task);
56-
struct timeval _reschedule(ExTask &task, task_type_t &curTaskType);
56+
hrtime_t _reschedule(ExTask &task, task_type_t &curTaskType);
5757
void _checkPendingQueue(void);
5858
bool _fetchNextTask(ExecutorThread &thread, bool toSleep);
5959
void _wake(ExTask &task);
6060
bool _doSleep(ExecutorThread &thread);
6161
void _doWake_UNLOCKED(size_t &numToWake);
62-
size_t _moveReadyTasks(struct timeval tv);
62+
size_t _moveReadyTasks(hrtime_t tv);
6363
ExTask _popReadyTask(void);
6464

6565
SyncObject mutex;

src/tasks.cc

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,14 @@ static const double WORKLOAD_MONITOR_FREQ(5.0);
2828
void GlobalTask::snooze(const double secs) {
2929
if (secs == INT_MAX) {
3030
setState(TASK_SNOOZED, TASK_RUNNING);
31-
set_max_tv(waketime);
31+
waketime = hrtime_t(-1);
3232
return;
3333
}
3434

35-
gettimeofday(&waketime, NULL);
36-
35+
waketime = gethrtime();
3736
if (secs) {
3837
setState(TASK_SNOOZED, TASK_RUNNING);
39-
advance_tv(waketime, secs);
38+
waketime += hrtime_t(secs * 1000000000);
4039
}
4140
}
4241

src/tasks.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,7 @@ friend class TaskQueue;
150150
bool blockShutdown;
151151
AtomicValue<task_state_t> state;
152152
const size_t taskId;
153-
struct timeval waketime;
153+
hrtime_t waketime; // used for priority_queue, guarded by TaskQ mutex
154154
EventuallyPersistentEngine *engine;
155155

156156
static AtomicValue<size_t> task_id_counter;
@@ -452,7 +452,7 @@ class CompareByPriority {
452452
class CompareByDueDate {
453453
public:
454454
bool operator()(ExTask &t1, ExTask &t2) {
455-
return less_tv(t2->waketime, t1->waketime);
455+
return t2->waketime < t1->waketime;
456456
}
457457
};
458458

0 commit comments

Comments
 (0)