Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32480 Capture "look ahead" timings for unordered concat (parallel funnel) #19164

Draft
wants to merge 3 commits into
base: candidate-9.8.x
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 23 additions & 21 deletions common/thorhelper/thorcommon.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,7 @@ class THORHELPER_API ActivityTimeAccumulator
unsigned __int64 firstRow; // Timestamp of first row (nanoseconds since epoch)
cycle_t firstExitCycles; // Wall clock time of first exit from this activity
cycle_t blockedCycles; // Time spent blocked
cycle_t lookAheadCycles; // Time spent by lookahead thread

// Return the total amount of time (in nanoseconds) spent in this activity (first entry to last exit)
inline unsigned __int64 elapsed() const { return cycle_to_nanosec(endCycles-startCycles); }
Expand All @@ -265,6 +266,7 @@ class THORHELPER_API ActivityTimeAccumulator
firstRow = 0;
firstExitCycles = 0;
blockedCycles = 0;
lookAheadCycles = 0;
}
};

Expand Down Expand Up @@ -314,7 +316,7 @@ class SimpleActivityTimer
cycle_t startCycles;
cycle_t &accumulator;
protected:
const bool enabled;
bool enabled;
public:
inline SimpleActivityTimer(cycle_t &_accumulator, const bool _enabled)
: accumulator(_accumulator), enabled(_enabled)
Expand All @@ -326,41 +328,35 @@ class SimpleActivityTimer
}

// Destructor: finalises the measurement by delegating to leave(), which is a
// no-op if the timer is disabled or leave() has already been called.
inline ~SimpleActivityTimer() { leave(); }
inline void leave()
{
if (likely(enabled))
{
cycle_t nowCycles = get_cycles_now();
cycle_t elapsedCycles = nowCycles - startCycles;
accumulator += elapsedCycles;
enabled = false;
}
}
};

class BlockedActivityTimer
class BlockedActivityTimer : public SimpleActivityTimer
{
unsigned __int64 startCycles;
ActivityTimeAccumulator &accumulator;
protected:
const bool enabled;
public:
BlockedActivityTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled)
: accumulator(_accumulator), enabled(_enabled)
{
if (enabled)
startCycles = get_cycles_now();
else
startCycles = 0;
}
: SimpleActivityTimer(_accumulator.blockedCycles, _enabled) { }
};

~BlockedActivityTimer()
{
if (enabled)
{
cycle_t elapsedCycles = get_cycles_now() - startCycles;
accumulator.blockedCycles += elapsedCycles;
}
}
// A SimpleActivityTimer bound to the lookAheadCycles member of the supplied
// ActivityTimeAccumulator: whatever the base timer measures (when _enabled is
// true) is accumulated into that field.
class LookAheadTimer : public SimpleActivityTimer
{
public:
    inline LookAheadTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled)
        : SimpleActivityTimer(_accumulator.lookAheadCycles, _enabled)
    {
    }
};

#else
struct ActivityTimer
{
Expand All @@ -373,7 +369,13 @@ struct SimpleActivityTimer
// No-op variant of BlockedActivityTimer used in the #else branch of the
// surrounding conditional: the constructor accepts the same arguments but
// ignores them and records nothing.
struct BlockedActivityTimer
{
    inline BlockedActivityTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled)
    {
    }
};
// No-op variant of LookAheadTimer used in the #else branch of the surrounding
// conditional: the constructor accepts the same arguments but ignores them
// and records nothing.
struct LookAheadTimer
{
    inline LookAheadTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled)
    {
    }
};

#endif

class THORHELPER_API IndirectCodeContextEx : public IndirectCodeContext
Expand Down
2 changes: 2 additions & 0 deletions system/jlib/jstatcodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,8 @@ enum StatisticKind
StNumParallelExecute,
StNumAgentRequests,
StSizeAgentRequests,
StTimeLookAhead,
StCycleLookAheadCycles,
StMax,

//For any quantity there is potentially the following variants.
Expand Down
2 changes: 2 additions & 0 deletions system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -986,6 +986,8 @@ static const constexpr StatisticMeta statsMetaData[StMax] = {
{ NUMSTAT(ParallelExecute), "The number of parallel execution paths for this activity" },
{ NUMSTAT(AgentRequests), "The number of agent request packets for this activity" },
{ SIZESTAT(AgentRequests), "The total size of agent request packets for this activity" },
{ TIMESTAT(LookAhead), "The total time lookahead thread spend prefetching rows from upstream activities" },
{ CYCLESTAT(LookAhead) },
};

