Skip to content

Commit

Permalink
HPCC-30085 2nd review changes
Browse files Browse the repository at this point in the history
+ Write XML transactions after externals are written; perform pending external
removals at the end
+ Ensure save/stop flushes transactions

Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Aug 31, 2023
1 parent c5c6b14 commit 70d928c
Showing 1 changed file with 72 additions and 33 deletions.
105 changes: 72 additions & 33 deletions dali/base/dasds.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1128,7 +1128,9 @@ class CDeltaWriter : implements IThreaded
unsigned throttleCounter = 0;
bool waiting = true;
bool backupOutOfSync = false;
bool aborted = false;
std::atomic<bool> aborted = false;
bool signalEmpty = false;
Semaphore emptySem;

void validateDeltaBackup()
{
Expand Down Expand Up @@ -1306,6 +1308,7 @@ class CDeltaWriter : implements IThreaded
catch (IException *e) { EXCLOG(e, NULL); e->Release(); }
}

std::vector<std::string> pendingExtDeletes;
while (!todo.empty())
{
CTransactionItem *item = todo.front();
Expand All @@ -1315,44 +1318,54 @@ class CDeltaWriter : implements IThreaded
item->deltaTree = nullptr;
cleanChangeTree(*changeTree);

// write out with header details (e.g. path)
Owned<IPropertyTree> header = createPTree("Header");
header->setProp("@path", item->name);
IPropertyTree *delta = header->addPropTree("Delta", createPTree());
const char *nodeName = changeTree->queryName();
delta->addPropTree(nodeName, changeTree.getClear());
toXML(header, deltaXml);
// write out with header details (i.e. path)
deltaXml.appendf("<Header path=\"%s\">\n <Delta>\n", item->name);
toXML(changeTree, deltaXml, 4);
deltaXml.append(" </Delta>\n</Header>");
}
else
{
// external file

// commit accumulated delta 1st (so write order is consistent)
if (deltaXml.length())
writeXml(); // also handles backup if needed

if (CTransactionItem::f_addext == item->type)
{
// NB: the XML referring to these externals will not be written until the pending xml is built up
// meaning if there's a cold restart at this point, there will be unreferenced ext files on disk (and ensuing warnings)
// However, that is better than committing the xml that references externals before they exist, in the event of
// a cold restart between the two.
// NB2: It is also important to ensure that these externals are committed to disk, in the event of a Dali saveStore
// because the saved in-memory store has been altered at this point, i.e. it depends on the external file.
writeExt(dataPath, item->name, item->dataLength, item->data);
if (backupPath.length())
writeExt(backupPath, item->name, item->dataLength, item->data, 60, 30);
}
else
{
// track externals to delete / don't delete until xml is committed. See below.
dbgassertex(CTransactionItem::f_delext == item->type);
deleteExt(dataPath, item->name);
if (backupPath.length())
deleteExt(backupPath, item->name, 60, 30);
pendingExtDeletes.push_back(item->name);
}
}
todo.pop();
}
if (deltaXml.length())
writeXml();
for (auto &ext : pendingExtDeletes)
{
deleteExt(dataPath, ext.c_str());
if (backupPath.length())
deleteExt(backupPath, ext.c_str(), 60, 30);
}
if (thresholdDuration)
lastSaveTime = get_cycles_now();
return true;
}
// Clears the 'waiting' flag and signals 'sem'.
// Shared helper for the callers that previously open-coded this pair of statements
// (addToQueue) and for flush(), both of which test 'waiting' before calling.
// NOTE(review): presumably 'sem' is what the writer thread blocks on while idle - confirm against threadmain.
void signalWaiting()
{
// must be called whilst pendingCrit is held + waiting == true
waiting = false; // cleared first, so later callers that test 'waiting' do not signal 'sem' again
sem.signal();
}
void addToQueue(CTransactionItem *item);
public:
CDeltaWriter() : threaded("CDeltaWriter")
Expand All @@ -1369,7 +1382,10 @@ class CDeltaWriter : implements IThreaded
unsigned deltaTransactionMaxMemMB = config.getPropInt("@deltaTransactionMaxMemMB", defaultDeltaMemMaxMB);
transactionMaxMem = (memsize_t)deltaTransactionMaxMemMB * 0x100000;
if (saveThresholdSecs)
{
thresholdDuration = queryOneSecCycles() * saveThresholdSecs;
lastSaveTime = get_cycles_now();
}
if (0 == transactionQueueLimit) // treat 0 as disabled, meaning that the # pending transactions will not be considered
transactionQueueLimit = INFINITE;

