From 70d928c53c6b5898204ad6176e3fa640f9406d79 Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Thu, 31 Aug 2023 15:44:28 +0100 Subject: [PATCH] HPCC-30085 2nd review changes + Write xml transactions after externals written, pending external removals at end + Ensure save/stop flushes transactions Signed-off-by: Jake Smith --- dali/base/dasds.cpp | 105 ++++++++++++++++++++++++++++++-------------- 1 file changed, 72 insertions(+), 33 deletions(-) diff --git a/dali/base/dasds.cpp b/dali/base/dasds.cpp index e8b29e32d00..03ac49ed451 100644 --- a/dali/base/dasds.cpp +++ b/dali/base/dasds.cpp @@ -1128,7 +1128,9 @@ class CDeltaWriter : implements IThreaded unsigned throttleCounter = 0; bool waiting = true; bool backupOutOfSync = false; - bool aborted = false; + std::atomic aborted = false; + bool signalEmpty = false; + Semaphore emptySem; void validateDeltaBackup() { @@ -1306,6 +1308,7 @@ class CDeltaWriter : implements IThreaded catch (IException *e) { EXCLOG(e, NULL); e->Release(); } } + std::vector pendingExtDeletes; while (!todo.empty()) { CTransactionItem *item = todo.front(); @@ -1315,44 +1318,54 @@ class CDeltaWriter : implements IThreaded item->deltaTree = nullptr; cleanChangeTree(*changeTree); - // write out with header details (e.g. path) - Owned header = createPTree("Header"); - header->setProp("@path", item->name); - IPropertyTree *delta = header->addPropTree("Delta", createPTree()); - const char *nodeName = changeTree->queryName(); - delta->addPropTree(nodeName, changeTree.getClear()); - toXML(header, deltaXml); + // write out with header details (i.e. path) + deltaXml.appendf("
\n \n", item->name); + toXML(changeTree, deltaXml, 4); + deltaXml.append(" \n
"); } else { // external file - // commit accumulated delta 1st (so write order is consistent) - if (deltaXml.length()) - writeXml(); // also handles backup if needed - if (CTransactionItem::f_addext == item->type) { + // NB: the XML referring to these externals will not be written until the pending xml is built up + // meaning if there's a cold restart at this point, there will be unreferenced ext files on disk (and ensuing warnings) + // However, that is better than committing the xml that references externals, before they are exist in the event of + // a cold restart between the two. + // NB2: It is also important to ensure that these externals are committed to disk, in the event of a Dali saveStore + // because the saved in-memory store has been altered at this point, i.e. it depends on the external file. writeExt(dataPath, item->name, item->dataLength, item->data); if (backupPath.length()) writeExt(backupPath, item->name, item->dataLength, item->data, 60, 30); } else { + // track externals to delete / don't delete until xml is committed. See below. dbgassertex(CTransactionItem::f_delext == item->type); - deleteExt(dataPath, item->name); - if (backupPath.length()) - deleteExt(backupPath, item->name, 60, 30); + pendingExtDeletes.push_back(item->name); } } todo.pop(); } if (deltaXml.length()) writeXml(); + for (auto &ext : pendingExtDeletes) + { + deleteExt(dataPath, ext.c_str()); + if (backupPath.length()) + deleteExt(backupPath, ext.c_str(), 60, 30); + } if (thresholdDuration) lastSaveTime = get_cycles_now(); return true; } + void signalWaiting() + { + // must be called whilst pendingCrit is held + waiting == true + waiting = false; + sem.signal(); + } void addToQueue(CTransactionItem *item); public: CDeltaWriter() : threaded("CDeltaWriter") @@ -1369,7 +1382,10 @@ class CDeltaWriter : implements IThreaded unsigned deltaTransactionMaxMemMB = config.getPropInt("@deltaTransactionMaxMemMB", defaultDeltaMemMaxMB); transactionMaxMem = (memsize_t)deltaTransactionMaxMemMB * 0x100000; if (saveThresholdSecs) + { thresholdDuration = queryOneSecCycles() * saveThresholdSecs; + lastSaveTime = get_cycles_now(); + } if (0 == transactionQueueLimit) // treat 0 as disabled, meaning that the # pending transactions will not be considered transactionQueueLimit = INFINITE; @@ -1407,17 +1423,36 @@ class CDeltaWriter : implements IThreaded CriticalBlock b(pendingCrit); addToQueue(item); } - void clear() + void flush() { - CriticalBlock b(pendingCrit); - pending = {}; + bool waitOnEmptySem = false; + { + CriticalBlock b(pendingCrit); + if (waiting) + { + if (pendingSz) + signalWaiting(); + } + if (!waiting) + { + signalEmpty = true; + waitOnEmptySem = true; + } + } + if (waitOnEmptySem) + { + // this should not be here long, but log just in case + while (!emptySem.wait(10000)) + WARNLOG("Waiting on CDeltaWriter to flush transactions"); + } } - void stop() + void abort() { if (!aborted) { aborted = true; sem.signal(); + emptySem.signal(); threaded.join(); } } @@ -1445,6 +1480,11 @@ class CDeltaWriter : implements IThreaded if (semTimedout) sem.wait(0); waiting = true; + if (signalEmpty) + { + signalEmpty = false; + emptySem.signal(); + } break; } pendingSz = 0; @@ -1515,7 +1555,7 @@ class CLegacyBinaryFileExternal : public CExternalFile, implements IExternalHand Owned iFile = createIFile(filename.str()); size32_t sz = (size32_t)iFile->size(); - if ((unsigned)-1 == sz) + if ((size32_t)-1 == sz) { StringBuffer s("Missing external file "); if (*_name) @@ -2120,7 +2160,7 @@ void CBinaryFileExternal::readValue(const char *name, MemoryBuffer &mb) getFilename(filename, name); Owned iFile = createIFile(filename.str()); size32_t sz = (size32_t)iFile->size(); - if ((unsigned)-1 == sz) + if ((size32_t)-1 == sz) { StringBuffer s("Missing external file "); Owned e = MakeSDSException(SDSExcpt_MissingExternalFile, "%s", filename.str()); @@ -2148,18 +2188,19 @@ void CBinaryFileExternal::read(const char *name, IPropertyTree &owner, MemoryBuf byte flags = ((PTree &)owner).queryFlags(); mb.append(flags); serializeVisibleAttributes(owner, mb); - StringBuffer extName; - getName(extName, name); - if (manager.extCache.lookup(extName, mb)) - return; if (withValue) { + StringBuffer extName; + getName(extName, name); + if (manager.extCache.lookup(extName, mb)) + return; + StringBuffer filename; getFilename(filename, name); Owned iFile = createIFile(filename.str()); size32_t sz = (size32_t)iFile->size(); - if ((unsigned)-1 == sz) + if ((size32_t)-1 == sz) { IptFlagClr(flags, ipt_binary); mb.writeDirect(flagsPos, sizeof(flags), &flags); @@ -2213,18 +2254,15 @@ void CDeltaWriter::addToQueue(CTransactionItem *item) return; if (waiting) - { - waiting = false; - sem.signal(); - } + signalWaiting(); } else // here if exceeded transationQueueLimit, transactionMaxMem or exceeded time threshold (deltaSaveThresholdSecs) { ++totalQueueLimitHits; // force a synchronous save CCycleTimer timer; - CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending PROGLOG("Forcing synchronous save of %u transactions", (unsigned)pending.size()); + CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending if (save(pending)) // if temporarily blocked, continue, meaning queue limit will overrun a bit (blocking window is short) { pendingSz = 0; @@ -6532,6 +6570,9 @@ void CCovenSDSManager::saveStore(const char *storeName, bool currentEdition) CIgnore() { SDSManager->ignoreExternals=true; } ~CIgnore() { SDSManager->ignoreExternals=false; } } ignore; + // lock blockingaveCrit after flush, since deltaWriter itself blocks on blockedSaveCrit + deltaWriter.flush(); // transactions will be blocked at this stage, no new deltas will be added to writer + CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); iStoreHelper->saveStore(root, NULL, currentEdition); unsigned initNodeTableSize = allNodes.maxElements()+OVERFLOWSIZE; queryCoven().setInitSDSNodes(initNodeTableSize>INIT_NODETABLE_SIZE?initNodeTableSize:INIT_NODETABLE_SIZE); @@ -8139,11 +8180,9 @@ MemoryBuffer &CCovenSDSManager::collectSubscribers(MemoryBuffer &out) void CCovenSDSManager::blockingSave(unsigned *writeTransactions) { CHECKEDDALIREADLOCKBLOCK(SDSManager->dataRWLock, readWriteTimeout); // block all write actions whilst saving - CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); if (writeTransactions) *writeTransactions = SDSManager->writeTransactions; // JCS - could in theory, not block, but abort save. - deltaWriter.clear(); SDSManager->saveStore(); }