Skip to content

Commit

Permalink
HPCC-30085 Add a max size threshold
Browse files Browse the repository at this point in the history
As well as max transactions, provide a max accumulated transaction
size threshold, primarily in case excessive large externals build up.

Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Aug 18, 2023
1 parent 5ce8c5e commit 40c5825
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 44 deletions.
75 changes: 40 additions & 35 deletions dali/base/dasds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1102,21 +1102,23 @@ class CTransactionItem : public CSimpleInterface

static constexpr unsigned defaultSaveThresholdSecs = 0; // disabled
static constexpr unsigned defaultDeltaSaveTransactionThreshold = 0; // disabled
static constexpr unsigned defaultDeltaMemMaxMB = 10;
static constexpr unsigned defaultDeltaTransactionQueueLimit = 10000;
class CDeltaWriter : implements IThreaded
{
IStoreHelper *iStoreHelper = nullptr;
StringBuffer dataPath;
StringBuffer backupPath;
unsigned transactionWriteThreshold = 0;
unsigned transactionQueueLimit = defaultDeltaTransactionQueueLimit; // absolute limit, will block if this far behind
memsize_t transactionMaxMem = defaultDeltaMemMaxMB * 0x100000; // 10MB
unsigned totalQueueLimitHits = 0;
unsigned addQueueWaiting = 0;
unsigned saveThresholdSecs = 0;
cycle_t nextTimeThreshold = 0;
cycle_t thresholdDuration = 0;

std::queue<Owned<CTransactionItem>> pending;
memsize_t pendingSz = 0;
CriticalSection pendingCrit;
CCycleTimer timer;
StringBuffer deltaXml;
Expand Down Expand Up @@ -1364,35 +1366,31 @@ class CDeltaWriter : implements IThreaded
iStoreHelper->getBackupLocation(backupPath);

saveThresholdSecs = config.getPropInt("@deltaSaveThresholdSecs", defaultSaveThresholdSecs);
transactionWriteThreshold = config.getPropInt("@deltaSaveTransactionThreshold", defaultDeltaSaveTransactionThreshold);
transactionQueueLimit = config.getPropInt("@deltaTransactionQueueLimit", defaultDeltaTransactionQueueLimit);
unsigned deltaTransactionMaxMemMB = config.getPropInt("@deltaTransactionMaxMemMB", defaultDeltaMemMaxMB);
transactionMaxMem = (memsize_t)deltaTransactionMaxMemMB * 0x100000;
if (saveThresholdSecs)
{
thresholdDuration = queryOneSecCycles() * saveThresholdSecs;
nextTimeThreshold = get_cycles_now() + thresholdDuration;
}
if (0 == transactionQueueLimit) // treat 0 as disabled, meaning that the # pending transactions will not be considered
transactionQueueLimit = INFINITE;

aborted = false;
waiting = true;
if (0 == transactionWriteThreshold) // treat 0 as disabled, meaning that the # pending transactions will not be considered
transactionWriteThreshold = INFINITE;

if (transactionQueueLimit > 1) // else all transactions will be synchronous
threaded.init(this);

VStringBuffer msg("CDeltaWriter started - deltaTransactionQueueLimit=%u, deltaSaveThresholdSecs=", transactionQueueLimit);
if (saveThresholdSecs)
msg.append(saveThresholdSecs);
else
msg.append("<IMMEDIATE>");
msg.append(", transactionWriteThreshold=");
if (INFINITE == transactionWriteThreshold)
VStringBuffer msg("CDeltaWriter started - deltaSaveThresholdSecs=%u, deltaTransactionMaxMemMB=%u", saveThresholdSecs, deltaTransactionMaxMemMB);
msg.append(", deltaTransactionQueueLimit=");
if (INFINITE == transactionQueueLimit)
msg.append("<DISABLED>");
else if (1 == transactionWriteThreshold)
msg.append("<IMMEDIATE>");
else
msg.append(transactionWriteThreshold);
msg.append(transactionQueueLimit);
PROGLOG("%s", msg.str());

if ((transactionQueueLimit > 1) && (transactionMaxMem > 0))
threaded.init(this);
else
PROGLOG("All transactions will be committed synchronously");
}
void addDelta(const char *path, IPropertyTree *delta)
{
Expand Down Expand Up @@ -1441,23 +1439,24 @@ class CDeltaWriter : implements IThreaded
if (aborted)
break;
// keep going whilst there's things pending
CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending
while (true)
{
std::queue<Owned<CTransactionItem>> todo;
CLeavableCriticalBlock b(pendingCrit);
std::queue<Owned<CTransactionItem>> todo = std::move(pending);
if (0 == todo.size())
{
CriticalBlock b(pendingCrit);
todo = std::move(pending);
if (0 == todo.size())
{
// now in crit, check if sem timedout, but got signalled in between then and now, and consume possible signal
if (semTimedout)
sem.wait(0);
waiting = true;
break;
}
// now in crit, check if sem timedout, but got signalled in between then and now, and consume possible signal
if (semTimedout)
sem.wait(0);
waiting = true;
break;
}
if (!save(todo)) // if temporarily blocked, wait a bit (blocking window is short)
pendingSz = 0;
// Hold blockedSaveCrit before releasing pendingCrit, because need to ensure this saves ahead
// of other transactions building up in addToQueue
CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending
b.leave();
while (!save(todo)) // if temporarily blocked, wait a bit (blocking window is short)
MilliSleep(1000);
}
}
Expand Down Expand Up @@ -2207,25 +2206,31 @@ void CBinaryFileExternal::write(const char *name, IPropertyTree &tree)
void CDeltaWriter::addToQueue(CTransactionItem *item)
{
pending.push(item);
// add actual size for externals, and nonimal '100 byte' value for delta transactions
// it will act. as a rough guide to appoaching size threshold. It is not worth
// synchronously preparing and serializing here (which will be done asynchronously later)
pendingSz += (CTransactionItem::f_addext == item->type) ? item->dataLength : 100;
size_t items = pending.size();
if (items < transactionQueueLimit)
if ((pendingSz < transactionMaxMem) && (items < transactionQueueLimit))
{
if ((get_cycles_now() < nextTimeThreshold) && (pending.size() < transactionWriteThreshold))
if (nextTimeThreshold && (get_cycles_now() < nextTimeThreshold))
return;
if (waiting)
{
waiting = false;
sem.signal();
}
}
else
else // here if exceeded transationQueueLimit, transactionMaxMem or exceeded time threshold (deltaSaveThresholdSecs)
{
++totalQueueLimitHits;
// force a synchronous save
CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending
CCycleTimer timer;
CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending
PROGLOG("Forcing synchronous save of %u transactions", (unsigned)pending.size());
if (save(pending)) // if temporarily blocked, continue, meaning queue limit will overrun a bit (blocking window is short)
{
pendingSz = 0;
timeThrottled += timer.elapsedCycles();
++throttleCounter;
if (timeThrottled >= queryOneSecCycles())
Expand Down
8 changes: 4 additions & 4 deletions initfiles/componentfiles/configschema/xsd/dali.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,12 @@
<xs:attribute name="deltaSaveThresholdSecs" type="xs:nonNegativeInteger"
hpcc:displayName="Seconds between transactions being committed to disk" hpcc:presetValue="0"
hpcc:tooltip="The maximum time between commit pending transactions to disk (default=0, meaning commit immediately)"/>
<xs:attribute name="deltaSaveTransactionThreshold" type="xs:nonNegativeInteger"
hpcc:displayName="Maximum number of pending transactions" hpcc:presetValue="0"
hpcc:tooltip="The maximum number of pending transactions before triggering a save to disk (default=INIFINITE, meaning commit immediately"/>
<xs:attribute name="deltaTransactionQueueLimit" type="xs:nonNegativeInteger"
hpcc:displayName="Maximum number of pending uncommited transaction" hpcc:presetValue="10000"
hpcc:tooltip="If the number of transactions exceeds this limit, all future transactions will be blocked, until they are committed to disk"/>
hpcc:tooltip="If exceeded, a synchronous save will be forced"/>
<xs:attribute name="deltaTransactionMaxMemMB" type="xs:nonNegativeInteger"
hpcc:displayName="Maximum total pending transaction memory size" hpcc:presetValue="10"
hpcc:tooltip="If exceeded, a synchronous save will be forced"/>
</xs:attributeGroup>
<xs:attributeGroup name="dfs" hpcc:groupByName="DFS" hpcc:docid="da.t5">
<xs:attribute name="forceGroupUpdate" type="xs:boolean" hpcc:displayName="Force Group Update"
Expand Down
8 changes: 4 additions & 4 deletions initfiles/componentfiles/configxml/dali.xsd
Original file line number Diff line number Diff line change
Expand Up @@ -281,17 +281,17 @@
</xs:appinfo>
</xs:annotation>
</xs:attribute>
<xs:attribute name="deltaSaveTransactionThreshold" type="xs:nonNegativeInteger" use="optional" default="INFINITE">
<xs:attribute name="deltaTransactionQueueLimit" type="xs:nonNegativeInteger" use="optional" default="10000">
<xs:annotation>
<xs:appinfo>
<tooltip>Maximum number of transaction before triggering a commit to disk (default off, will commit immediately)</tooltip>
<tooltip>The max limit of pending uncommitted transactions. If exceeded a synchronous save will be forced</tooltip>
</xs:appinfo>
</xs:annotation>
</xs:attribute>
<xs:attribute name="deltaTransactionQueueLimit" type="xs:nonNegativeInteger" use="optional" default="10000">
<xs:attribute name="deltaTransactionMaxMemMB" type="xs:nonNegativeInteger" use="optional" default="10000">
<xs:annotation>
<xs:appinfo>
<tooltip>The max limit of pending uncommitted transactions. Transactions will be blocked until the number is reduced by committing to disk</tooltip>
<tooltip>The max total memory limit of pending uncommitted transactions. If exceeded a synchronous save will be forced</tooltip>
</xs:appinfo>
</xs:annotation>
</xs:attribute>
Expand Down
2 changes: 1 addition & 1 deletion initfiles/componentfiles/configxml/dali.xsl
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@
<xsl:element name="SDS">
<xsl:attribute name="store">dalisds.xml</xsl:attribute>
<xsl:attribute name="caseInsensitive">0</xsl:attribute>
<xsl:copy-of select="@nobackup | @recoverFromIncErrors | @snmpSendWarnings | @enableSNMP | @enableSysLog | @snmpErrorMsgLevel | @msgLevel | @lightweightCoalesce | @keepStores | @deltaSaveThresholdSecs | @deltaSaveTransactionThreshold | @deltaTransactionQueueLimit"/>
<xsl:copy-of select="@nobackup | @recoverFromIncErrors | @snmpSendWarnings | @enableSNMP | @enableSysLog | @snmpErrorMsgLevel | @msgLevel | @lightweightCoalesce | @keepStores | @deltaSaveThresholdSecs | @deltaTransactionQueueLimit | @deltaTransactionMaxMemMB"/>
<xsl:if test="string(@IdlePeriod) != ''">
<xsl:attribute name="lCIdlePeriod">
<xsl:value-of select="@IdlePeriod"/>
Expand Down

0 comments on commit 40c5825

Please sign in to comment.