HPCC-30085 Batch and write dali transactions asynchronously #17671

Merged

Member

@jakesmith jakesmith commented Aug 14, 2023

Also add an external file caching mechanism to reduce the
expense of frequently reloading recent externals.

Signed-off-by: Jake Smith [email protected]

@jakesmith jakesmith force-pushed the HPCC-30085-dali-async-batch branch 2 times, most recently from c12cc11 to bc5c527 Compare August 16, 2023 11:32
@jakesmith jakesmith changed the title from "Hpcc 30085 dali async batch" to "HPCC-30085 Batch and write dali transactions asynchronously" Aug 16, 2023
@jakesmith jakesmith force-pushed the HPCC-30085-dali-async-batch branch 2 times, most recently from ec2f450 to 32d924a Compare August 16, 2023 15:17
@jakesmith jakesmith marked this pull request as ready for review August 16, 2023 15:18
@jakesmith
Member Author

@ghalliday - please give this a 1st review.

Member

@ghalliday ghalliday left a comment

@jakesmith a few minor comments from an initial scan. I will revisit again and review properly.

hpcc:displayName="Seconds between transactions being committed to disk" hpcc:presetValue="0"
hpcc:tooltip="The maximum time between commit pending transactions to disk (default=0, meaning commit immediately)"/>
<xs:attribute name="deltaTransactionQueueLimit" type="xs:nonNegativeInteger"
hpcc:displayName="Maximum number of pending uncommited transaction" hpcc:presetValue="10000"
Member

trivial: uncommitted

Member Author

will fix.

type = f_delta;
name = strdup(path);
}
CTransactionItem(char *_name, size32_t _dataLength, void *_data) : name(_name), dataLength(_dataLength), data(_data)
Member

Unusual that one constructor clones the name, and the other takes ownership.

Member Author

agree, I'll make it consistently take ownership, and clone the string outside.
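
For illustration, a minimal sketch of the "always take ownership, clone outside" convention agreed above. `TransactionItem`, `cloneString` and `createItemFromPath` are hypothetical names for this sketch and the class is much simpler than the PR's `CTransactionItem`; it assumes the owned buffers were allocated with `malloc`.

```cpp
#include <cstddef>
#include <cstdlib>
#include <cstring>

// Hypothetical helper: duplicate a borrowed C string into malloc'd storage.
inline char *cloneString(const char *s)
{
    std::size_t len = std::strlen(s) + 1;
    char *copy = static_cast<char *>(std::malloc(len));
    if (copy)
        std::memcpy(copy, s, len);
    return copy;
}

// Hypothetical, simplified item: every constructor takes ownership of the
// buffers it is given and frees them on destruction.
class TransactionItem
{
public:
    explicit TransactionItem(char *ownedName) : name(ownedName) {}
    TransactionItem(char *ownedName, std::size_t _dataLength, void *ownedData)
        : name(ownedName), dataLength(_dataLength), data(ownedData) {}
    TransactionItem(const TransactionItem &) = delete;
    TransactionItem &operator=(const TransactionItem &) = delete;
    ~TransactionItem()
    {
        std::free(name);
        std::free(data); // free(nullptr) is a no-op
    }
    const char *queryName() const { return name; }
    std::size_t queryDataLength() const { return dataLength; }
private:
    char *name = nullptr;
    std::size_t dataLength = 0;
    void *data = nullptr;
};

// A caller that only has a borrowed path clones it before handing it over.
inline TransactionItem *createItemFromPath(const char *path)
{
    return new TransactionItem(cloneString(path));
}
```

With one rule for all constructors, a reader never has to check which overload copies and which adopts.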

deltaFilename.clear().append(backupPath);
iStoreHelper->getCurrentDeltaFilename(deltaFilename);
OwnedIFile iFileDeltaBackup = createIFile(deltaFilename.str());
if (!compareFiles(iFileDeltaBackup, iFileDelta, false))
Member

minor: A better function name would be filesMatch/filesAreIdentical - more specific, and reads better in the if

Member Author

not new, but agreed, I'll change it.

StringBuffer fname(dataPath);
OwnedIFile deltaIPIFile = createIFile(fname.append(DELTAINPROGRESS).str());
OwnedIFileIO deltaIPIFileIO = deltaIPIFile->open(IFOcreate);
deltaIPIFileIO.clear();
Member

Why the close? Worth a comment at least.

Member Author

not new code, but relocated. This code is effectively a touch()
Just looked, and there is a jfile touchFile function, but not for an IFile, nor a removeFile for a filename - I may add, to clarify what this is doing.
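
As a rough sketch of what named "touch" and "remove" helpers could look like, using only the C++ standard library rather than jfile. `touchPath`, `pathExists` and `removePath` are hypothetical names, not existing HPCC functions. Unlike the quoted code, which opens with `IFOcreate`, this version opens in append mode and so never truncates an existing file.

```cpp
#include <cstdio>
#include <fstream>
#include <string>

// Hypothetical helper: make sure a marker file exists.
// Append mode creates the file if needed; the stream closes on scope exit.
inline bool touchPath(const std::string &path)
{
    std::ofstream marker(path, std::ios::app);
    return marker.is_open();
}

// Hypothetical helper: true if the file can be opened for reading.
inline bool pathExists(const std::string &path)
{
    std::ifstream probe(path);
    return probe.is_open();
}

// Hypothetical helper: remove a file by name, reporting success.
inline bool removePath(const std::string &path)
{
    return std::remove(path.c_str()) == 0;
}
```

A call such as `touchPath(markerName)` states the intent directly, which is the readability gain discussed above.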

size_t items = pending.size();
if ((pendingSz < transactionMaxMem) && (items < transactionQueueLimit))
{
if (nextTimeThreshold && (get_cycles_now() < nextTimeThreshold))
Member

There is the potential for wrapping - if nextTimeThreshold gets to (cycles_type)-1 then this is unlikely to ever be true. Always safer to test (get_cycles_now() - prevWriteTime) > thresholdDuration

Member Author

right, will change.
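
A minimal sketch of the wrap-safe comparison suggested above, assuming the cycle counter is an unsigned 64-bit value. `cycles_t`, `beforeDeadline` and `thresholdReached` are hypothetical names for this example only.

```cpp
#include <cstdint>

// Hypothetical stand-in for the cycle counter type; assumed unsigned 64-bit.
using cycles_t = std::uint64_t;

// Fragile form: a precomputed deadline can wrap past the maximum value,
// after which "now < deadline" no longer means what was intended.
inline bool beforeDeadline(cycles_t now, cycles_t deadline)
{
    return now < deadline;
}

// Wrap-safe form: unsigned subtraction is modulo 2^64, so the elapsed time
// is still correct if the counter wrapped between prevWriteTime and now.
inline bool thresholdReached(cycles_t now, cycles_t prevWriteTime, cycles_t thresholdDuration)
{
    return (now - prevWriteTime) > thresholdDuration;
}
```

Comparing durations rather than absolute deadlines only requires the real elapsed time to be less than one full wrap of the counter.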

item->flags = BackupQueueItem::f_addext;
add(item);
CriticalBlock b(pendingCrit);
CTransactionItem *item = new CTransactionItem(name, length, content);
Member

efficiency: create the item outside of the critical section.

Member Author

yes, I'll move the creation of these.
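
The efficiency point can be sketched as follows, using `std::mutex` in place of the project's `CriticalSection`. `PendingItem`, `PendingQueue` and `addExt` are hypothetical names, and the item is reduced to a name plus a byte buffer.

```cpp
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical, simplified queue entry.
struct PendingItem
{
    std::string name;
    std::vector<char> content;
};

class PendingQueue
{
public:
    void addExt(const std::string &name, const char *data, std::size_t length)
    {
        // Allocation and copying happen before the lock is taken...
        auto item = std::make_unique<PendingItem>();
        item->name = name;
        item->content.assign(data, data + length);

        // ...so the critical section covers only the queue update itself.
        std::lock_guard<std::mutex> guard(pendingLock);
        pendingSz += item->content.size();
        pending.push_back(std::move(item));
    }
    std::size_t count()
    {
        std::lock_guard<std::mutex> guard(pendingLock);
        return pending.size();
    }
    std::size_t bytes()
    {
        std::lock_guard<std::mutex> guard(pendingLock);
        return pendingSz;
    }
private:
    std::mutex pendingLock;
    std::vector<std::unique_ptr<PendingItem>> pending;
    std::size_t pendingSz = 0;
};
```

Keeping allocation out of the locked region shortens the time other threads can be blocked on the queue.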

item->flags = BackupQueueItem::f_delext;
add(item);
CriticalBlock b(pendingCrit);
addToQueue(new CTransactionItem(path, delta));
Member

efficiency: create the item outside of the critical section.

Member Author

will move

e->Release();
aborted = true;
sem.signal();
pendingCrit.enter();
Member

Worth a comment why you are doing this.

class CBinaryFileExternal : public CExternalFile, implements IExternalHandler
{
public:
IMPLEMENT_IINTERFACE;
Member

Should really be
IMPLEMENT_IINTERFACE_USING(CExternalFile)

++throttleCounter;
if (timeThrottled >= queryOneSecCycles())
{
IWARNLOG("Transactions throttled - current items = %u, since last message throttled-time/tracactions = { %u ms, %u }, total hard limit hits = %u", (unsigned)items, (unsigned)cycle_to_millisec(timeThrottled), throttleCounter, totalQueueLimitHits);
Member

typo?: tracactions->transactions?

Member Author

will correct.

@jakesmith
Member Author

@ghalliday - please see new commit

Semaphore sem;
cycle_t timeThrottled = 0;
unsigned throttleCounter = 0;
bool waiting = true;
Member

This needs to be atomic

Member Author

does it? It's always altered or read whilst pendingCrit is held.

Member

@ghalliday ghalliday left a comment

@jakesmith a few relatively minor comments.

// external file

// commit accumulated delta 1st (so write order is consistent)
if (deltaXml.length())
Member

Could you possibly do this in two phases?
a) write new files, but don't flush xml
after all written
b) remove all external.

StringBuffer deltaFilename(dataPath);
iStoreHelper->getCurrentDeltaFilename(deltaFilename);
OwnedIFile iFile = createIFile(deltaFilename.str());
writeDelta(deltaXml, *iFile);
Member

crc calculation in that function is a bit expensive. Might be worth considering other options/implementations.

unsigned throttleCounter = 0;
bool waiting = true;
bool backupOutOfSync = false;
bool aborted = false;
Member

should be atomic

Member Author

agreed, will change.
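
The distinction drawn in the two "atomic" threads can be sketched like this: a flag that is only ever touched under a lock can stay a plain `bool`, while one read or written without the lock needs to be atomic. `WriterState` and its members are hypothetical names, and `std::mutex` stands in for `pendingCrit`.

```cpp
#include <atomic>
#include <mutex>

// Hypothetical, simplified state holder.
class WriterState
{
public:
    // May be called from any thread without taking the lock.
    void abort() { aborted.store(true); }
    bool isAborted() const { return aborted.load(); }

    // Always accessed with the lock held.
    void setIdle(bool idle)
    {
        std::lock_guard<std::mutex> guard(lock);
        writerIdle = idle;
    }
    bool isIdle()
    {
        std::lock_guard<std::mutex> guard(lock);
        return writerIdle;
    }
private:
    std::mutex lock;
    bool writerIdle = true;           // protected by 'lock', so a plain bool is enough
    std::atomic<bool> aborted{false}; // accessed without the lock, so it must be atomic
};
```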

cleanChangeTree(*changeTree);

// write out with header details (e.g. path)
Owned<IPropertyTree> header = createPTree("Header");
Member

efficiency: Not as clean, but would it be more efficient to explicitly write out the text
"<Header path='%s'><Delta>"..."</Delta></Header>"
than the overhead of creating the tree nodes?

Member Author

yes could do, cuts down on the 'Header' and 'Delta' trees.
I'll change.
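
A minimal sketch of writing the wrapper text directly instead of building `Header` and `Delta` tree nodes. `escapeXmlAttr` and `wrapDelta` are hypothetical helpers written for this example; the project has its own XML encoding utilities, which are not used here. It assumes `deltaXml` is already well-formed XML.

```cpp
#include <string>

// Hypothetical helper: escape characters that are unsafe in an XML attribute.
inline std::string escapeXmlAttr(const std::string &value)
{
    std::string out;
    out.reserve(value.size());
    for (char c : value)
    {
        switch (c)
        {
        case '&':  out += "&amp;";  break;
        case '<':  out += "&lt;";   break;
        case '>':  out += "&gt;";   break;
        case '\'': out += "&apos;"; break;
        case '"':  out += "&quot;"; break;
        default:   out += c;        break;
        }
    }
    return out;
}

// Wrap an already-serialized delta with header text, without creating tree nodes.
inline std::string wrapDelta(const std::string &path, const std::string &deltaXml)
{
    std::string out;
    out.reserve(path.size() + deltaXml.size() + 48);
    out += "<Header path='";
    out += escapeXmlAttr(path);
    out += "'><Delta>";
    out += deltaXml;
    out += "</Delta></Header>";
    return out;
}
```

The trade-off is the one noted in the comment: less allocation per transaction, at the cost of hand-maintained markup and explicit escaping of the path.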

item->data = data; // take ownership
item->flags = BackupQueueItem::f_addext;
add(item);
CTransactionItem *item = new CTransactionItem(name, length, content);
Member

Should this be checking aborted still? (And the other similar functions)

Member Author

I think it highlights an issue. The deltaWriter is cleared when blockingSave is called (when Dali itself saves all in memory state, the pending 'to be saved' transactions are cleared). However, when Dali shuts down that is not done.
The delta writer should both be cleared and stopped, although by the time it (Dali) is stopping and saving, there will be no more transactions added.

I will make some changes.

serializeVisibleAttributes(owner, mb);
StringBuffer extName;
getName(extName, name);
if (manager.extCache.lookup(extName, mb))
Member

Should withValue be passed/tested before this? Is it something that occurs often?

Member Author

> Should withValue be passed/tested before this?

it does look that way, I need to double check.

> Is it something that occurs often?

is hitting the cache something that happens often?
Yes, in testing, the same external is often re-read in quick succession.
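
To illustrate why a small cache helps when the same external is re-read in quick succession, here is a sketch of a least-recently-used cache keyed by external name. The eviction policy, the entry-count limit and the `ExternalCache` interface are all assumptions for this example; the PR's `extCache` is not shown in this thread and may work differently. The sketch is not thread-safe.

```cpp
#include <cstddef>
#include <list>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical LRU cache: name -> serialized external content.
class ExternalCache
{
public:
    explicit ExternalCache(std::size_t _maxEntries) : maxEntries(_maxEntries) {}

    // On a hit, copy the content into 'out' and mark the entry most recently used.
    bool lookup(const std::string &name, std::string &out)
    {
        auto it = index.find(name);
        if (it == index.end())
            return false;
        entries.splice(entries.begin(), entries, it->second);
        out = it->second->second;
        return true;
    }
    // Insert or replace an entry, evicting the least recently used one if full.
    void add(const std::string &name, std::string content)
    {
        auto it = index.find(name);
        if (it != index.end())
        {
            it->second->second = std::move(content);
            entries.splice(entries.begin(), entries, it->second);
            return;
        }
        if (maxEntries == 0)
            return;
        if (entries.size() >= maxEntries)
        {
            index.erase(entries.back().first);
            entries.pop_back();
        }
        entries.emplace_front(name, std::move(content));
        index[name] = entries.begin();
    }
    // Drop an entry, e.g. when the external is deleted or rewritten.
    void remove(const std::string &name)
    {
        auto it = index.find(name);
        if (it == index.end())
            return;
        entries.erase(it->second);
        index.erase(it);
    }
private:
    using Entry = std::pair<std::string, std::string>;
    std::size_t maxEntries;
    std::list<Entry> entries; // front = most recently used
    std::unordered_map<std::string, std::list<Entry>::iterator> index;
};
```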

getFilename(filename, name);
Owned<IFile> iFile = createIFile(filename.str());
size32_t sz = (size32_t)iFile->size();
if ((unsigned)-1 == sz)
Member

Better to cast to size32_t for consistency.

Member Author

agree, will change.

getFilename(filename, name);
Owned<IFile> iFile = createIFile(filename.str());
size32_t sz = (size32_t)iFile->size();
if ((unsigned)-1 == sz)
Member

Better to cast to size32_t for consistency.

Member Author

will change.

else
{
Owned<IFileIO> fileIO = iFile->open(IFOread);
verifyex(sz == ::read(fileIO, 0, sz, mb));
Member

read function unnecessarily checks size() again. I don't know if that will cause an extra delay (on remote storage), but may be worth avoiding.

Member Author

looks like it would at least be worth modifying ::read to avoid the check:

    const size32_t checkLengthLimit = 0x1000;
    if (len >= checkLengthLimit)
    {
        //Don't allocate a stupid amount of memory....
        offset_t fileLength = in->size();
        if (pos > fileLength)
            pos = fileLength;
        if ((len == (size32_t)-1) || (pos + len > fileLength))
            len = (size32_t)(fileLength - pos);
    }

if pos == 0 and len != -1

Member Author

@jakesmith jakesmith Aug 31, 2023

makes sense to do in separate JIRA/PR : https://track.hpccsystems.com/browse/HPCC-30193
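
One possible reading of the proposed change, sketched in isolation: skip the size query when the caller reads from offset 0 with an explicit length. `clampReadLength`, `FileOffset`, `Size32` and the `querySize` callback are hypothetical stand-ins, not the real `::read` signature, and the follow-up JIRA may settle on a different condition.

```cpp
#include <cstdint>
#include <functional>

// Hypothetical stand-ins for offset_t and size32_t.
using FileOffset = std::uint64_t;
using Size32 = std::uint32_t;

// Returns the number of bytes to read, calling querySize() only when needed.
inline Size32 clampReadLength(FileOffset pos, Size32 len, const std::function<FileOffset()> &querySize)
{
    const Size32 checkLengthLimit = 0x1000;
    // Assumption: a read from the start with an explicit length was already sized by the caller.
    bool callerKnowsLength = (pos == 0) && (len != (Size32)-1);
    if ((len >= checkLengthLimit) && !callerKnowsLength)
    {
        // Don't allocate an unreasonable amount of memory
        FileOffset fileLength = querySize();
        if (pos > fileLength)
            pos = fileLength;
        if ((len == (Size32)-1) || (pos + len > fileLength))
            len = (Size32)(fileLength - pos);
    }
    return len;
}
```

The cost of this shortcut is that an oversized explicit length is no longer clamped, so it only suits callers that obtained the length from the file itself.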

// force a synchronous save
CCycleTimer timer;
CHECKEDCRITICALBLOCK(blockedSaveCrit, fakeCritTimeout); // because if Dali is saving state (::blockingSave), it will clear pending
PROGLOG("Forcing synchronous save of %u transactions", (unsigned)pending.size());
Member

Move before the critsec? I suspect it is unlikely to be contended so not at all significant.

Member Author

but yes, logical to move before

thresholdDuration = queryOneSecCycles() * saveThresholdSecs;
lastSaveTime = get_cycles_now();
Member Author

NB: this is an incidental change. Not being set before meant it would save without checking threshold earlier than intended (not a big deal, but more correct to set time at start)

@jakesmith
Member Author

@ghalliday - please review new commit

Member

@ghalliday ghalliday left a comment

@jakesmith looks close to ready. A few comments on naming, and I think one potential hole.

if (pendingSz)
signalWaiting();
}
if (!waiting)
Member

trivial: else?

Member Author

it's intentionally not an else, because signalWaiting (if called above) will reset waiting to false

@@ -1407,17 +1427,36 @@ class CDeltaWriter : implements IThreaded
CriticalBlock b(pendingCrit);
addToQueue(item);
}
void clear()
void flush()
Member

This should be commented because it isn't clear what the different flags mean etc. Something like
//ensure that all pending transactions have been flushed to the file.

if (waiting)
{
// The writer thread is actively waiting - if there is data to write ensure it is written
}
else
{
//delta is currently being written. Wait until all the data is written.

Member Author

will update with some comments.

if (waiting)
{
if (pendingSz)
signalWaiting();
Member

Shouldn't this also wait until signalEmpty occurs? Otherwise it might still be writing data (or not yet have written the data) before the function continues.

Member Author

it will because signalWaiting resets waiting to false (all within pendingCrit), and so it will fall into the if (!waiting) block below.

bool waitOnEmptySem = false;
{
CriticalBlock b(pendingCrit);
if (waiting)
Member

waiting is a very confusing variable name - especially since it is set from one thread and cleared in another. It actually seems to mean:

needToSendSignalToRequestDataIsWritten

I think it would be worth coming up with a clearer name - it would help check the logic is correct.

Member Author

I've renamed it deltaWriterIdle, which I think is more accurate and clarifies the logic IMO.

waitOnEmptySem = true;
}
}
if (waitOnEmptySem)
Member

"allWritten" would be a bit clearer than "empty"

Member Author

agree, will rename this and some others to clarify meaning.

@@ -1445,6 +1484,11 @@ class CDeltaWriter : implements IThreaded
if (semTimedout)
Member

I think this can be
if (semTimedout && !needToSendSignalToRequestDataIsWritten)
{
sem.wait(); // Consume the signal that was sent
needToSendSignalToRequestDataIsWritten = true;
}

Member Author

I think that's true, I'll modify.

@@ -1445,6 +1484,11 @@ class CDeltaWriter : implements IThreaded
if (semTimedout)
sem.wait(0);
waiting = true;
if (signalEmpty)
Member

signalWhen...

Member Author

will rename.

Member

@ghalliday ghalliday left a comment

@jakesmith please squash. A couple of comments to clarify the code, but I think it is functional. Will scan once more when squashed.

bool waitOnEmptySem = false;
// ensure all pending transactions are written out
// if necessary wait until CDeltaWriter thread has finished
bool allWritten = false;
Member

picky: really waitUntilAllWritten

Member Author

will change

signalWaiting();
}
if (!waiting)
if (!writerIdle) // i.e. if not idle, it is busy writing
Member

Worth having a comment that this includes the case where signalWaiting() was called above. Otherwise it is very tempting to turn this if into an else, because it is strange that writerIdle would be set by the requester.
writerRequested would be clearer than writerIdle, but would invert the condition, so safer not to make that change now.

signalWaiting() would similarly be better renamed as request[Async]Write()

Member Author

@jakesmith jakesmith Sep 5, 2023

have made these changes, but kept writerIdle -> writeRequested change in its own commit to allow easier review
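
The flush semantics discussed across these threads (request a write if anything is pending, then wait until everything is written, including the case where the request was raised a moment earlier) can be sketched as below. This is a simplified illustration, not the PR's code: it swaps the semaphores for `std::condition_variable`, and `DeltaWriterSketch`, `writtenCount` and the member names are hypothetical, loosely echoing the names proposed in review.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

class DeltaWriterSketch
{
public:
    DeltaWriterSketch() : worker([this] { run(); }) {}
    ~DeltaWriterSketch()
    {
        {
            std::lock_guard<std::mutex> guard(lock);
            stopping = true;
        }
        wake.notify_all();
        worker.join();
    }
    // Queue a transaction; it is batched until a write is requested.
    void add(std::string item)
    {
        std::lock_guard<std::mutex> guard(lock);
        pending.push_back(std::move(item));
    }
    // Ensure every transaction queued before this call has been written.
    void flush()
    {
        std::unique_lock<std::mutex> guard(lock);
        if (!pending.empty() && !writeRequested)
        {
            writeRequested = true; // set by the requester, cleared by the writer
            wake.notify_all();
        }
        // Covers both a write requested just above and one already in progress.
        allWritten.wait(guard, [this] { return pending.empty() && !writing; });
    }
    std::size_t writtenCount()
    {
        std::lock_guard<std::mutex> guard(lock);
        return written;
    }
private:
    void run()
    {
        std::unique_lock<std::mutex> guard(lock);
        for (;;)
        {
            wake.wait(guard, [this] { return writeRequested || stopping; });
            std::vector<std::string> batch;
            batch.swap(pending);
            writeRequested = false;
            if (batch.empty())
            {
                if (stopping)
                    return;
                continue;
            }
            writing = true;
            guard.unlock();
            std::size_t count = batch.size(); // stand-in for writing the batch to disk
            guard.lock();
            written += count;
            writing = false;
            allWritten.notify_all();
        }
    }
    std::mutex lock;
    std::condition_variable wake;       // a write was requested, or shutdown
    std::condition_variable allWritten; // a batch has finished writing
    std::vector<std::string> pending;
    std::size_t written = 0;
    bool writeRequested = false;
    bool writing = false;
    bool stopping = false;
    std::thread worker; // declared last so it starts after the other members exist
};
```

Because the queue swap and the `writing` flag change happen under the same lock, `flush()` cannot observe an empty queue while a batch is still unwritten.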

Member

@ghalliday ghalliday left a comment

@jakesmith please squash

Also add an external file caching mechanism to reduce the
expense of frequently reloading recent externals.

Signed-off-by: Jake Smith <[email protected]>
@jakesmith
Member Author

@ghalliday - squashed

@ghalliday ghalliday merged commit d502065 into hpcc-systems:master Sep 5, 2023
29 checks passed

2 participants