Expand Down Expand Up @@ -1407,17 +1423,36 @@ class CDeltaWriter : implements IThreaded
CriticalBlock b(pendingCrit);
addToQueue(item);
}
void clear()
void flush()
{
CriticalBlock b(pendingCrit);
pending = {};
bool waitOnEmptySem = false;
{
CriticalBlock b(pendingCrit);
if (waiting)
{
if (pendingSz)
signalWaiting();
}
if (!waiting)
{
signalEmpty = true;
waitOnEmptySem = true;
}
}
if (waitOnEmptySem)
{
// this should not be here long, but log just in case
while (!emptySem.wait(10000))
WARNLOG("Waiting on CDeltaWriter to flush transactions");
}
}
void stop()
void abort()
{
if (!aborted)
{
aborted = true;
sem.signal();
emptySem.signal();
threaded.join();
}
}
Expand Down Expand Up @@ -1445,6 +1480,11 @@ class CDeltaWriter : implements IThreaded
if (semTimedout)
sem.wait(0);
waiting = true;
if (signalEmpty)
{
signalEmpty = false;
emptySem.signal();
}
break;
}
pendingSz = 0;
Expand Down Expand Up @@ -1515,7 +1555,7 @@ class CLegacyBinaryFileExternal : public CExternalFile, implements IExternalHand

Owned<IFile> iFile = createIFile(filename.str());
size32_t sz = (size32_t)iFile->size();
if ((unsigned)-1 == sz)
if ((size32_t)-1 == sz)
{
StringBuffer s("Missing external file ");
if (*_name)
Expand Down Expand Up @@ -2120,7 +2160,7 @@ void CBinaryFileExternal::readValue(const char *name, MemoryBuffer &mb)
getFilename(filename, name);
Owned<IFile> iFile = createIFile(filename.str());
size32_t sz = (size32_t)iFile->size();
if ((unsigned)-1 == sz)
if ((size32_t)-1 == sz)
{
StringBuffer s("Missing external file ");
Owned<IException> e = MakeSDSException(SDSExcpt_MissingExternalFile, "%s", filename.str());
Expand Down Expand Up @@ -2148,18 +2188,19 @@ void CBinaryFileExternal::read(const char *name, IPropertyTree &owner, MemoryBuf
byte flags = ((PTree &)owner).queryFlags();
mb.append(flags);
serializeVisibleAttributes(owner, mb);
StringBuffer extName;
getName(extName, name);
if (manager.extCache.lookup(extName, mb))
return;

if (withValue)
{
StringBuffer extName;
getName(extName, name);
if (manager.extCache.lookup(extName, mb))
return;

StringBuffer filename;
getFilename(filename, name);
Owned<IFile> iFile = createIFile(filename.str());
size32_t sz = (size32_t)iFile->size();
if ((unsigned)-1 == sz)
if ((size32_t)-1 == sz)
{
IptFlagClr(flags, ipt_binary);
mb.writeDirect(flagsPos, sizeof(flags), &flags);
Expand Down Expand Up @@ -2213,18 +2254,15 @@ void CDeltaWriter::addToQueue(CTransactionItem *item)
return;

if (waiting)
{
waiting = false;
sem.signal();
}
signalWaiting();
}
else // here if exceeded transationQueueLimit, transactionMaxMem or exceeded time threshold (deltaSaveThresholdSecs)
{
++totalQueueLimitHits;
// force a synchronous save
CCycleTimer timer;
CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending
PROGLOG("Forcing synchronous save of %u transactions", (unsigned)pending.size());
CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending
if (save(pending)) // if temporarily blocked, continue, meaning queue limit will overrun a bit (blocking window is short)
{
pendingSz = 0;
Expand Down Expand Up @@ -6532,6 +6570,9 @@ void CCovenSDSManager::saveStore(const char *storeName, bool currentEdition)
CIgnore() { SDSManager->ignoreExternals=true; }
~CIgnore() { SDSManager->ignoreExternals=false; }
} ignore;
// lock blockedSaveCrit after flush, since deltaWriter itself blocks on blockedSaveCrit
deltaWriter.flush(); // transactions will be blocked at this stage, no new deltas will be added to writer
CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout);
iStoreHelper->saveStore(root, NULL, currentEdition);
unsigned initNodeTableSize = allNodes.maxElements()+OVERFLOWSIZE;
queryCoven().setInitSDSNodes(initNodeTableSize>INIT_NODETABLE_SIZE?initNodeTableSize:INIT_NODETABLE_SIZE);
Expand Down Expand Up @@ -8139,11 +8180,9 @@ MemoryBuffer &CCovenSDSManager::collectSubscribers(MemoryBuffer &out)
void CCovenSDSManager::blockingSave(unsigned *writeTransactions)
{
CHECKEDDALIREADLOCKBLOCK(SDSManager->dataRWLock, readWriteTimeout); // block all write actions whilst saving
CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout);
if (writeTransactions)
*writeTransactions = SDSManager->writeTransactions;
// JCS - could in theory, not block, but abort save.
deltaWriter.clear();
SDSManager->saveStore();
}

Expand Down

0 comments on commit 70d928c

Please sign in to comment.