diff --git a/system/jlib/jqueue.tpp b/system/jlib/jqueue.tpp index 0063aee130b..44ed3a88115 100644 --- a/system/jlib/jqueue.tpp +++ b/system/jlib/jqueue.tpp @@ -242,6 +242,7 @@ template class SafeQueueOf : private QueueOf { typedef SafeQueueOf SELF; + typedef QueueOf PARENT; protected: mutable CriticalSection crit; inline void unsafeenqueue(BASE *e) { QueueOf::enqueue(e); } @@ -267,6 +268,7 @@ public: void dequeue(BASE *e) { CriticalBlock b(crit); return QueueOf::dequeue(e); } inline unsigned ordinality() const { return QueueOf::ordinality(); } void set(unsigned idx, BASE *e) { CriticalBlock b(crit); return QueueOf::set(idx, e); } + using PARENT::ensure; }; @@ -535,6 +537,79 @@ public: } }; +//A lighter-weight limited thread queue which does not allow timeouts. +//Linux futexes mean that semaphores now perform very well... +template +class ReallySimpleInterThreadQueueOf : protected SafeQueueOf +{ + typedef ReallySimpleInterThreadQueueOf SELF; + typedef SafeQueueOf PARENT; +protected: + Semaphore space; + Semaphore avail; + unsigned limit = 0; + std::atomic stopped{false}; + +public: + ReallySimpleInterThreadQueueOf() + { + } + + ~ReallySimpleInterThreadQueueOf() + { + stop(); + } + + void reset() + { + space.reinit(limit); + avail.reinit(0); + stopped = false; + } + + bool enqueue(BASE *e) + { + space.wait(); + if (stopped) + return false; + PARENT::enqueue(e); + avail.signal(); + return true; + } + + BASE *dequeue() + { + avail.wait(); + if (stopped) + return nullptr; + BASE * result = PARENT::dequeue(); + space.signal(); + return result; + } + + void setLimit(unsigned num) + { + limit = num; + space.reinit(limit); + PARENT::ensure(limit); + } + + void stop(unsigned maxReaders = 0, unsigned maxWriters = 0) // stops all waiting operations + { + //Assume maxreaders < limit, maxwriters < limit if not provided + stopped = true; + avail.signal(maxReaders ? maxReaders : limit); + space.signal(maxWriters ? maxWriters : limit); + } + + BASE *dequeueNow() + { + return PARENT::dequeue(); + } + + using PARENT::ordinality; +}; + #define ForEachQueueItemIn(x,y) unsigned numItems##x = (y).ordinality(); \ for (unsigned x = 0; x< numItems##x; ++x) diff --git a/thorlcr/activities/msort/thsortu.cpp b/thorlcr/activities/msort/thsortu.cpp index bd74a2fff47..540f351cb44 100644 --- a/thorlcr/activities/msort/thsortu.cpp +++ b/thorlcr/activities/msort/thsortu.cpp @@ -1834,7 +1834,7 @@ class CMultiCoreUnorderedJoinHelper: public CMultiCoreJoinHelperBase multiWriter->abort(); } - SimpleInterThreadQueueOf workqueue; + ReallySimpleInterThreadQueueOf workqueue; Owned multiWriter; Owned rowWriter;