Skip to content

Commit

Permalink
HPCC-30110 Use a really simple queue to optimize parallel join
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday committed Aug 16, 2023
1 parent 9e51f66 commit be81465
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 1 deletion.
70 changes: 70 additions & 0 deletions system/jlib/jqueue.tpp
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,76 @@ public:
}
};

template <class BASE, bool ALLOWNULLS>
class ReallySimpleInterThreadQueueOf : protected SafeQueueOf<BASE, ALLOWNULLS>
{
typedef ReallySimpleInterThreadQueueOf<BASE, ALLOWNULLS> SELF;
typedef SafeQueueOf<BASE, ALLOWNULLS> PARENT;
protected:
Semaphore space;
Semaphore avail;
unsigned limit = 0;
std::atomic<bool> stopped{false};

public:
ReallySimpleInterThreadQueueOf<BASE, ALLOWNULLS>()
{
}

~ReallySimpleInterThreadQueueOf<BASE, ALLOWNULLS>()
{
stop();
}

void reset()
{
space.reinit(limit);
avail.reinit(0);
stopped = false;
}

bool enqueue(BASE *e)
{
space.wait();
if (stopped)
return false;
PARENT::enqueue(e);
avail.signal();
return true;
}

BASE *dequeue()
{
avail.wait();
if (stopped)
return nullptr;
BASE * result = PARENT::dequeue();
space.signal();
return result;
}

void setLimit(unsigned num)
{
limit = num;
space.reinit(limit);
}

void stop() // stops all waiting operations
{
//Assume maxreaders < limit, maxwriters < limit
stopped = true;
avail.signal(limit);
space.signal(limit);
}

BASE *dequeueNow()
{
return PARENT::dequeue();
}

using PARENT::ordinality;
};

#define ForEachQueueItemIn(x,y) unsigned numItems##x = (y).ordinality(); \
for (unsigned x = 0; x< numItems##x; ++x)

Expand Down
2 changes: 1 addition & 1 deletion thorlcr/activities/msort/thsortu.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1834,7 +1834,7 @@ class CMultiCoreUnorderedJoinHelper: public CMultiCoreJoinHelperBase
multiWriter->abort();
}

SimpleInterThreadQueueOf<cWorkItem,false> workqueue;
ReallySimpleInterThreadQueueOf<cWorkItem,false> workqueue;
Owned<IRowMultiWriterReader> multiWriter;
Owned<IRowWriter> rowWriter;

Expand Down

0 comments on commit be81465

Please sign in to comment.