Skip to content

Commit

Permalink
HPCC-29883 UNTESTED: Optimize DataBuffer allocator
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday committed Jul 6, 2023
1 parent 93282ed commit f4094b9
Showing 1 changed file with 93 additions and 97 deletions.
190 changes: 93 additions & 97 deletions roxie/roxiemem/roxiemem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5035,6 +5035,7 @@ class CChunkingRowManager : public CRowManager
assertex(finger->mgr == this);
// MORE - if we get a load of data in and none out this can start to bog down...
DataBuffer *next = finger->nextActive;
__builtin_prefetch(next);
if (memTraceInconsistencies && getSeqFromDataId((memsize_t)next) != 0)
logctx.CTXLOG("INTERNAL ERROR: unexpected sequence number in nextActive in ~CChunkingRowManager - addr=%p rowMgr=%p",
finger, this);
Expand Down Expand Up @@ -6644,136 +6645,131 @@ class CDataBufferManager : implements IDataBufferManager, public CInterface

DataBuffer *allocate()
{
CriticalBlock b(crit);

if (memTraceLevel >= 5)
if (unlikely(memTraceLevel >= 5))
DBGLOG("RoxieMemMgr: CDataBufferManager::allocate() curBlock=%p nextAddr=%p:%x", curBlock, nextBase, nextOffset);

// This code be coded as freePending.exchange(false, std::memory_order_acquire))
// but that will be a locked operation.
// Since this is the only thread that clears the flag it is more efficient to check if it is set and clear if true.
if (freePending.load(std::memory_order_acquire))
DataBuffer * curFree;
{
freePending.store(false, std::memory_order_release);
freeUnused();
}
CriticalBlock b(crit);

for (;;)
{
if (curBlock)
// This code be coded as freePending.exchange(false, std::memory_order_acquire))
// but that will be a locked operation.
// Since this is the only thread that clears the flag it is more efficient to check if it is set and clear if true.
if (freePending.load(std::memory_order_acquire))
{
DataBufferBottom *bottom = curBlock;
// No need to hold the critical section on bottom->crit since there is a lock on the CDataBufferManager so nothing else can
// allocate anything from any of the blocks, and freeHeadId is lock free - so no need to lock for that
freePending.store(false, std::memory_order_release);
freeUnused();
}

memsize_t curHeadId = bottom->freeHeadId;
if (!isNullDataId(curHeadId))
for (;;)
{
if (likely(curBlock))
{
//If the pointer is non-null then it will remain non-null (other threads can only add items to the list, not remove them)
for (;;)
DataBufferBottom *bottom = curBlock;
// No need to hold the critical section on bottom->crit since there is a lock on the CDataBufferManager so nothing else can
// allocate anything from any of the blocks, and freeHeadId is lock free - so no need to lock for that

memsize_t curHeadId = bottom->freeHeadId;
if (!isNullDataId(curHeadId))
{
DataBuffer * curFree = getPtrFromDataId(curHeadId);
//If the pointer is non-null then it will remain non-null (other threads can only add items to the list, not remove them)
for (;;)
{
curFree = getPtrFromDataId(curHeadId);

//Nothing else can modify curFree, so this pointer will remain valid
DataBuffer * nextFree = getPtrFromDataId(curFree->nextDataId);
//Nothing else can modify curFree, so this pointer will remain valid
DataBuffer * nextFree = getPtrFromDataId(curFree->nextDataId);

//Always update the sequence number when updating the freeHeadId to avoid ABA problem.
memsize_t nextHeadId = createDataId(nextFree, nextSeqFromDataId(curHeadId));
if (likely(bottom->freeHeadId.compare_exchange_weak(curHeadId, nextHeadId, std::memory_order_acq_rel)))
{
dataBuffersActive.fetch_add(1);
bottom->Link();
curFree->nextDataId = 0;
curFree->changeState(DBState::freed, DBState::unowned, __func__); // sanity check - will be overriden by the following new
if (memTraceLevel >= 4)
DBGLOG("RoxieMemMgr: CDataBufferManager::allocate() reallocated DataBuffer - addr=%p", curFree);
return ::new(curFree) DataBuffer();
//Always update the sequence number when updating the freeHeadId to avoid ABA problem.
memsize_t nextHeadId = createDataId(nextFree, nextSeqFromDataId(curHeadId));
if (likely(bottom->freeHeadId.compare_exchange_weak(curHeadId, nextHeadId, std::memory_order_acq_rel)))
{
bottom->Link();
//Exit the critical section ASAP
goto returnResult;
}
}
}
}

if (nextOffset < HEAP_ALIGNMENT_SIZE) // Is there any space in the current block (it must be a whole block)
{
dataBuffersActive.fetch_add(1);
curBlock->Link();
DataBuffer *x = ::new(nextBase+nextOffset) DataBuffer();
nextOffset += DATA_ALIGNMENT_SIZE;
if (nextOffset == HEAP_ALIGNMENT_SIZE)
if (nextOffset < HEAP_ALIGNMENT_SIZE) // Is there any space in the current block (it must be a whole block)
{
bottom->Link();
curFree = (DataBuffer *)(nextBase+nextOffset);
nextOffset += DATA_ALIGNMENT_SIZE;
goto returnNewResult;
}
else
{
// MORE: May want to delete this "if" logic !!
// and let it be handled in the similar logic of "else" part below.
curBlock->Release();
curBlock = NULL;
nextBase = NULL;
//nextOffset = 0 - not needed since only used if curBlock is set
}
if (memTraceLevel >= 4)
DBGLOG("RoxieMemMgr: CDataBufferManager::allocate() allocated DataBuffer - addr=%p", x);
return x;
}
else
// see if a previous block that is waiting to be freed has any on its free chain
DataBufferBottom *finger = freeChain;
if (finger)
{
curBlock->Release();
curBlock = NULL;
nextBase = NULL;
//nextOffset = 0 - not needed since only used if curBlock is set
}
}
// see if a previous block that is waiting to be freed has any on its free chain
DataBufferBottom *finger = freeChain;
if (finger)
{
for (;;)
{
memsize_t curHeadId = finger->freeHeadId;
if (!isNullDataId(curHeadId))
for (;;)
{
//If the pointer is non-null then it will remain non-null (items can only be added, not removed
//from the free list)
if (finger->isAliveAndLink())
memsize_t curHeadId = finger->freeHeadId;
if (!isNullDataId(curHeadId))
{
curBlock = finger;
nextBase = nullptr; // should never be accessed
nextOffset = HEAP_ALIGNMENT_SIZE; // only use the free chain to allocate
dataBuffersActive.fetch_add(1);

for (;;)
//If the pointer is non-null then it will remain non-null (items can only be added, not removed
//from the free list)
if (finger->isAliveAndLink())
{
DataBuffer * curFree = getPtrFromDataId(curHeadId);
assertex(curFree);

//Nothing else can modify curFree, so this pointer will remain valid
DataBuffer * nextFree = getPtrFromDataId(curFree->nextDataId);
curBlock = finger;
nextBase = nullptr; // should never be accessed
nextOffset = HEAP_ALIGNMENT_SIZE; // only use the free chain to allocate

//Always update the sequence number when updating the freeHeadId to avoid ABA problem
memsize_t nextHeadId = createDataId(nextFree, nextSeqFromDataId(curHeadId));
if (likely(finger->freeHeadId.compare_exchange_weak(curHeadId, nextHeadId, std::memory_order_acq_rel)))
for (;;)
{
dataBuffersActive.fetch_add(1);
finger->Link();
curFree->nextDataId = 0;
curFree->changeState(DBState::freed, DBState::unowned, __func__); // sanity check - will be overriden by the following new
if (memTraceLevel >= 4)
DBGLOG("RoxieMemMgr: CDataBufferManager::allocate() reallocated DataBuffer - addr=%p", curFree);
return ::new(curFree) DataBuffer();
DataBuffer * curFree = getPtrFromDataId(curHeadId);
assertex(curFree);

//Nothing else can modify curFree, so this pointer will remain valid
DataBuffer * nextFree = getPtrFromDataId(curFree->nextDataId);

//Always update the sequence number when updating the freeHeadId to avoid ABA problem
memsize_t nextHeadId = createDataId(nextFree, nextSeqFromDataId(curHeadId));
if (likely(finger->freeHeadId.compare_exchange_weak(curHeadId, nextHeadId, std::memory_order_acq_rel)))
{
finger->Link();
goto returnResult;
}
}
}
}
finger = finger->nextBottom;
if (finger==freeChain)
break;
}
finger = finger->nextBottom;
if (finger==freeChain)
break;
}
nextBase = (char *)suballoc_aligned(1, false);
nextOffset = DATA_ALIGNMENT_SIZE;
curBlock = (DataBufferBottom *)nextBase;
dataBufferPages.fetch_add(1);
assertex(curBlock);
if (memTraceLevel >= 3)
DBGLOG("RoxieMemMgr: CDataBufferManager::allocate() allocated new DataBuffers Page - addr=%p", curBlock);
freeChain = ::new(curBlock) DataBufferBottom(this, freeChain); // we never allocate the lowest one in the heap - used just for refcounting the rest
}
nextBase = (char *)suballoc_aligned(1, false);
nextOffset = DATA_ALIGNMENT_SIZE;
curBlock = (DataBufferBottom *)nextBase;
dataBufferPages.fetch_add(1);
assertex(curBlock);
if (memTraceLevel >= 3)
DBGLOG("RoxieMemMgr: CDataBufferManager::allocate() allocated new DataBuffers Page - addr=%p", curBlock);
freeChain = ::new(curBlock) DataBufferBottom(this, freeChain); // we never allocate the lowest one in the heap - used just for refcounting the rest
}

returnResult:
dataBuffersActive.fetch_add(1);
curFree->nextDataId = 0;
curFree->changeState(DBState::freed, DBState::unowned, __func__); // sanity check - will be overriden by the following new
if (unlikely(memTraceLevel >= 4))
DBGLOG("RoxieMemMgr: CDataBufferManager::allocate() reallocated DataBuffer - addr=%p", curFree);
return ::new(curFree) DataBuffer();

returnNewResult:
dataBuffersActive.fetch_add(1);
if (unlikely(memTraceLevel >= 4))
DBGLOG("RoxieMemMgr: CDataBufferManager::allocate() allocated DataBuffer - addr=%p", curFree);
return ::new(curFree) DataBuffer();
}

void cleanUp()
Expand Down

0 comments on commit f4094b9

Please sign in to comment.