Skip to content

Commit

Permalink
Replaced boost lockfree queue with moodycamel's concurrent queue
Browse files Browse the repository at this point in the history
  • Loading branch information
jarulraj committed May 13, 2016
1 parent 486eab2 commit 17bc149
Show file tree
Hide file tree
Showing 6 changed files with 5,119 additions and 34 deletions.
38 changes: 16 additions & 22 deletions src/backend/common/lockfree_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,13 @@

#pragma once

#include <boost/lockfree/queue.hpp>

#include <xmmintrin.h>
#include "concurrentqueue/blockingconcurrentqueue.h"

namespace peloton {

//===--------------------------------------------------------------------===//
// Lockfree Queue
// this is a wrapper of boost lockfree queue.
// this data structure supports multiple consumers and multiple producers.
// Supports multiple consumers and multiple producers.
//===--------------------------------------------------------------------===//

template <typename T>
Expand All @@ -32,32 +29,29 @@ class LockfreeQueue {
LockfreeQueue(const LockfreeQueue&) = delete; // disable copying
LockfreeQueue& operator=(const LockfreeQueue&) = delete; // disable assignment

// return true if pop is successful.
// if queue is empty, then return false.
bool TryPop(T& item) {
return queue_.pop(item);
// Enqueues one item, allocating extra space if necessary
void Enqueue(T& item) {
queue_.enqueue(item);
}

// return true if push is successful.
bool TryPush(const T& item) {
return queue_.push(item);
void Enqueue(const T& item) {
queue_.enqueue(item);
}


void BlockingPop(T& item) {
while (queue_.pop(item) == false) {
_mm_pause();
}
// Dequeues one item, returning true if an item was found
// or false if the queue appeared empty
bool Dequeue(T& item) {
return queue_.try_dequeue(item);
}

void BlockingPush(const T& item) {
while (queue_.push(item) == false) {
_mm_pause();
}
bool Dequeue(const T& item) {
return queue_.try_dequeue(item);
}

private:
boost::lockfree::queue<T> queue_;

// Underlying moodycamel's concurrent queue
moodycamel::BlockingConcurrentQueue<T> queue_;
};

} // namespace peloton
24 changes: 12 additions & 12 deletions src/backend/gc/gc_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ void GCManager::Running() {
for (size_t i = 0; i < MAX_ATTEMPT_COUNT; ++i) {
TupleMetadata tuple_metadata;
// if there's no more tuples in the queue, then break.
if (reclaim_queue_.TryPop(tuple_metadata) == false) {
if (reclaim_queue_.Dequeue(tuple_metadata) == false) {
break;
}

Expand All @@ -88,25 +88,25 @@ void GCManager::Running() {
if (recycle_queue_map_.find(tuple_metadata.table_id, recycle_queue) ==
true) {
// if the entry for tuple_metadata.table_id exists.
recycle_queue->BlockingPush(tuple_metadata);
recycle_queue->Enqueue(tuple_metadata);
} else {
// if the entry for tuple_metadata.table_id does not exist.
recycle_queue.reset(
new LockfreeQueue<TupleMetadata>(MAX_QUEUE_LENGTH));
bool ret =
recycle_queue_map_.insert(tuple_metadata.table_id, recycle_queue);
if (ret == true) {
recycle_queue->BlockingPush(tuple_metadata);
recycle_queue->Enqueue(tuple_metadata);
} else {
recycle_queue_map_.find(tuple_metadata.table_id, recycle_queue);
recycle_queue->BlockingPush(tuple_metadata);
recycle_queue->Enqueue(tuple_metadata);
}
}

tuple_counter++;
} else {
// if a tuple cannot be reclaimed, then add it back to the list.
reclaim_queue_.BlockingPush(tuple_metadata);
reclaim_queue_.Enqueue(tuple_metadata);
}
} // end for

Expand All @@ -133,7 +133,7 @@ void GCManager::RecycleTupleSlot(const oid_t &table_id,
tuple_metadata.tuple_slot_id = tuple_id;
tuple_metadata.tuple_end_cid = tuple_end_cid;

reclaim_queue_.BlockingPush(tuple_metadata);
reclaim_queue_.Enqueue(tuple_metadata);

LOG_INFO("Marked tuple(%u, %u) in table %u as possible garbage",
tuple_metadata.tile_group_id, tuple_metadata.tuple_slot_id,
Expand All @@ -151,7 +151,7 @@ ItemPointer GCManager::ReturnFreeSlot(const oid_t &table_id) {
// if there exists recycle_queue
if (recycle_queue_map_.find(table_id, recycle_queue) == true) {
TupleMetadata tuple_metadata;
if (recycle_queue->TryPop(tuple_metadata) == true) {
if (recycle_queue->Dequeue(tuple_metadata) == true) {
LOG_INFO("Reuse tuple(%u, %u) in table %u", tuple_metadata.tile_group_id,
tuple_metadata.tuple_slot_id, table_id);
return ItemPointer(tuple_metadata.tile_group_id,
Expand All @@ -167,7 +167,7 @@ ItemPointer GCManager::ReturnFreeSlot(const oid_t &table_id) {
void GCManager::ClearGarbage() {
// iterate reclaim queue and reclaim every thing because it's the end of the world now.
TupleMetadata tuple_metadata;
while (reclaim_queue_.TryPop(tuple_metadata) == true) {
while (reclaim_queue_.Dequeue(tuple_metadata) == true) {
ResetTuple(tuple_metadata);

// Add to the recycle map
Expand All @@ -176,22 +176,22 @@ void GCManager::ClearGarbage() {
if (recycle_queue_map_.find(tuple_metadata.table_id, recycle_queue) ==
true) {
// if the entry for tuple_metadata.table_id exists.
recycle_queue->BlockingPush(tuple_metadata);
recycle_queue->Enqueue(tuple_metadata);
} else {
// if the entry for tuple_metadata.table_id does not exist.
recycle_queue.reset(
new LockfreeQueue<TupleMetadata>(MAX_QUEUE_LENGTH));
bool ret =
recycle_queue_map_.insert(tuple_metadata.table_id, recycle_queue);
if (ret == true) {
recycle_queue->BlockingPush(tuple_metadata);
recycle_queue->Enqueue(tuple_metadata);
} else {
recycle_queue_map_.find(tuple_metadata.table_id, recycle_queue);
recycle_queue->BlockingPush(tuple_metadata);
recycle_queue->Enqueue(tuple_metadata);
}
}
}
}

} // namespace gc
} // namespace peloton
} // namespace peloton
31 changes: 31 additions & 0 deletions third_party/concurrentqueue/LICENSE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
This license applies to everything in this repository except that which
is explicitly annotated as being written by other authors, i.e. the Boost
queue (included in the benchmarks for comparison), Intel's TBB library (ditto),
the CDSChecker tool (used for verification), the Relacy model checker (ditto),
and Jeff Preshing's semaphore implementation (used in the blocking queue) which
has a zlib license (embedded in blockingconcurrentqueue.h).


Simplified BSD License:

Copyright (c) 2013-2016, Cameron Desrochers.
All rights reserved.

Redistribution and use in source and binary forms, with or without modification,
are permitted provided that the following conditions are met:

- Redistributions of source code must retain the above copyright notice, this list of
conditions and the following disclaimer.
- Redistributions in binary form must reproduce the above copyright notice, this list of
conditions and the following disclaimer in the documentation and/or other materials
provided with the distribution.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY
EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL
THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT
OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE,
EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
Loading

0 comments on commit 17bc149

Please sign in to comment.