static MapStringTo<StatisticKind, StatisticKind> statisticNameMap(true);
Expand Down
12 changes: 10 additions & 2 deletions thorlcr/activities/funnel/thfunnelslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
inputStream = funnel.activity.queryInputStream(inputIndex);
while (!stopping)
{
LookAheadTimer timer(funnel.activity.getActivityTimerAccumulator(), funnel.activity.queryTimeActivities());
numRows = 0;
for (;numRows < chunkSize; numRows++)
{
Expand Down Expand Up @@ -141,14 +142,15 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
Linked<IOutputRowSerializer> serializer;

void push(const void *row)
{
{
size32_t rowSize = thorRowMemoryFootprint(serializer, row);

bool waitForSpace = false;
// only allow a single writer at a time, so only a single thread is waiting on the semaphore - otherwise signal() takes a very long time
{

BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities());
CriticalBlock b(crit); // will mean first 'push' could block on fullSem, others on this crit.
timer.leave();
if (stopped)
{
ReleaseThorRow(row);
Expand Down Expand Up @@ -179,7 +181,9 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
bool waitForSpace = false;
// only allow a single writer at a time, so only a single thread is waiting on the semaphore - otherwise signal() takes a very long time
{
BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities());
CriticalBlock b(crit); // will mean first 'push' could block on fullSem, others on this crit.
timer.leave();
if (stopped)
{
for (unsigned i=0; i < numRows; i++)
Expand Down Expand Up @@ -266,7 +270,9 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
}

{
BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities());
CriticalBlock b(crit);
timer.leave();
stopped = true; // ensure any pending push()'s don't enqueue and if big row potentially block again.
if (waiting)
{
Expand Down Expand Up @@ -301,7 +307,9 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface
size32_t sz = thorRowMemoryFootprint(serializer, row.get());
unsigned numToSignal = 0;
{
BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities());
CriticalBlock b(crit);
timer.leave();
assertex(totSize>=sz);
totSize -= sz;
if (waiting && (totSize <= FUNNEL_MIN_BUFF_SIZE))
Expand Down
1 change: 1 addition & 0 deletions thorlcr/activities/nsplitter/thnsplitterslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ class CSplitterOutput : public CSimpleInterfaceOf<IStartableEngineRowStream>, pu
virtual IStrandJunction *getOutputStreams(CActivityBase &ctx, unsigned idx, PointerArrayOf<IEngineRowStream> &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override;
virtual unsigned __int64 queryTotalCycles() const override { return COutputTiming::queryTotalCycles(); }
virtual unsigned __int64 queryEndCycles() const override { return COutputTiming::queryEndCycles(); }
virtual unsigned __int64 queryBlockedCycles() const { return COutputTiming::queryBlockedCycles(); }
virtual void debugRequest(MemoryBuffer &mb) override;
// Stepping methods
virtual IInputSteppingMeta *querySteppingMeta() { return nullptr; }
Expand Down
27 changes: 23 additions & 4 deletions thorlcr/activities/thactivityutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,17 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf<IStartableEngineRowStream>
{
while (requiredLeft&&running)
{
OwnedConstThorRow row = inputStream->nextRow();
if (!row)
OwnedConstThorRow row;
{
LookAheadTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities());
row.setown(inputStream->nextRow());
}
if (!row)
{
{
LookAheadTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities());
row.setown(inputStream->nextRow());
}
if (!row)
break;
else
Expand All @@ -138,7 +145,11 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf<IStartableEngineRowStream>
{
while (requiredLeft&&running)
{
OwnedConstThorRow row = inputStream->ungroupedNextRow();
OwnedConstThorRow row;
{
LookAheadTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities());
row.setown(inputStream->ungroupedNextRow());
}
if (!row)
break;
++count;
Expand Down Expand Up @@ -234,7 +245,15 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf<IStartableEngineRowStream>
// IEngineRowStream
virtual const void *nextRow() override
{
OwnedConstThorRow row = smartbuf->nextRow();
OwnedConstThorRow row;
{
// smartbuf->nextRow() should return immediately if a row is available.
// If no row is available it blocks, so the time taken is recorded as blocked time.
// N.B. smartbuf->nextRow() may still take a trivial amount of time when a row is
// available; for the purposes of stats that time is also counted as blocked.
BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities());
row.setown(smartbuf->nextRow());
}
if (getexception)
throw getexception.getClear();
if (!row)
Expand Down
15 changes: 10 additions & 5 deletions thorlcr/graph/thgraphslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -586,14 +586,17 @@ unsigned __int64 CSlaveActivity::queryLocalCycles() const
break;
}
}
unsigned __int64 localCycles = queryTotalCycles();
if (localCycles < inputCycles) // not sure how/if possible, but guard against
unsigned __int64 processCycles = queryTotalCycles() + queryLookAheadCycles();
if (processCycles < inputCycles) // not sure how/if possible, but guard against
return 0;
localCycles -= inputCycles;
processCycles -= inputCycles;
const unsigned __int64 blockedCycles = queryBlockedCycles();
if (localCycles < blockedCycles)
if (processCycles < blockedCycles)
{
IWARNLOG("CSlaveActivity::queryLocalCycles - processCycles %" I64F "u < blockedCycles %" I64F "u", processCycles, blockedCycles);
return 0;
return localCycles-blockedCycles;
}
return processCycles-blockedCycles;
}

void CSlaveActivity::serializeStats(MemoryBuffer &mb)
Expand All @@ -618,6 +621,8 @@ void CSlaveActivity::serializeStats(MemoryBuffer &mb)
serializedStats.setStatistic(StTimeLocalExecute, (unsigned __int64)cycle_to_nanosec(queryLocalCycles()));
serializedStats.setStatistic(StTimeTotalExecute, (unsigned __int64)cycle_to_nanosec(queryTotalCycles()));
serializedStats.setStatistic(StTimeBlocked, (unsigned __int64)cycle_to_nanosec(queryBlockedCycles()));
serializedStats.setStatistic(StTimeLookAhead, (unsigned __int64)cycle_to_nanosec(queryLookAheadCycles()));

serializedStats.serialize(mb);
ForEachItemIn(i, outputs)
{
Expand Down
6 changes: 4 additions & 2 deletions thorlcr/graph/thgraphslave.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,11 @@ class COutputTiming
COutputTiming() { }

void resetTiming() { slaveTimerStats.reset(); }
ActivityTimeAccumulator &getTotalCyclesRef() { return slaveTimerStats; }
ActivityTimeAccumulator &getActivityTimerAccumulator() { return slaveTimerStats; }
unsigned __int64 queryTotalCycles() const { return slaveTimerStats.totalCycles; }
unsigned __int64 queryEndCycles() const { return slaveTimerStats.endCycles; }
unsigned __int64 queryBlockedCycles() const { return slaveTimerStats.blockedCycles; }
unsigned __int64 queryLookAheadCycles() const { return slaveTimerStats.lookAheadCycles; }
};

class CEdgeProgress
Expand Down Expand Up @@ -289,8 +290,9 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres
return consumerOrdered;
}
virtual unsigned __int64 queryTotalCycles() const { return COutputTiming::queryTotalCycles(); }
virtual unsigned __int64 queryBlockedCycles() const { return COutputTiming::queryBlockedCycles();}
virtual unsigned __int64 queryBlockedCycles() const { return COutputTiming::queryBlockedCycles(); }
virtual unsigned __int64 queryEndCycles() const { return COutputTiming::queryEndCycles(); }
virtual unsigned __int64 queryLookAheadCycles() const { return COutputTiming::queryLookAheadCycles(); }
virtual void debugRequest(MemoryBuffer &msg) override;

// IThorDataLink
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/thorutil/thormisc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ static Owned<IMPtagAllocator> ClusterMPAllocator;
const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed, StNumSpills, StSizeSpillFile, StSizePeakTempDisk});
const StatisticsMapping executeStatistics({StTimeTotalExecute, StTimeLocalExecute, StTimeBlocked});
const StatisticsMapping soapcallStatistics({StTimeSoapcall});
const StatisticsMapping basicActivityStatistics({StNumParallelExecute}, executeStatistics, spillStatistics);
const StatisticsMapping basicActivityStatistics({StNumParallelExecute, StTimeLookAhead}, executeStatistics, spillStatistics);
const StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics);
const StatisticsMapping indexReadFileStatistics({}, diskReadRemoteStatistics, jhtreeCacheStatistics);
const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, indexReadFileStatistics, basicActivityStatistics);
Expand Down
Loading