From 40c58251dbb2a8b5b1d792de76b36d432f00f005 Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Fri, 18 Aug 2023 11:22:12 +0100 Subject: [PATCH] HPCC-30085 Add a max size threshold As well as max transactions, provide a max accumulated transaction size threshold, primarily in case excessively large externals build up. Signed-off-by: Jake Smith --- dali/base/dasds.cpp | 75 ++++++++++--------- .../componentfiles/configschema/xsd/dali.xsd | 8 +- initfiles/componentfiles/configxml/dali.xsd | 8 +- initfiles/componentfiles/configxml/dali.xsl | 2 +- 4 files changed, 49 insertions(+), 44 deletions(-) diff --git a/dali/base/dasds.cpp b/dali/base/dasds.cpp index a75d9c5d307..c60c770aca6 100644 --- a/dali/base/dasds.cpp +++ b/dali/base/dasds.cpp @@ -1102,14 +1102,15 @@ class CTransactionItem : public CSimpleInterface static constexpr unsigned defaultSaveThresholdSecs = 0; // disabled static constexpr unsigned defaultDeltaSaveTransactionThreshold = 0; // disabled +static constexpr unsigned defaultDeltaMemMaxMB = 10; static constexpr unsigned defaultDeltaTransactionQueueLimit = 10000; class CDeltaWriter : implements IThreaded { IStoreHelper *iStoreHelper = nullptr; StringBuffer dataPath; StringBuffer backupPath; - unsigned transactionWriteThreshold = 0; unsigned transactionQueueLimit = defaultDeltaTransactionQueueLimit; // absolute limit, will block if this far behind + memsize_t transactionMaxMem = defaultDeltaMemMaxMB * 0x100000; // 10MB unsigned totalQueueLimitHits = 0; unsigned addQueueWaiting = 0; unsigned saveThresholdSecs = 0; @@ -1117,6 +1118,7 @@ class CDeltaWriter : implements IThreaded cycle_t thresholdDuration = 0; std::queue> pending; + memsize_t pendingSz = 0; CriticalSection pendingCrit; CCycleTimer timer; StringBuffer deltaXml; @@ -1364,35 +1366,31 @@ class CDeltaWriter : implements IThreaded iStoreHelper->getBackupLocation(backupPath); saveThresholdSecs = config.getPropInt("@deltaSaveThresholdSecs", defaultSaveThresholdSecs); - transactionWriteThreshold = 
config.getPropInt("@deltaSaveTransactionThreshold", defaultDeltaSaveTransactionThreshold); transactionQueueLimit = config.getPropInt("@deltaTransactionQueueLimit", defaultDeltaTransactionQueueLimit); + unsigned deltaTransactionMaxMemMB = config.getPropInt("@deltaTransactionMaxMemMB", defaultDeltaMemMaxMB); + transactionMaxMem = (memsize_t)deltaTransactionMaxMemMB * 0x100000; if (saveThresholdSecs) { thresholdDuration = queryOneSecCycles() * saveThresholdSecs; nextTimeThreshold = get_cycles_now() + thresholdDuration; } + if (0 == transactionQueueLimit) // treat 0 as disabled, meaning that the # pending transactions will not be considered + transactionQueueLimit = INFINITE; aborted = false; waiting = true; - if (0 == transactionWriteThreshold) // treat 0 as disabled, meaning that the # pending transactions will not be considered - transactionWriteThreshold = INFINITE; - - if (transactionQueueLimit > 1) // else all transactions will be synchronous - threaded.init(this); - - VStringBuffer msg("CDeltaWriter started - deltaTransactionQueueLimit=%u, deltaSaveThresholdSecs=", transactionQueueLimit); - if (saveThresholdSecs) - msg.append(saveThresholdSecs); - else - msg.append(""); - msg.append(", transactionWriteThreshold="); - if (INFINITE == transactionWriteThreshold) + VStringBuffer msg("CDeltaWriter started - deltaSaveThresholdSecs=%u, deltaTransactionMaxMemMB=%u", saveThresholdSecs, deltaTransactionMaxMemMB); + msg.append(", deltaTransactionQueueLimit="); + if (INFINITE == transactionQueueLimit) msg.append(""); - else if (1 == transactionWriteThreshold) - msg.append(""); else - msg.append(transactionWriteThreshold); + msg.append(transactionQueueLimit); PROGLOG("%s", msg.str()); + + if ((transactionQueueLimit > 1) && (transactionMaxMem > 0)) + threaded.init(this); + else + PROGLOG("All transactions will be committed synchronously"); } void addDelta(const char *path, IPropertyTree *delta) { @@ -1441,23 +1439,24 @@ class CDeltaWriter : implements IThreaded if (aborted) 
break; // keep going whilst there's things pending - CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending while (true) { - std::queue> todo; + CLeavableCriticalBlock b(pendingCrit); + std::queue> todo = std::move(pending); + if (0 == todo.size()) { - CriticalBlock b(pendingCrit); - todo = std::move(pending); - if (0 == todo.size()) - { - // now in crit, check if sem timedout, but got signalled in between then and now, and consume possible signal - if (semTimedout) - sem.wait(0); - waiting = true; - break; - } + // now in crit, check if sem timedout, but got signalled in between then and now, and consume possible signal + if (semTimedout) + sem.wait(0); + waiting = true; + break; } - if (!save(todo)) // if temporarily blocked, wait a bit (blocking window is short) + pendingSz = 0; + // Hold blockedSaveCrit before releasing pendingCrit, because need to ensure this saves ahead + // of other transactions building up in addToQueue + CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending + b.leave(); + while (!save(todo)) // if temporarily blocked, wait a bit (blocking window is short) MilliSleep(1000); } } @@ -2207,10 +2206,14 @@ void CBinaryFileExternal::write(const char *name, IPropertyTree &tree) void CDeltaWriter::addToQueue(CTransactionItem *item) { pending.push(item); + // add actual size for externals, and nominal '100 byte' value for delta transactions + // It will act as a rough guide to approaching size threshold. It is not worth + // synchronously preparing and serializing here (which will be done asynchronously later) + pendingSz += (CTransactionItem::f_addext == item->type) ? 
item->dataLength : 100; size_t items = pending.size(); - if (items < transactionQueueLimit) + if ((pendingSz < transactionMaxMem) && (items < transactionQueueLimit)) { - if ((get_cycles_now() < nextTimeThreshold) && (pending.size() < transactionWriteThreshold)) + if (nextTimeThreshold && (get_cycles_now() < nextTimeThreshold)) return; if (waiting) { @@ -2218,14 +2221,16 @@ void CDeltaWriter::addToQueue(CTransactionItem *item) sem.signal(); } } - else + else // here if exceeded transactionQueueLimit, transactionMaxMem or exceeded time threshold (deltaSaveThresholdSecs) { ++totalQueueLimitHits; // force a synchronous save - CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending CCycleTimer timer; + CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending + PROGLOG("Forcing synchronous save of %u transactions", (unsigned)pending.size()); if (save(pending)) // if temporarily blocked, continue, meaning queue limit will overrun a bit (blocking window is short) { + pendingSz = 0; timeThrottled += timer.elapsedCycles(); ++throttleCounter; if (timeThrottled >= queryOneSecCycles()) diff --git a/initfiles/componentfiles/configschema/xsd/dali.xsd b/initfiles/componentfiles/configschema/xsd/dali.xsd index 58ee4658899..4e5fe0f667c 100644 --- a/initfiles/componentfiles/configschema/xsd/dali.xsd +++ b/initfiles/componentfiles/configschema/xsd/dali.xsd @@ -64,12 +64,12 @@ - + hpcc:tooltip="If exceeded, a synchronous save will be forced"/> + - + - Maximum number of transaction before triggering a commit to disk (default off, will commit immediately) + The max limit of pending uncommitted transactions. If exceeded a synchronous save will be forced - + - The max limit of pending uncommitted transactions. 
Transactions will be blocked until the number is reduced by committing to disk + The max total memory limit of pending uncommitted transactions. If exceeded a synchronous save will be forced diff --git a/initfiles/componentfiles/configxml/dali.xsl b/initfiles/componentfiles/configxml/dali.xsl index 16d13940839..a7a4516a181 100644 --- a/initfiles/componentfiles/configxml/dali.xsl +++ b/initfiles/componentfiles/configxml/dali.xsl @@ -140,7 +140,7 @@ dalisds.xml 0 - +