diff --git a/dali/base/dasds.cpp b/dali/base/dasds.cpp index 56722d7c0f4..a75d9c5d307 100644 --- a/dali/base/dasds.cpp +++ b/dali/base/dasds.cpp @@ -15,6 +15,10 @@ limitations under the License. ############################################################################## */ +#include +#include +#include + #include "platform.h" #include "jhash.hpp" #include "jlib.hpp" @@ -94,7 +98,9 @@ static auto pSdsRequestsPending = hpccMetrics::registerGaugeFromCountersMetric(" #define SUBNTFY_POOL_SIZE 400 #define SUBSCAN_POOL_SIZE 100 #define RTM_INTERNAL 0x80000000 // marker for internal connection (performed within a transaction) -#define DEFAULT_EXTERNAL_SIZE_THRESHOLD (10*1024) + +static constexpr memsize_t defaultExternalSizeThreshold = 10 * 1024; // 10k +static constexpr memsize_t defaultExtCacheSizeMB = 10; #define NOTIFY_ATTR "@sds:notify" #define FETCH_ENTIRE -1 @@ -948,6 +954,29 @@ void serializeVisibleAttributes(IPropertyTree &tree, MemoryBuffer &mb) mb.append(""); // attribute terminator. i.e. blank attr name. } +bool compareFiles(IFile *file1, IFile *file2, bool compareTimes=true) +{ + if (file1->exists()) + { + if (file2->exists()) + { + if (file1->size() == file2->size()) + { + if (!compareTimes) return true; + CDateTime modifiedTimeBackup; + file1->getTime(NULL, &modifiedTimeBackup, NULL); + CDateTime modifiedTime; + file2->getTime(NULL, &modifiedTime, NULL); + if (0 == modifiedTimeBackup.compare(modifiedTime, false)) + return true; + } + } + } + else + return !file2->exists(); + return false; +} + void writeDelta(StringBuffer &xml, IFile &iFile, const char *msg="", unsigned retrySecs=0, unsigned retryAttempts=10) { Owned exception; @@ -1015,72 +1044,159 @@ void writeDelta(StringBuffer &xml, IFile &iFile, const char *msg="", unsigned re } } -struct BackupQueueItem +void cleanChangeTree(IPropertyTree &tree) { - static unsigned typeMask; - enum flagt { f_delta=0x1, f_addext=0x2, f_delext=0x3, f_first=0x10 }; - BackupQueueItem() : edition((unsigned)-1), flags(0) { text = new StringBuffer; dataLength = 0; data = NULL; } - ~BackupQueueItem() + tree.removeProp("@id"); + Owned iter = tree.getElements(RENAME_TAG); + ForEach (*iter) + iter->query().removeProp("@id"); + iter.setown(tree.getElements(DELETE_TAG)); + ForEach (*iter) + iter->query().removeProp("@id"); + iter.setown(tree.getElements(RESERVED_CHANGE_NODE)); + ForEach (*iter) + cleanChangeTree(iter->query()); +} + +class CTransactionItem : public CSimpleInterface +{ +public: + enum flagt : byte { f_none, f_delta, f_addext, f_delext } type = f_none; + char *name; + union + { + IPropertyTree *deltaTree; + struct + { + void *data; + unsigned dataLength; + }; + }; + + CTransactionItem(const char *path, IPropertyTree *_deltaTree) : deltaTree(_deltaTree) { - delete text; - if (data) free(data); + type = f_delta; + name = strdup(path); + } + CTransactionItem(char *_name, size32_t _dataLength, void *_data) : name(_name), dataLength(_dataLength), data(_data) + { + type = data ? f_addext : f_delext; + } + ~CTransactionItem() + { + free(name); + switch (type) + { + case f_delta: + ::Release(deltaTree); + break; + case f_addext: + free(data); + break; + case f_delext: + default: + break; + } } - StringBuffer *text; - unsigned edition; - unsigned dataLength; - void *data; - byte flags; }; -unsigned BackupQueueItem::typeMask = 0x0f; -class CBackupHandler : public CInterface, implements IThreaded -{ - typedef QueueOf BackupQueue; - CThreaded threaded; - BackupQueue itemQueue, freeQueue; - Semaphore pending, softQueueLimitSem; - bool aborted, waiting, addWaiting, async; - unsigned currentEdition, throttleCounter; - CriticalSection queueCrit, freeQueueCrit; - StringAttr backupPath; - unsigned freeQueueLimit; // how many BackupQueueItems to cache for reuse - unsigned largeWarningThreshold; // point at which to start warning about large queue - unsigned softQueueLimit; // threshold over which primary transactions will be delay by small delay, to allow backup catchup. - unsigned softQueueLimitDelay; // delay for above - CTimeMon warningTime; - unsigned recentTimeThrottled; - unsigned lastNumWarnItems; - IPropertyTree &config; - const unsigned defaultFreeQueueLimit = 50; - const unsigned defaultLargeWarningThreshold = 50; - const unsigned defaultSoftQueueLimit = 200; - const unsigned defaultSoftQueueLimitDelay = 200; +static constexpr unsigned defaultSaveThresholdSecs = 0; // disabled +static constexpr unsigned defaultDeltaSaveTransactionThreshold = 0; // disabled +static constexpr unsigned defaultDeltaTransactionQueueLimit = 10000; +class CDeltaWriter : implements IThreaded +{ + IStoreHelper *iStoreHelper = nullptr; + StringBuffer dataPath; + StringBuffer backupPath; + unsigned transactionWriteThreshold = 0; + unsigned transactionQueueLimit = defaultDeltaTransactionQueueLimit; // absolute limit, will block if this far behind + unsigned totalQueueLimitHits = 0; + unsigned addQueueWaiting = 0; + unsigned saveThresholdSecs = 0; + cycle_t nextTimeThreshold = 0; + cycle_t thresholdDuration = 0; + + std::queue> pending; + CriticalSection pendingCrit; + CCycleTimer timer; + StringBuffer deltaXml; + CThreaded threaded; + Semaphore sem; + cycle_t timeThrottled = 0; + unsigned throttleCounter = 0; + bool waiting = true; + bool backupOutOfSync = false; + bool aborted = false; - BackupQueueItem *getFreeItem() + void validateDeltaBackup() { - BackupQueueItem *item; + // check consistency of delta + StringBuffer deltaFilename(dataPath); + iStoreHelper->getCurrentDeltaFilename(deltaFilename); + OwnedIFile iFileDelta = createIFile(deltaFilename.str()); + deltaFilename.clear().append(backupPath); + iStoreHelper->getCurrentDeltaFilename(deltaFilename); + OwnedIFile iFileDeltaBackup = createIFile(deltaFilename.str()); + if (!compareFiles(iFileDeltaBackup, iFileDelta, false)) { - CriticalBlock b(freeQueueCrit); - item = freeQueue.dequeue(); + OWARNLOG("Delta file backup doesn't exist or differs, filename=%s", deltaFilename.str()); + copyFile(iFileDeltaBackup, iFileDelta); } - if (!item) - item = new BackupQueueItem; - return item; } - void clearQueue(BackupQueue &queue) + void writeXml() { - for (;;) + try { - BackupQueueItem *item = queue.dequeue(); - if (!item) break; - delete item; + StringBuffer deltaFilename(dataPath); + iStoreHelper->getCurrentDeltaFilename(deltaFilename); + OwnedIFile iFile = createIFile(deltaFilename.str()); + writeDelta(deltaXml, *iFile); } + catch (IException *e) + { + // NB: writeDelta retries a few times before giving up. + VStringBuffer errMsg("writeXml: failed to save delta data, blockedDelta size=%d", deltaXml.length()); + OWARNLOG(e, errMsg.str()); + e->Release(); + return; + } + if (backupPath.length()) + { + try + { + if (backupOutOfSync) // true if there was previously an exception during synchronously writing delta to backup. + { + OWARNLOG("Backup delta is out of sync due to a prior backup write error, attempting to resync"); + // catchup - check and copy primary delta to backup + validateDeltaBackup(); + backupOutOfSync = false; + OWARNLOG("Backup delta resynchronized"); + } + else + { + StringBuffer deltaFilename(backupPath); + constructStoreName(DELTANAME, iStoreHelper->queryCurrentEdition(), deltaFilename); + OwnedIFile iFile = createIFile(deltaFilename.str()); + ::writeDelta(deltaXml, *iFile, "backup - ", 60, 30); + } + } + catch (IException *e) + { + OERRLOG(e, "writeXml: failed to save backup delta data"); + e->Release(); + backupOutOfSync = true; + } + } + if (deltaXml.length() > (0x100000 * 10)) // >= 10MB + deltaXml.kill(); + else + deltaXml.clear(); } - void writeExt(const char *name, const unsigned length, const void *data, unsigned retrySecs=0, unsigned retryAttempts=10) + void writeExt(const char *basePath, const char *name, const unsigned length, const void *data, unsigned retrySecs=0, unsigned retryAttempts=10) { Owned exception; unsigned _retryAttempts = retryAttempts; - StringBuffer rL(remoteBackupLocation); + StringBuffer rL(basePath); for (;;) { try @@ -1110,11 +1226,11 @@ class CBackupHandler : public CInterface, implements IThreaded MilliSleep(retrySecs*1000); } } - void deleteExt(const char *name, unsigned retrySecs=0, unsigned retryAttempts=10) + void deleteExt(const char *basePath, const char *name, unsigned retrySecs=0, unsigned retryAttempts=10) { Owned exception; unsigned _retryAttempts = retryAttempts; - StringBuffer rL(remoteBackupLocation); + StringBuffer rL(basePath); for (;;) { try @@ -1143,251 +1259,219 @@ class CBackupHandler : public CInterface, implements IThreaded MilliSleep(retrySecs*1000); } } - bool writeDelta(StringBuffer &xml, unsigned edition, bool first) + bool save(std::queue> &todo) { - StringBuffer deltaFilename(backupPath); - constructStoreName(DELTANAME, edition, deltaFilename); - OwnedIFile iFile = createIFile(deltaFilename.str()); - if (!first && !iFile->exists()) - return false; // discard - ::writeDelta(xml, *iFile, "CBackupHandler - ", 60, 30); - return true; - } - void clearOld() - { - CriticalBlock b(queueCrit); - for (;;) + StringBuffer fname(dataPath); + OwnedIFile deltaIPIFile = createIFile(fname.append(DELTAINPROGRESS).str()); + OwnedIFileIO deltaIPIFileIO = deltaIPIFile->open(IFOcreate); + deltaIPIFileIO.clear(); + struct RemoveDIPBlock { - BackupQueueItem *item = itemQueue.dequeue(); - if (!item) break; - if (BackupQueueItem::f_delta == (item->flags & BackupQueueItem::typeMask)) + IFile &iFile; + bool done; + void doit() { done = true; iFile.remove(); } + RemoveDIPBlock(IFile &_iFile) : iFile(_iFile), done(false) { } + ~RemoveDIPBlock () { if (!done) doit(); } + } removeDIP(*deltaIPIFile); + StringBuffer detachIPStr(dataPath); + OwnedIFile detachIPIFile = createIFile(detachIPStr.append(DETACHINPROGRESS).str()); + if (detachIPIFile->exists()) // very small window where this can happen. + { + // implies other operation about to access current delta + // CHECK session is really alive, otherwise it has been orphaned, so remove it. + try { - item->text->clear(); - if (freeQueue.ordinality() < freeQueueLimit) - freeQueue.enqueue(item); + SessionId sessId = 0; + OwnedIFileIO detachIPIO = detachIPIFile->open(IFOread); + if (detachIPIO) + { + size_t s = detachIPIO->read(0, sizeof(sessId), &sessId); + detachIPIO.clear(); + if (sizeof(sessId) == s) + { + // double check session is really alive + if (querySessionManager().sessionStopped(sessId, 0)) + detachIPIFile->remove(); + else + { + // *cannot block* because other op (sasha) accessing remote dali files, can access dali. + removeDIP.doit(); + PROGLOG("blocked"); + return false; + } + } + } + } + catch (IException *e) { EXCLOG(e, NULL); e->Release(); } + } + + while (!todo.empty()) + { + CTransactionItem *item = todo.front(); + if (CTransactionItem::f_delta == item->type) + { + Owned changeTree = item->deltaTree; + item->deltaTree = nullptr; + cleanChangeTree(*changeTree); + + // write out with header details (e.g. path) + Owned header = createPTree("Header"); + header->setProp("@path", item->name); + IPropertyTree *delta = header->addPropTree("Delta", createPTree()); + const char *nodeName = changeTree->queryName(); + delta->addPropTree(nodeName, changeTree.getClear()); + toXML(header, deltaXml); + } + else + { + // external file + + // commit accumulated delta 1st (so write order is consistent) + if (deltaXml.length()) + writeXml(); // also handles backup if needed + + if (CTransactionItem::f_addext == item->type) + { + writeExt(dataPath, item->name, item->dataLength, item->data); + if (backupPath.length()) + writeExt(backupPath, item->name, item->dataLength, item->data, 60, 30); + } else - delete item; + { + dbgassertex(CTransactionItem::f_delext == item->type); + deleteExt(dataPath, item->name); + if (backupPath.length()) + deleteExt(backupPath, item->name, 60, 30); + } } + todo.pop(); } - if (addWaiting && itemQueue.ordinality()getPrimaryLocation(dataPath); + iStoreHelper->getBackupLocation(backupPath); + + saveThresholdSecs = config.getPropInt("@deltaSaveThresholdSecs", defaultSaveThresholdSecs); + transactionWriteThreshold = config.getPropInt("@deltaSaveTransactionThreshold", defaultDeltaSaveTransactionThreshold); + transactionQueueLimit = config.getPropInt("@deltaTransactionQueueLimit", defaultDeltaTransactionQueueLimit); + if (saveThresholdSecs) { - aborted = true; - pending.signal(); - threaded.join(); + thresholdDuration = queryOneSecCycles() * saveThresholdSecs; + nextTimeThreshold = get_cycles_now() + thresholdDuration; } + + aborted = false; + waiting = true; + if (0 == transactionWriteThreshold) // treat 0 as disabled, meaning that the # pending transactions will not be considered + transactionWriteThreshold = INFINITE; + + if (transactionQueueLimit > 1) // else all transactions will be synchronous + threaded.init(this); + + VStringBuffer msg("CDeltaWriter started - deltaTransactionQueueLimit=%u, deltaSaveThresholdSecs=", transactionQueueLimit); + if (saveThresholdSecs) + msg.append(saveThresholdSecs); + else + msg.append(""); + msg.append(", transactionWriteThreshold="); + if (INFINITE == transactionWriteThreshold) + msg.append(""); + else if (1 == transactionWriteThreshold) + msg.append(""); + else + msg.append(transactionWriteThreshold); + PROGLOG("%s", msg.str()); } - void removeExt(const char *fname) + void addDelta(const char *path, IPropertyTree *delta) { - if (aborted) return; - if (!async) - { - deleteExt(fname); - return; - } - BackupQueueItem *item = getFreeItem(); - item->text->append(fname); - item->flags = BackupQueueItem::f_delext; - add(item); + CriticalBlock b(pendingCrit); + addToQueue(new CTransactionItem(path, delta)); } - void addExt(const char *fname, unsigned length, void *data) + CTransactionItem *addExt(char *name, size32_t length, void *content) { - if (aborted) return; - if (!async) - { - writeExt(fname, length, data); - free(data); - return; - } - BackupQueueItem *item = getFreeItem(); - item->text->append(fname); - item->dataLength = length; - item->data = data; // take ownership - item->flags = BackupQueueItem::f_addext; - add(item); + CriticalBlock b(pendingCrit); + CTransactionItem *item = new CTransactionItem(name, length, content); + addToQueue(LINK(item)); + return item; } - void addDelta(StringBuffer &xml, unsigned edition, bool first) + void removeExt(char *name) { - if (aborted) return; - if (!async) - { - writeDelta(xml, edition, first); - if (xml.length() > 0x100000) - xml.kill(); - else - xml.clear(); - return; - } - if (edition != currentEdition) - { - clearOld(); - currentEdition = edition; - } - BackupQueueItem *item = getFreeItem(); - xml.swapWith(*item->text); - item->edition = edition; - item->flags = BackupQueueItem::f_delta | BackupQueueItem::f_first; - add(item); + CriticalBlock b(pendingCrit); + addToQueue(new CTransactionItem(name, 0, nullptr)); } - void add(BackupQueueItem *item) + void clear() { - CriticalBlock b(queueCrit); - itemQueue.enqueue(item); - unsigned items=itemQueue.ordinality(); - if (0==items%largeWarningThreshold) - { - if (items>lastNumWarnItems) // track as they go up - { - IWARNLOG("Backup thread has a high # (%d) of pending transaction queued to write", items); - lastNumWarnItems = items; - } - else if (warningTime.elapsed() >= 60000) // if falling, avoid logging too much - { - IWARNLOG("Backup thread has a high # (%d) of pending transaction queued to write", items); - lastNumWarnItems = 0; - warningTime.reset(0); - } - } - if (items>=softQueueLimit) - { - addWaiting = true; - unsigned ms = msTick(); - { - CriticalUnblock b(queueCrit); - softQueueLimitSem.wait(softQueueLimitDelay); - } - addWaiting = false; - recentTimeThrottled += (msTick()-ms); // reset when queue < largeWarningThreshold - if (recentTimeThrottled >= softQueueLimitDelay && (0 == throttleCounter % 10)) // softQueueLimit exceeded - log every 10 transactions if recentTimeThrottled >= softQueueLimitDelay (1 unsignalled delay) - IWARNLOG("Primary transactions are being delayed by lagging backup, currently %d queued, recent total throttle delay=%d", items, recentTimeThrottled); - ++throttleCounter; // also reset when queue < largeWarningThreshold - } - if (waiting) - pending.signal(); + CriticalBlock b(pendingCrit); + pending = {}; } - bool doIt(BackupQueueItem &item) + void stop() { - try - { - switch (item.flags & BackupQueueItem::typeMask) - { - case BackupQueueItem::f_delta: - return writeDelta(*item.text, item.edition, 0 != (item.flags & BackupQueueItem::f_first)); - case BackupQueueItem::f_addext: - writeExt(item.text->str(), item.dataLength, item.data, 60, 30); - return true; - case BackupQueueItem::f_delext: - deleteExt(item.text->str(), 60, 30); - return true; - } - } - catch (IException *e) + if (!aborted) { - IERRLOG(e, "BackupHandler(async) write operation failed, possible backup data loss"); - e->Release(); + aborted = true; + sem.signal(); + pendingCrit.enter(); + pendingCrit.leave(); + threaded.join(); } - return false; } // IThreaded virtual void threadmain() override { - for (;;) + while (!aborted) { - BackupQueueItem *item=NULL; - do + bool semTimedout = false; + if (saveThresholdSecs) + semTimedout = !sem.wait(saveThresholdSecs * 1000); + else + sem.wait(); + + if (aborted) + break; + // keep going whilst there's things pending + CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending + while (true) { - CriticalBlock b(queueCrit); - if (itemQueue.ordinality()) - { - item = itemQueue.dequeue(); - if (addWaiting && itemQueue.ordinality()> todo; { - waiting = true; + CriticalBlock b(pendingCrit); + todo = std::move(pending); + if (0 == todo.size()) { - CriticalUnblock b(queueCrit); - if (!aborted) - pending.wait(); + // now in crit, check if sem timedout, but got signalled in between then and now, and consume possible signal + if (semTimedout) + sem.wait(0); + waiting = true; + break; } - waiting = false; - } - if (aborted) - { - if (item) delete item; - PROGLOG("BackupHandler stopped"); - return; } + if (!save(todo)) // if temporarily blocked, wait a bit (blocking window is short) + MilliSleep(1000); } - while (!item); - if (!doIt(*item)) - clearOld(); - CriticalBlock b(freeQueueCrit); - if (freeQueue.ordinality() < freeQueueLimit) - { - if (item->text->length() > 0x100000) - item->text->kill(); - else - item->text->clear(); - if (item->data) - { - free(item->data); - item->data = NULL; - item->dataLength = 0; - } - freeQueue.enqueue(item); - } - else - delete item; } } }; + class CExternalFile : public CInterface { - StringAttr ext, dataPath; protected: - CBackupHandler &backupHandler; + CCovenSDSManager &manager; + StringAttr ext, dataPath; public: - CExternalFile(const char *_ext, const char *_dataPath, CBackupHandler &_backupHandler) : ext(_ext), dataPath(_dataPath), backupHandler(_backupHandler) { } + CExternalFile(const char *_ext, const char *_dataPath, CCovenSDSManager &_manager) : ext(_ext), dataPath(_dataPath), manager(_manager) { } const char *queryExt() { return ext; } StringBuffer &getName(StringBuffer &fName, const char *base) { @@ -1400,162 +1484,27 @@ class CExternalFile : public CInterface bool isValid(const char *name) { StringBuffer filename; - getFilename(filename, name); - Owned iFile = createIFile(filename.str()); - return iFile->exists(); - } - void remove(const char *name) - { - StringBuffer filename; - getFilename(filename, name); - Owned iFile = createIFile(filename.str()); - iFile->remove(); - if (remoteBackupLocation.length()) - { - StringBuffer fname(name); - backupHandler.removeExt(fname.append(queryExt()).str()); - } - } -}; - -class CLegacyBinaryFileExternal : public CExternalFile, implements IExternalHandler -{ -public: - IMPLEMENT_IINTERFACE; - - CLegacyBinaryFileExternal(const char *dataPath, CBackupHandler &backupHandler) : CExternalFile("." EF_LegacyBinaryValue, dataPath, backupHandler) { } - virtual void resetAsExternal(IPropertyTree &tree) - { - tree.setProp(NULL, (char *)NULL); - } - virtual void readValue(const char *name, MemoryBuffer &mb) - { - StringBuffer filename; - getFilename(filename, name); - - Owned iFile = createIFile(filename.str()); - size32_t sz = (size32_t)iFile->size(); - if ((unsigned)-1 == sz) - { - StringBuffer s("Missing external file "); - Owned e = MakeSDSException(SDSExcpt_MissingExternalFile, "%s", filename.str()); - LOG(MCoperatorWarning, unknownJob, e, s.str()); - StringBuffer str("EXTERNAL BINARY FILE: \""); - str.append(filename.str()).append("\" MISSING"); - CPTValue v(str.length()+1, str.str(), false); - v.serialize(mb); - } - else - { - Owned fileIO = iFile->open(IFOread); - MemoryBuffer vmb; - verifyex(sz == ::read(fileIO, 0, sz, vmb)); - CPTValue v(sz, vmb.toByteArray(), true); - v.serialize(mb); - } - } - virtual void read(const char *name, IPropertyTree &owner, MemoryBuffer &mb, bool withValue) - { - StringBuffer filename; - getFilename(filename, name); - - const char *_name = owner.queryName(); - if (!_name) _name = ""; - mb.append(_name); - byte flags = ((PTree &)owner).queryFlags(); - mb.append(IptFlagSet(flags, ipt_binary)); - - serializeVisibleAttributes(owner, mb); - - Owned iFile = createIFile(filename.str()); - size32_t sz = (size32_t)iFile->size(); - if ((unsigned)-1 == sz) - { - StringBuffer s("Missing external file "); - if (*_name) - s.append("in property ").append(_name); - Owned e = MakeSDSException(SDSExcpt_MissingExternalFile, "%s", filename.str()); - LOG(MCoperatorWarning, unknownJob, e, s.str()); - if (withValue) - { - StringBuffer str("EXTERNAL BINARY FILE: \""); - str.append(filename.str()).append("\" MISSING"); - CPTValue v(str.length()+1, str.str(), false); - v.serialize(mb); - } - else - mb.append((size32_t)0); - } - else - { - if (withValue) - { - MemoryBuffer vmb; - Owned fileIO = iFile->open(IFOread); - verifyex(sz == ::read(fileIO, 0, sz, vmb)); - CPTValue v(sz, vmb.toByteArray(), true); - v.serialize(mb); - } - else - mb.append((size32_t)0); - } - } - virtual void write(const char *name, IPropertyTree &tree) - { - StringBuffer filename; - getFilename(filename, name); - Owned iFile = createIFile(filename.str()); - Owned fileIO = iFile->open(IFOcreate); - - MemoryBuffer out; - ((PTree &)tree).queryValue()->serialize(out); - const char *data = out.toByteArray(); - unsigned length = out.length(); - fileIO->write(0, length, data); - if (remoteBackupLocation.length()) - { - StringBuffer fname(name); - backupHandler.addExt(fname.append(queryExt()).str(), length, out.detach()); - } - } - virtual void remove(const char *name) { CExternalFile::remove(name); } - virtual bool isValid(const char *name) { return CExternalFile::isValid(name); } - virtual StringBuffer &getName(StringBuffer &fName, const char *base) { return CExternalFile::getName(fName, base); } - virtual StringBuffer &getFilename(StringBuffer &fName, const char *base) { return CExternalFile::getFilename(fName, base); } -}; - -class CBinaryFileExternal : public CExternalFile, implements IExternalHandler -{ -public: - IMPLEMENT_IINTERFACE; - - CBinaryFileExternal(const char *dataPath, CBackupHandler &backupHandler) : CExternalFile("." EF_BinaryValue, dataPath, backupHandler) { } - virtual void resetAsExternal(IPropertyTree &tree) - { - tree.setProp(NULL, (char *)NULL); - } - virtual void readValue(const char *name, MemoryBuffer &mb) - { - StringBuffer filename; - getFilename(filename, name); - - Owned iFile = createIFile(filename.str()); - size32_t sz = (size32_t)iFile->size(); - if ((unsigned)-1 == sz) - { - StringBuffer s("Missing external file "); - Owned e = MakeSDSException(SDSExcpt_MissingExternalFile, "%s", filename.str()); - LOG(MCoperatorWarning, unknownJob, e, s.str()); - StringBuffer str("EXTERNAL BINARY FILE: \""); - str.append(filename.str()).append("\" MISSING"); - CPTValue v(str.length()+1, str.str(), false); - v.serialize(mb); - } - else - { - Owned fileIO = iFile->open(IFOread); - verifyex(sz == ::read(fileIO, 0, sz, mb)); - } + getFilename(filename, name); + Owned iFile = createIFile(filename.str()); + return iFile->exists(); + } + void remove(const char *name); +}; + +// deprecated, only used to convert if found on load +class CLegacyBinaryFileExternal : public CExternalFile, implements IExternalHandler +{ +public: + IMPLEMENT_IINTERFACE; + + CLegacyBinaryFileExternal(const char *dataPath, CCovenSDSManager &manager) : CExternalFile("." EF_LegacyBinaryValue, dataPath, manager) { } + virtual void resetAsExternal(IPropertyTree &tree) + { + throwUnexpected(); + } + virtual void readValue(const char *name, MemoryBuffer &mb) + { + throwUnexpected(); } virtual void read(const char *name, IPropertyTree &owner, MemoryBuffer &mb, bool withValue) { @@ -1564,15 +1513,15 @@ class CBinaryFileExternal : public CExternalFile, implements IExternalHandler const char *_name = owner.queryName(); if (!_name) _name = ""; mb.append(_name); + byte flags = ((PTree &)owner).queryFlags(); + mb.append(IptFlagSet(flags, ipt_binary)); + + serializeVisibleAttributes(owner, mb); Owned iFile = createIFile(filename.str()); size32_t sz = (size32_t)iFile->size(); if ((unsigned)-1 == sz) { - byte flags = ((PTree &)owner).queryFlags(); - IptFlagClr(flags, ipt_binary); - mb.append(flags); - serializeVisibleAttributes(owner, mb); StringBuffer s("Missing external file "); if (*_name) s.append("in property ").append(_name); @@ -1590,13 +1539,13 @@ class CBinaryFileExternal : public CExternalFile, implements IExternalHandler } else { - byte flags = ((PTree &)owner).queryFlags(); - mb.append(flags); - serializeVisibleAttributes(owner, mb); if (withValue) { + MemoryBuffer vmb; Owned fileIO = iFile->open(IFOread); - verifyex(sz == ::read(fileIO, 0, sz, mb)); + verifyex(sz == ::read(fileIO, 0, sz, vmb)); + CPTValue v(sz, vmb.toByteArray(), true); + v.serialize(mb); } else mb.append((size32_t)0); @@ -1604,64 +1553,47 @@ class CBinaryFileExternal : public CExternalFile, implements IExternalHandler } virtual void write(const char *name, IPropertyTree &tree) { - StringBuffer filename; - getFilename(filename, name); - Owned iFile = createIFile(filename.str()); - Owned fileIO = iFile->open(IFOcreate); + throwUnexpected(); + } + virtual void remove(const char *name) { CExternalFile::remove(name); } + virtual bool isValid(const char *name) { return CExternalFile::isValid(name); } + virtual StringBuffer &getName(StringBuffer &fName, const char *base) { return CExternalFile::getName(fName, base); } + virtual StringBuffer &getFilename(StringBuffer &fName, const char *base) { return CExternalFile::getFilename(fName, base); } +}; - MemoryBuffer out; - ((PTree &)tree).queryValue()->serialize(out); - const char *data = out.toByteArray(); - unsigned length = out.length(); - fileIO->write(0, length, data); - if (remoteBackupLocation.length()) - { - StringBuffer fname(name); - backupHandler.addExt(fname.append(queryExt()).str(), length, out.detach()); - } +class CBinaryFileExternal : public CExternalFile, implements IExternalHandler +{ +public: + IMPLEMENT_IINTERFACE; + + CBinaryFileExternal(const char *dataPath, CCovenSDSManager &manager) : CExternalFile("." EF_BinaryValue, dataPath, manager) { } + virtual void resetAsExternal(IPropertyTree &tree) + { + tree.setProp(NULL, (char *)NULL); } + virtual void readValue(const char *name, MemoryBuffer &mb); + virtual void read(const char *name, IPropertyTree &owner, MemoryBuffer &mb, bool withValue); + virtual void write(const char *name, IPropertyTree &tree); virtual void remove(const char *name) { CExternalFile::remove(name); } virtual bool isValid(const char *name) { return CExternalFile::isValid(name); } virtual StringBuffer &getName(StringBuffer &fName, const char *base) { return CExternalFile::getName(fName, base); } virtual StringBuffer &getFilename(StringBuffer &fName, const char *base) { return CExternalFile::getFilename(fName, base); } }; +// deprecated, only used to convert if found on load class CXMLFileExternal : public CExternalFile, implements IExternalHandler { public: IMPLEMENT_IINTERFACE; - CXMLFileExternal(const char *dataPath, CBackupHandler &backupHandler) : CExternalFile("." EF_XML, dataPath, backupHandler) { } + CXMLFileExternal(const char *dataPath, CCovenSDSManager &manager) : CExternalFile("." EF_XML, dataPath, manager) { } virtual void resetAsExternal(IPropertyTree &_tree) { - PTree &tree = *QUERYINTERFACE(&_tree, PTree); - ::Release(tree.detach()); + throwUnexpected(); } virtual void readValue(const char *name, MemoryBuffer &mb) { - StringBuffer filename; - getFilename(filename, name); - OwnedIFile ifile = createIFile(filename.str()); - if (!ifile->exists()) - { - StringBuffer s("Missing external file "); - Owned e = MakeSDSException(SDSExcpt_MissingExternalFile, "%s", filename.str()); - LOG(MCoperatorWarning, unknownJob, e, s.str()); - StringBuffer str("EXTERNAL XML FILE: \""); - str.append(filename.str()).append("\" MISSING"); - CPTValue v(str.length()+1, str.str(), false); - v.serialize(mb); - } - else - { - Owned tree; - tree.setown(createPTreeFromXMLFile(filename.str())); - IPTArrayValue *v = ((PTree *)tree.get())->queryValue(); - if (v) - v->serialize(mb); - else - mb.append((size32_t)0); - } + throwUnexpected(); } virtual void read(const char *name, IPropertyTree &owner, MemoryBuffer &mb, bool withValue) { @@ -1693,20 +1625,7 @@ class CXMLFileExternal : public CExternalFile, implements IExternalHandler } virtual void write(const char *name, IPropertyTree &tree) { - StringBuffer filename; - getFilename(filename, name); - Owned iFile = createIFile(filename.str()); - Owned fileIO = iFile->open(IFOcreate); - Owned fstream = createBufferedIOStream(fileIO); - toXML(&tree, *fstream); - if (remoteBackupLocation.length()) - { - StringBuffer fname(name); - StringBuffer str; - toXML(&tree, str); - unsigned l = str.length(); - backupHandler.addExt(fname.append(queryExt()).str(), l, str.detach()); - } + throwUnexpected(); } virtual void remove(const char *name) { CExternalFile::remove(name); } virtual bool isValid(const char *name) { return CExternalFile::isValid(name); } @@ -1919,6 +1838,94 @@ interface INodeSubscriptionManager : extends ISubscriptionManager virtual MemoryBuffer &collectSubscribers(MemoryBuffer &out) const = 0; }; +////////////////////// + + +class CExtCache +{ + CriticalSection crit; + std::list> order; + std::unordered_map>::iterator> extTable; + memsize_t cachedSz = 0; + memsize_t cacheSzLimit = 0; + + void purge() + { + for (auto it = order.begin(); it != order.end();) + { + CTransactionItem *item = *it; + cachedSz -= item->dataLength; + extTable.erase(item->name); + it = order.erase(it); + if (cachedSz <= cacheSzLimit) + break; + } + } + void doAdd(CTransactionItem *item) + { + CriticalBlock b(crit); + auto it = extTable.find(item->name); + if (it != extTable.end()) + { + Linked &existingItem = *(it->second); + cachedSz -= existingItem->dataLength; + existingItem.set(item); + order.splice(order.end(), order, it->second); // move to front of FIFO list + } + else + { + auto listIt = order.insert(order.end(), item); + extTable[item->name] = listIt; + } + cachedSz += item->dataLength; + if (cachedSz > cacheSzLimit) + purge(); + } +public: + void init(memsize_t _cacheSzLimit) + { + cacheSzLimit = _cacheSzLimit; + } + void add(const char *name, size_t sz, const void *data) // will clone data + { + if (sz > cacheSzLimit) + return; + MemoryAttr ma; + ma.set(sz, data); + CTransactionItem *item = new CTransactionItem(strdup(name), sz, ma.detach()); + doAdd(item); + } + void add(CTransactionItem *item) + { + if (item->dataLength > cacheSzLimit) + return; + doAdd(item); + } + void remove(const char *key) + { + CriticalBlock b(crit); + auto mapIt = extTable.find(key); + if (mapIt != extTable.end()) + { + auto listIter = mapIt->second; + assertex(listIter != order.erase(listIter)); + assertex(mapIt != extTable.erase(mapIt)); + } + } + bool lookup(const char *key, MemoryBuffer &mb) + { + CLeavableCriticalBlock b(crit); + auto it = extTable.find(key); + if (it == extTable.end()) + return false; + Linked item = *(it->second); + b.leave(); + mb.append(item->dataLength, item->data); + return true; + } +}; + + ////////////////////// enum LockStatus { LockFailed, LockHeld, LockTimedOut, LockSucceeded }; @@ -1972,7 +1979,7 @@ class CCovenSDSManager : public CSDSManagerBase, implements ISDSManagerServer, i CServerRemoteTree *queryRegisteredTree(__int64 uniqId); CServerRemoteTree *getRegisteredTree(__int64 uniqId); CServerRemoteTree *queryRoot(); - void saveDelta(const char *path, IPropertyTree &changeTree); + void serializeDelta(const char *path, IPropertyTree *changeTree); CSubscriberContainerList *getSubscribers(const char *xpath, CPTStack &stack); void getExternalValue(__int64 index, MemoryBuffer &mb); IPropertyTree *getXPathsSortLimitMatchTree(const char *baseXPath, const char *matchXPath, const char *sortby, bool caseinsensitive, bool ascending, unsigned from, unsigned limit); @@ -2083,8 +2090,12 @@ class CCovenSDSManager : public CSDSManagerBase, implements ISDSManagerServer, i IStoreHelper *iStoreHelper; bool doTimeComparison; StringBuffer blockedDelta; - CBackupHandler backupHandler; + CDeltaWriter deltaWriter; + CExtCache extCache; bool backupOutOfSync = false; + +friend class CExternalFile; +friend class CBinaryFileExternal; }; ISDSManagerServer &querySDSServer() @@ -2095,6 +2106,139 @@ ISDSManagerServer &querySDSServer() ///////////////// +void CExternalFile::remove(const char *name) +{ + StringBuffer filename(name); + filename.append(ext); + manager.extCache.remove(filename); + manager.deltaWriter.removeExt(filename.detach()); +} + +void CBinaryFileExternal::readValue(const char *name, MemoryBuffer &mb) +{ + StringBuffer extName; + getName(extName, name); + if (manager.extCache.lookup(extName, mb)) + return; + + StringBuffer filename; + getFilename(filename, name); + Owned iFile = createIFile(filename.str()); + size32_t sz = (size32_t)iFile->size(); + if ((unsigned)-1 == sz) + { + StringBuffer s("Missing external file "); + Owned e = MakeSDSException(SDSExcpt_MissingExternalFile, "%s", filename.str()); + LOG(MCoperatorWarning, unknownJob, e, s.str()); + StringBuffer str("EXTERNAL BINARY FILE: \""); + str.append(filename.str()).append("\" MISSING"); + CPTValue v(str.length()+1, str.str(), false); + v.serialize(mb); + } + else + { + Owned fileIO = iFile->open(IFOread); + verifyex(sz == ::read(fileIO, 0, sz, mb)); + manager.extCache.add(extName, sz, mb.toByteArray()); + } +} + +void CBinaryFileExternal::read(const char *name, IPropertyTree &owner, MemoryBuffer &mb, bool withValue) +{ + const char *_name = owner.queryName(); + if (!_name) _name = ""; + mb.append(_name); + + size32_t flagsPos = mb.length(); + byte flags = ((PTree &)owner).queryFlags(); + mb.append(flags); + serializeVisibleAttributes(owner, mb); + StringBuffer extName; + getName(extName, name); + if (manager.extCache.lookup(extName, mb)) + return; + + if (withValue) + { + StringBuffer filename; + getFilename(filename, name); + Owned iFile = createIFile(filename.str()); + size32_t sz = (size32_t)iFile->size(); + if ((unsigned)-1 == sz) + { + IptFlagClr(flags, ipt_binary); + mb.writeDirect(flagsPos, sizeof(flags), &flags); + + StringBuffer s("Missing external file "); + if (*_name) + s.append("in property ").append(_name); + Owned e = MakeSDSException(SDSExcpt_MissingExternalFile, "%s", filename.str()); + LOG(MCoperatorWarning, unknownJob, e, s.str()); + StringBuffer str("EXTERNAL BINARY FILE: \""); + str.append(filename.str()).append("\" MISSING"); + CPTValue v(str.length()+1, str.str(), false); + v.serialize(mb); + } + else + { + Owned fileIO = iFile->open(IFOread); + verifyex(sz == ::read(fileIO, 0, sz, mb)); + manager.extCache.add(extName, sz, mb.toByteArray()); + } + } + else + mb.append((size32_t)0); +} + +void CBinaryFileExternal::write(const char *name, IPropertyTree &tree) +{ + StringBuffer filename(name); + filename.append(ext); + + MemoryBuffer out; + ((PTree &)tree).queryValue()->serialize(out); + size32_t len = out.length(); + Owned item = manager.deltaWriter.addExt(filename.detach(), len, out.detach()); + manager.extCache.add(item); +} + +///////////////// + +void CDeltaWriter::addToQueue(CTransactionItem *item) +{ + pending.push(item); + size_t items = pending.size(); + if (items < transactionQueueLimit) + { + if ((get_cycles_now() < nextTimeThreshold) && (pending.size() < transactionWriteThreshold)) + return; + if (waiting) + { + waiting = false; + sem.signal(); + } + } + else + { + ++totalQueueLimitHits; + // force a synchronous save + CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending + CCycleTimer timer; + if (save(pending)) // if temporarily blocked, continue, meaning queue limit will overrun a bit (blocking window is short) + { + timeThrottled += timer.elapsedCycles(); + ++throttleCounter; + if (timeThrottled >= queryOneSecCycles()) + { + IWARNLOG("Transactions throttled - current items = %u, since last message throttled-time/tracactions = { %u ms, %u }, total hard limit hits = %u", (unsigned)items, (unsigned)cycle_to_millisec(timeThrottled), throttleCounter, totalQueueLimitHits); + timeThrottled = 0; + throttleCounter = 0; + } + } + } +} +///////////////// + void CConnectionSubscriptionManager::add(ISubscription *sub, SubscriptionId id) { MemoryBuffer mb; @@ -2669,6 +2813,7 @@ class CServerRemoteTree : public CRemoteTreeBase PDState processData(IPropertyTree &changeTree, Owned &parentBranchChange, MemoryBuffer &newIds); PDState checkChange(IPropertyTree &tree, CBranchChange &parentBranchChange); friend class COrphanHandler; +friend class CDeltaWriter; }; class CNodeSubscriberContainer : public CSubscriberContainerBase @@ -4407,7 +4552,7 @@ void CSDSTransactionServer::processMessage(CMessageBuffer &mb) CheckTime block6("DAMP_SDSCMD_DATA.6"); StringBuffer path; connection->queryPTreePath().getAbsolutePath(path); - manager.saveDelta(path.str(), *changeTree); + manager.serializeDelta(path.str(), changeTree.getClear()); } mb.clear(); mb.append((int)DAMP_SDSREPLY_OK); @@ -5248,7 +5393,7 @@ class CStoreHelper : implements IStoreHelper, public CInterface while (deltaIPIFile->exists()) { if (0 == d++ % 50) - PROGLOG("Waiting for a saveDelta in progress"); + PROGLOG("Waiting for a writeXml in progress"); MilliSleep(100); } } @@ -5741,7 +5886,7 @@ IStoreHelper *createStoreHelper(const char *storeName, const char *location, con #endif CCovenSDSManager::CCovenSDSManager(ICoven &_coven, IPropertyTree &_config, const char *_dataPath, const char *_daliName) - : coven(_coven), config(_config), server(*this), dataPath(_dataPath), daliName(_daliName), backupHandler(_config) + : coven(_coven), config(_config), server(*this), dataPath(_dataPath), daliName(_daliName) { config.Link(); restartOnError = config.getPropBool("@restartOnUnhandled"); @@ -5751,7 +5896,7 @@ CCovenSDSManager::CCovenSDSManager(ICoven &_coven, IPropertyTree &_config, const ignoreExternals=false; unsigned initNodeTableSize = queryCoven().getInitSDSNodes(); allNodes.ensure(initNodeTableSize?initNodeTableSize:INIT_NODETABLE_SIZE); - externalSizeThreshold = config.getPropInt("@externalSizeThreshold", DEFAULT_EXTERNAL_SIZE_THRESHOLD); + externalSizeThreshold = config.getPropInt("@externalSizeThreshold", defaultExternalSizeThreshold); remoteBackupLocation.set(config.queryProp("@remoteBackupLocation")); nextExternal = 1; if (0 == coven.getServerRank()) @@ -5796,11 +5941,11 @@ CCovenSDSManager::CCovenSDSManager(ICoven &_coven, IPropertyTree &_config, const registerSubscriptionManager(SDSNODE_PUBLISHER, nodeSubscriptionManager); // add external handlers - Owned xmlExternalHandler = new CXMLFileExternal(dataPath, backupHandler); + Owned xmlExternalHandler = new CXMLFileExternal(dataPath, *this); externalHandlers.replace(* new CExternalHandlerMapping(EF_XML, *xmlExternalHandler)); - Owned legacyBinaryExternalHandler = new CLegacyBinaryFileExternal(dataPath, backupHandler); + Owned legacyBinaryExternalHandler = new CLegacyBinaryFileExternal(dataPath, *this); externalHandlers.replace(* new CExternalHandlerMapping(EF_LegacyBinaryValue, *legacyBinaryExternalHandler)); - Owned binaryExternalHandler = new CBinaryFileExternal(dataPath, backupHandler); + Owned binaryExternalHandler = new CBinaryFileExternal(dataPath, *this); externalHandlers.replace(* new CExternalHandlerMapping(EF_BinaryValue, *binaryExternalHandler)); properties.setown(createPTree("Properties")); @@ -5830,10 +5975,7 @@ CCovenSDSManager::CCovenSDSManager(ICoven &_coven, IPropertyTree &_config, const properties->setProp("@dataPathUrl", path.str()); properties->setPropInt("@keepStores", keepLastN); if (remoteBackupLocation.length()) - { properties->setProp("@backupPathUrl", remoteBackupLocation.get()); - backupHandler.init(remoteBackupLocation, config.getPropBool("@asyncBackup", true)); - } const char *storeName = config.queryProp("@store"); if (!storeName) storeName = "dalisds"; @@ -5858,6 +6000,8 @@ CCovenSDSManager::CCovenSDSManager(ICoven &_coven, IPropertyTree &_config, const doTimeComparison = false; if (config.getPropBool("@lightweightCoalesce", true)) coalesce.setown(new CLightCoalesceThread(config, iStoreHelper)); + size_t extCacheSize = config.getPropInt("@extCacheSizeMB", defaultExtCacheSizeMB) * 0x100000; + extCache.init(extCacheSize); } #ifdef _MSC_VER @@ -5866,7 +6010,6 @@ CCovenSDSManager::CCovenSDSManager(ICoven &_coven, IPropertyTree &_config, const CCovenSDSManager::~CCovenSDSManager() { - backupHandler.stop(); if (unhandledThread) unhandledThread->join(); if (coalesce) coalesce->stop(); scanNotifyPool.clear(); @@ -5880,29 +6023,6 @@ CCovenSDSManager::~CCovenSDSManager() config.Release(); } -bool compareFiles(IFile *file1, IFile *file2, bool compareTimes=true) -{ - if (file1->exists()) - { - if (file2->exists()) - { - if (file1->size() == file2->size()) - { - if (!compareTimes) return true; - CDateTime modifiedTimeBackup; - file1->getTime(NULL, &modifiedTimeBackup, NULL); - CDateTime modifiedTime; - file2->getTime(NULL, &modifiedTime, NULL); - if (0 == modifiedTimeBackup.compare(modifiedTime, false)) - return true; - } - } - } - else - return !file2->exists(); - return false; -} - void CCovenSDSManager::validateDeltaBackup() { // check consistency of delta @@ -6367,6 +6487,8 @@ void CCovenSDSManager::loadStore(const char *storeName, const bool *abort) throw; } + deltaWriter.init(config, iStoreHelper); + if (!root) { root = (CServerRemoteTree *) createServerTree(); @@ -6465,29 +6587,15 @@ StringBuffer &transformToAbsolute(StringBuffer &result, const char *xpath, unsig return result; } -void cleanChangeTree(IPropertyTree &tree) -{ - tree.removeProp("@id"); - Owned iter = tree.getElements(RENAME_TAG); - ForEach (*iter) - iter->query().removeProp("@id"); - iter.setown(tree.getElements(DELETE_TAG)); - ForEach (*iter) - iter->query().removeProp("@id"); - iter.setown(tree.getElements(RESERVED_CHANGE_NODE)); - ForEach (*iter) - cleanChangeTree(iter->query()); -} - -void CCovenSDSManager::saveDelta(const char *path, IPropertyTree &changeTree) +void CCovenSDSManager::serializeDelta(const char *path, IPropertyTree *changeTree) { - CHECKEDCRITICALBLOCK(saveIncCrit, fakeCritTimeout); + Owned ownedChangeTree = changeTree; // translate changeTree to inc format (e.g. remove id's) if (externalEnvironment) { // don't save any changed to /Environment if external - if (startsWith(path, "/Environment") || (streq(path, "/") && changeTree.hasProp("*[@name=\"Environment\"]"))) + if (startsWith(path, "/Environment") || (streq(path, "/") && changeTree->hasProp("*[@name=\"Environment\"]"))) { Owned mpServer = getMPServer(); IAllowListHandler *allowListHandler = mpServer->queryAllowListCallback(); @@ -6497,101 +6605,7 @@ void CCovenSDSManager::saveDelta(const char *path, IPropertyTree &changeTree) return; } } - cleanChangeTree(changeTree); - // write out with header details (e.g. path) - Owned header = createPTree("Header"); - header->setProp("@path", path); - IPropertyTree *delta = header->addPropTree("Delta", createPTree()); - delta->addPropTree(changeTree.queryName(), LINK(&changeTree)); - - StringBuffer fname(dataPath); - OwnedIFile deltaIPIFile = createIFile(fname.append(DELTAINPROGRESS).str()); - OwnedIFileIO deltaIPIFileIO = deltaIPIFile->open(IFOcreate); - deltaIPIFileIO.clear(); - struct RemoveDIPBlock - { - IFile &iFile; - bool done; - void doit() { done = true; iFile.remove(); } - RemoveDIPBlock(IFile &_iFile) : iFile(_iFile), done(false) { } - ~RemoveDIPBlock () { if (!done) doit(); } - } removeDIP(*deltaIPIFile); - StringBuffer detachIPStr(dataPath); - OwnedIFile detachIPIFile = createIFile(detachIPStr.append(DETACHINPROGRESS).str()); - if (detachIPIFile->exists()) // very small window where this can happen. - { - // implies other operation about to access current delta - // CHECK session is really alive, otherwise it has been orphaned, so remove it. - try - { - SessionId sessId = 0; - OwnedIFileIO detachIPIO = detachIPIFile->open(IFOread); - if (detachIPIO) - { - size_t s = detachIPIO->read(0, sizeof(sessId), &sessId); - detachIPIO.clear(); - if (sizeof(sessId) == s) - { - // double check session is really alive - if (querySessionManager().sessionStopped(sessId, 0)) - detachIPIFile->remove(); - else - { - // *cannot block* because other op (sasha) accessing remote dali files, can access dali. - removeDIP.doit(); - PROGLOG("blocked"); - toXML(header, blockedDelta); - return; - } - } - } - } - catch (IException *e) { EXCLOG(e, NULL); e->Release(); } - } - bool first = false; - try - { - StringBuffer deltaFilename(dataPath); - iStoreHelper->getCurrentDeltaFilename(deltaFilename); - toXML(header, blockedDelta); - OwnedIFile iFile = createIFile(deltaFilename.str()); - first = !iFile->exists() || 0 == iFile->size(); - writeDelta(blockedDelta, *iFile); - } - catch (IException *e) - { - // NB: writeDelta retries a few times before giving up. - VStringBuffer errMsg("saveDelta: failed to save delta data, blockedDelta size=%d", blockedDelta.length()); - OWARNLOG(e, errMsg.str()); - e->Release(); - return; - } - if (remoteBackupLocation.length()) - { - try - { - if (backupOutOfSync) // true if there was previously an exception during synchronously writing delta to backup. - { - OWARNLOG("Backup delta is out of sync due to a prior backup write error, attempting to resync"); - // catchup - check and copy primary delta to backup - validateDeltaBackup(); - backupOutOfSync = false; - OWARNLOG("Backup delta resynchronized"); - } - else - backupHandler.addDelta(blockedDelta, iStoreHelper->queryCurrentEdition(), first); - } - catch (IException *e) - { - OERRLOG(e, "saveDelta: failed to save backup delta data"); - e->Release(); - backupOutOfSync = true; - } - } - if (blockedDelta.length() > 0x100000) - blockedDelta.kill(); - else - blockedDelta.clear(); + deltaWriter.addDelta(path, ownedChangeTree.getClear()); } CSubscriberContainerList *CCovenSDSManager::getSubscribers(const char *xpath, CPTStack &stack) @@ -6900,7 +6914,7 @@ void CCovenSDSManager::commit(CRemoteConnection &connection, bool *disconnectDel { // something commited, if RTM_Create was used need to remember this. StringBuffer path; serverConnection->queryPTreePath().getAbsolutePath(path); - saveDelta(path.str(), *changeTree); + serializeDelta(path.str(), changeTree.getClear()); bool lazyFetch = connection.setLazyFetch(false); tree->clearCommitChanges(&newIds); assertex(newIds.getPos() == newIds.length()); // must have read it all @@ -7722,8 +7736,8 @@ void CCovenSDSManager::createConnection(SessionId sessionId, unsigned mode, unsi { if (deltaChange.get()) { - PROGLOG("Exception on RTM_CREATE caused call to saveDelta, xpath=%s", xpath); - saveDelta(deltaPath, *deltaChange); + PROGLOG("Exception on RTM_CREATE caused call to serializeDelta, xpath=%s", xpath); + serializeDelta(deltaPath, deltaChange.getClear()); } throw; } @@ -7743,7 +7757,7 @@ void CCovenSDSManager::createConnection(SessionId sessionId, unsigned mode, unsi connection->notify(); SDSManager->startNotification(*deltaChange, stack, *branchChange); - saveDelta(deltaPath, *deltaChange); + serializeDelta(deltaPath, deltaChange.getClear()); } connectionId = connection->queryConnectionId(); @@ -7921,9 +7935,8 @@ void CCovenSDSManager::disconnect(ConnectionId id, bool deleteRoot, CLCLockBlock StringBuffer head; const char *tail = splitXPath(path.str(), head); - CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); if (NotFound != index) - saveDelta(head.str(), *changeTree); + serializeDelta(head.str(), changeTree.getClear()); else { // NB: don't believe this can happen, but last thing want to do is save duff delete delta. IERRLOG("** CCovenSDSManager::disconnect - index position lost **"); @@ -8128,6 +8141,7 @@ void CCovenSDSManager::blockingSave(unsigned *writeTransactions) if (writeTransactions) *writeTransactions = SDSManager->writeTransactions; // JCS - could in theory, not block, but abort save. + deltaWriter.clear(); SDSManager->saveStore(); } diff --git a/initfiles/componentfiles/configschema/xsd/dali.xsd b/initfiles/componentfiles/configschema/xsd/dali.xsd index fa2f236d49e..58ee4658899 100644 --- a/initfiles/componentfiles/configschema/xsd/dali.xsd +++ b/initfiles/componentfiles/configschema/xsd/dali.xsd @@ -61,15 +61,15 @@ hpcc:presetValue="true" hpcc:tooltip="Asynchronous backup of transactions"/> - - - + + + - - - - - - da.t3 - - - + - Backup computer + Seconds to hold off between transaction commits to disk (default off, will commit immediately) - + - Asynchronous backup of transactions + Maximum number of transaction before triggering a commit to disk (default off, will commit immediately) - + - Create and use a NFS mount point for backups + The max limit of pending uncommitted transactions. Transactions will be blocked until the number is reduced by committing to disk - + + + + + + da.t3 + + + - Granularity of pending backups to begin issue warnings about backlog + Backup computer - + - Limit above which the backup queue will start introducing delays until the backup queue decreases below this limit + Asynchronous backup of transactions - + - The maximum delay to introduce when the backupSoftQueueLimit has been introduced + Create and use a NFS mount point for backups diff --git a/initfiles/componentfiles/configxml/dali.xsl b/initfiles/componentfiles/configxml/dali.xsl index 34f7e25ce7f..16d13940839 100644 --- a/initfiles/componentfiles/configxml/dali.xsl +++ b/initfiles/componentfiles/configxml/dali.xsl @@ -140,7 +140,7 @@ dalisds.xml 0 - +