diff --git a/common/thorhelper/thorcommon.hpp b/common/thorhelper/thorcommon.hpp index c9d803e2061..d5245aaaa9a 100644 --- a/common/thorhelper/thorcommon.hpp +++ b/common/thorhelper/thorcommon.hpp @@ -246,6 +246,7 @@ class THORHELPER_API ActivityTimeAccumulator unsigned __int64 firstRow; // Timestamp of first row (nanoseconds since epoch) cycle_t firstExitCycles; // Wall clock time of first exit from this activity cycle_t blockedCycles; // Time spent blocked + cycle_t lookAheadCycles; // Time spent by lookahead thread // Return the total amount of time (in nanoseconds) spent in this activity (first entry to last exit) inline unsigned __int64 elapsed() const { return cycle_to_nanosec(endCycles-startCycles); } @@ -265,6 +266,7 @@ class THORHELPER_API ActivityTimeAccumulator firstRow = 0; firstExitCycles = 0; blockedCycles = 0; + lookAheadCycles = 0; } }; @@ -314,7 +316,7 @@ class SimpleActivityTimer cycle_t startCycles; cycle_t &accumulator; protected: - const bool enabled; + bool enabled; public: inline SimpleActivityTimer(cycle_t &_accumulator, const bool _enabled) : accumulator(_accumulator), enabled(_enabled) @@ -326,41 +328,35 @@ class SimpleActivityTimer } inline ~SimpleActivityTimer() + { + leave(); + } + inline void leave() { if (likely(enabled)) { cycle_t nowCycles = get_cycles_now(); cycle_t elapsedCycles = nowCycles - startCycles; accumulator += elapsedCycles; + enabled = false; } } }; -class BlockedActivityTimer +class BlockedActivityTimer : public SimpleActivityTimer { - unsigned __int64 startCycles; - ActivityTimeAccumulator &accumulator; -protected: - const bool enabled; public: BlockedActivityTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled) - : accumulator(_accumulator), enabled(_enabled) - { - if (enabled) - startCycles = get_cycles_now(); - else - startCycles = 0; - } + : SimpleActivityTimer(_accumulator.blockedCycles, _enabled) { } +}; - ~BlockedActivityTimer() - { - if (enabled) - { - cycle_t elapsedCycles = get_cycles_now() - startCycles; - accumulator.blockedCycles += elapsedCycles; - } - } +class LookAheadTimer : public SimpleActivityTimer +{ +public: + inline LookAheadTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled) + : SimpleActivityTimer(_accumulator.lookAheadCycles, _enabled) { } }; + #else struct ActivityTimer { @@ -373,7 +369,13 @@ struct SimpleActivityTimer struct BlockedActivityTimer { inline BlockedActivityTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled) { } + }; +struct LookAheadTimer +{ + inline LookAheadTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled){ } +}; + #endif class THORHELPER_API IndirectCodeContextEx : public IndirectCodeContext diff --git a/system/jlib/jstatcodes.h b/system/jlib/jstatcodes.h index 1b40a879fe8..21c719eeb8b 100644 --- a/system/jlib/jstatcodes.h +++ b/system/jlib/jstatcodes.h @@ -314,6 +314,8 @@ enum StatisticKind StNumParallelExecute, StNumAgentRequests, StSizeAgentRequests, + StTimeLookAhead, + StCycleLookAheadCycles, StMax, //For any quantity there is potentially the following variants. diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index 45e0aa0e09f..d94054fe65f 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -986,6 +986,8 @@ static const constexpr StatisticMeta statsMetaData[StMax] = { { NUMSTAT(ParallelExecute), "The number of parallel execution paths for this activity" }, { NUMSTAT(AgentRequests), "The number of agent request packets for this activity" }, { SIZESTAT(AgentRequests), "The total size of agent request packets for this activity" }, + { TIMESTAT(LookAhead), "The total time lookahead thread spend prefetching rows from upstream activities" }, + { CYCLESTAT(LookAhead) }, }; static MapStringTo statisticNameMap(true); diff --git a/thorlcr/activities/funnel/thfunnelslave.cpp b/thorlcr/activities/funnel/thfunnelslave.cpp index 32962951c52..e2773c3c28d 100644 --- a/thorlcr/activities/funnel/thfunnelslave.cpp +++ b/thorlcr/activities/funnel/thfunnelslave.cpp @@ -85,6 +85,7 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface inputStream = funnel.activity.queryInputStream(inputIndex); while (!stopping) { + LookAheadTimer timer(funnel.activity.getActivityTimerAccumulator(), funnel.activity.queryTimeActivities()); numRows = 0; for (;numRows < chunkSize; numRows++) { @@ -141,14 +142,15 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface Linked serializer; void push(const void *row) - { + { size32_t rowSize = thorRowMemoryFootprint(serializer, row); bool waitForSpace = false; // only allow a single writer at a time, so only a single thread is waiting on the semaphore - otherwise signal() takes a very long time { - + BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); CriticalBlock b(crit); // will mean first 'push' could block on fullSem, others on this crit. + timer.leave(); if (stopped) { ReleaseThorRow(row); @@ -179,7 +181,9 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface bool waitForSpace = false; // only allow a single writer at a time, so only a single thread is waiting on the semaphore - otherwise signal() takes a very long time { + BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); CriticalBlock b(crit); // will mean first 'push' could block on fullSem, others on this crit. + timer.leave(); if (stopped) { for (unsigned i=0; i < numRows; i++) @@ -266,7 +270,9 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface } { + BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); CriticalBlock b(crit); + timer.leave(); stopped = true; // ensure any pending push()'s don't enqueue and if big row potentially block again. if (waiting) { @@ -301,7 +307,9 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface size32_t sz = thorRowMemoryFootprint(serializer, row.get()); unsigned numToSignal = 0; { + BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); CriticalBlock b(crit); + timer.leave(); assertex(totSize>=sz); totSize -= sz; if (waiting && (totSize <= FUNNEL_MIN_BUFF_SIZE)) diff --git a/thorlcr/activities/nsplitter/thnsplitterslave.cpp b/thorlcr/activities/nsplitter/thnsplitterslave.cpp index d733aa41a64..958fea57a04 100644 --- a/thorlcr/activities/nsplitter/thnsplitterslave.cpp +++ b/thorlcr/activities/nsplitter/thnsplitterslave.cpp @@ -60,6 +60,7 @@ class CSplitterOutput : public CSimpleInterfaceOf, pu virtual IStrandJunction *getOutputStreams(CActivityBase &ctx, unsigned idx, PointerArrayOf &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override; virtual unsigned __int64 queryTotalCycles() const override { return COutputTiming::queryTotalCycles(); } virtual unsigned __int64 queryEndCycles() const override { return COutputTiming::queryEndCycles(); } + virtual unsigned __int64 queryBlockedCycles() const { return COutputTiming::queryBlockedCycles(); } virtual void debugRequest(MemoryBuffer &mb) override; // Stepping methods virtual IInputSteppingMeta *querySteppingMeta() { return nullptr; } diff --git a/thorlcr/activities/thactivityutil.cpp b/thorlcr/activities/thactivityutil.cpp index afa10f9b282..4aee196d999 100644 --- a/thorlcr/activities/thactivityutil.cpp +++ b/thorlcr/activities/thactivityutil.cpp @@ -119,10 +119,17 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf { while (requiredLeft&&running) { - OwnedConstThorRow row = inputStream->nextRow(); - if (!row) + OwnedConstThorRow row; { + LookAheadTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); row.setown(inputStream->nextRow()); + } + if (!row) + { + { + LookAheadTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); + row.setown(inputStream->nextRow()); + } if (!row) break; else @@ -138,7 +145,11 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf { while (requiredLeft&&running) { - OwnedConstThorRow row = inputStream->ungroupedNextRow(); + OwnedConstThorRow row; + { + LookAheadTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); + row.setown(inputStream->ungroupedNextRow()); + } if (!row) break; ++count; @@ -234,7 +245,15 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf // IEngineRowStream virtual const void *nextRow() override { - OwnedConstThorRow row = smartbuf->nextRow(); + OwnedConstThorRow row; + { + // smartbuf->nextRow should return immediately if a row is available. + // smartbuf->nextRow will take time if blocked, so record time taken as blocked time. + // N.b. smartbuf->next may take a trivial amount of time if row is available but + // for the purposes of stats this will still be considered blocked. + BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); + row.setown(smartbuf->nextRow()); + } if (getexception) throw getexception.getClear(); if (!row) diff --git a/thorlcr/graph/thgraphslave.cpp b/thorlcr/graph/thgraphslave.cpp index e39c45909d5..09bf63fd50b 100644 --- a/thorlcr/graph/thgraphslave.cpp +++ b/thorlcr/graph/thgraphslave.cpp @@ -586,14 +586,17 @@ unsigned __int64 CSlaveActivity::queryLocalCycles() const break; } } - unsigned __int64 localCycles = queryTotalCycles(); - if (localCycles < inputCycles) // not sure how/if possible, but guard against + unsigned __int64 processCycles = queryTotalCycles() + queryLookAheadCycles(); + if (processCycles < inputCycles) // not sure how/if possible, but guard against return 0; - localCycles -= inputCycles; + processCycles -= inputCycles; const unsigned __int64 blockedCycles = queryBlockedCycles(); - if (localCycles < blockedCycles) + if (processCycles < blockedCycles) + { + IWARNLOG("CSlaveActivity::queryLocalCycles - processCycles %" I64F "u < blockedCycles %" I64F "u", processCycles, blockedCycles); return 0; - return localCycles-blockedCycles; + } + return processCycles-blockedCycles; } void CSlaveActivity::serializeStats(MemoryBuffer &mb) @@ -618,6 +621,8 @@ void CSlaveActivity::serializeStats(MemoryBuffer &mb) serializedStats.setStatistic(StTimeLocalExecute, (unsigned __int64)cycle_to_nanosec(queryLocalCycles())); serializedStats.setStatistic(StTimeTotalExecute, (unsigned __int64)cycle_to_nanosec(queryTotalCycles())); serializedStats.setStatistic(StTimeBlocked, (unsigned __int64)cycle_to_nanosec(queryBlockedCycles())); + serializedStats.setStatistic(StTimeLookAhead, (unsigned __int64)cycle_to_nanosec(queryLookAheadCycles())); + serializedStats.serialize(mb); ForEachItemIn(i, outputs) { diff --git a/thorlcr/graph/thgraphslave.hpp b/thorlcr/graph/thgraphslave.hpp index 0d758113c51..0695c83d9a8 100644 --- a/thorlcr/graph/thgraphslave.hpp +++ b/thorlcr/graph/thgraphslave.hpp @@ -46,10 +46,11 @@ class COutputTiming COutputTiming() { } void resetTiming() { slaveTimerStats.reset(); } - ActivityTimeAccumulator &getTotalCyclesRef() { return slaveTimerStats; } + ActivityTimeAccumulator &getActivityTimerAccumulator() { return slaveTimerStats; } unsigned __int64 queryTotalCycles() const { return slaveTimerStats.totalCycles; } unsigned __int64 queryEndCycles() const { return slaveTimerStats.endCycles; } unsigned __int64 queryBlockedCycles() const { return slaveTimerStats.blockedCycles; } + unsigned __int64 queryLookAheadCycles() const { return slaveTimerStats.lookAheadCycles; } }; class CEdgeProgress @@ -289,8 +290,9 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres return consumerOrdered; } virtual unsigned __int64 queryTotalCycles() const { return COutputTiming::queryTotalCycles(); } - virtual unsigned __int64 queryBlockedCycles() const { return COutputTiming::queryBlockedCycles();} + virtual unsigned __int64 queryBlockedCycles() const { return COutputTiming::queryBlockedCycles(); } virtual unsigned __int64 queryEndCycles() const { return COutputTiming::queryEndCycles(); } + virtual unsigned __int64 queryLookAheadCycles() const { return COutputTiming::queryLookAheadCycles(); } virtual void debugRequest(MemoryBuffer &msg) override; // IThorDataLink diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index 25e62473115..275669abfdc 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -76,7 +76,7 @@ static Owned ClusterMPAllocator; const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed, StNumSpills, StSizeSpillFile, StSizePeakTempDisk}); const StatisticsMapping executeStatistics({StTimeTotalExecute, StTimeLocalExecute, StTimeBlocked}); const StatisticsMapping soapcallStatistics({StTimeSoapcall}); -const StatisticsMapping basicActivityStatistics({StNumParallelExecute}, executeStatistics, spillStatistics); +const StatisticsMapping basicActivityStatistics({StNumParallelExecute, StTimeLookAhead}, executeStatistics, spillStatistics); const StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics); const StatisticsMapping indexReadFileStatistics({}, diskReadRemoteStatistics, jhtreeCacheStatistics); const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, indexReadFileStatistics, basicActivityStatistics);