From 5f97267a60c06a0c96c0fc8274c5b94f86746891 Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Tue, 27 Aug 2024 11:52:11 +0100 Subject: [PATCH 1/3] HPCC-32479 Record lookahead timings and use it to calculate localtime Signed-off-by: Shamser Ahmed --- common/thorhelper/thorcommon.hpp | 26 ++++++++++++++++++ system/jlib/jstatcodes.h | 2 ++ system/jlib/jstats.cpp | 2 ++ .../activities/nsplitter/thnsplitterslave.cpp | 1 + thorlcr/activities/thactivityutil.cpp | 27 ++++++++++++++++--- thorlcr/graph/thgraphslave.cpp | 12 +++++---- thorlcr/graph/thgraphslave.hpp | 6 +++-- thorlcr/thorutil/thormisc.cpp | 2 +- 8 files changed, 66 insertions(+), 12 deletions(-) diff --git a/common/thorhelper/thorcommon.hpp b/common/thorhelper/thorcommon.hpp index c9d803e2061..eff2964464a 100644 --- a/common/thorhelper/thorcommon.hpp +++ b/common/thorhelper/thorcommon.hpp @@ -246,6 +246,7 @@ class THORHELPER_API ActivityTimeAccumulator unsigned __int64 firstRow; // Timestamp of first row (nanoseconds since epoch) cycle_t firstExitCycles; // Wall clock time of first exit from this activity cycle_t blockedCycles; // Time spent blocked + cycle_t lookAheadCycles; // Return the total amount of time (in nanoseconds) spent in this activity (first entry to last exit) inline unsigned __int64 elapsed() const { return cycle_to_nanosec(endCycles-startCycles); } @@ -265,6 +266,7 @@ class THORHELPER_API ActivityTimeAccumulator firstRow = 0; firstExitCycles = 0; blockedCycles = 0; + lookAheadCycles = 0; } }; @@ -361,6 +363,30 @@ class BlockedActivityTimer } } }; + +class LookAheadTimer +{ + cycle_t startCycles; + ActivityTimeAccumulator &accumulator; +protected: + const bool enabled; +public: + inline LookAheadTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled) + : accumulator(_accumulator), enabled(_enabled) + { + if (likely(enabled)) + startCycles = get_cycles_now(); + else + startCycles = 0; + } + + inline ~LookAheadTimer() + { + if (likely(enabled)) + accumulator.lookAheadCycles += 
(get_cycles_now() - startCycles); + } +}; + #else struct ActivityTimer { diff --git a/system/jlib/jstatcodes.h b/system/jlib/jstatcodes.h index 1b40a879fe8..21c719eeb8b 100644 --- a/system/jlib/jstatcodes.h +++ b/system/jlib/jstatcodes.h @@ -314,6 +314,8 @@ enum StatisticKind StNumParallelExecute, StNumAgentRequests, StSizeAgentRequests, + StTimeLookAhead, + StCycleLookAheadCycles, StMax, //For any quantity there is potentially the following variants. diff --git a/system/jlib/jstats.cpp b/system/jlib/jstats.cpp index 45e0aa0e09f..d94054fe65f 100644 --- a/system/jlib/jstats.cpp +++ b/system/jlib/jstats.cpp @@ -986,6 +986,8 @@ static const constexpr StatisticMeta statsMetaData[StMax] = { { NUMSTAT(ParallelExecute), "The number of parallel execution paths for this activity" }, { NUMSTAT(AgentRequests), "The number of agent request packets for this activity" }, { SIZESTAT(AgentRequests), "The total size of agent request packets for this activity" }, + { TIMESTAT(LookAhead), "The total time lookahead thread spend prefetching rows from upstream activities" }, + { CYCLESTAT(LookAhead) }, }; static MapStringTo statisticNameMap(true); diff --git a/thorlcr/activities/nsplitter/thnsplitterslave.cpp b/thorlcr/activities/nsplitter/thnsplitterslave.cpp index d733aa41a64..958fea57a04 100644 --- a/thorlcr/activities/nsplitter/thnsplitterslave.cpp +++ b/thorlcr/activities/nsplitter/thnsplitterslave.cpp @@ -60,6 +60,7 @@ class CSplitterOutput : public CSimpleInterfaceOf, pu virtual IStrandJunction *getOutputStreams(CActivityBase &ctx, unsigned idx, PointerArrayOf &streams, const CThorStrandOptions * consumerOptions, bool consumerOrdered, IOrderedCallbackCollection * orderedCallbacks) override; virtual unsigned __int64 queryTotalCycles() const override { return COutputTiming::queryTotalCycles(); } virtual unsigned __int64 queryEndCycles() const override { return COutputTiming::queryEndCycles(); } + virtual unsigned __int64 queryBlockedCycles() const { return 
COutputTiming::queryBlockedCycles(); } virtual void debugRequest(MemoryBuffer &mb) override; // Stepping methods virtual IInputSteppingMeta *querySteppingMeta() { return nullptr; } diff --git a/thorlcr/activities/thactivityutil.cpp b/thorlcr/activities/thactivityutil.cpp index afa10f9b282..4aee196d999 100644 --- a/thorlcr/activities/thactivityutil.cpp +++ b/thorlcr/activities/thactivityutil.cpp @@ -119,10 +119,17 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf { while (requiredLeft&&running) { - OwnedConstThorRow row = inputStream->nextRow(); - if (!row) + OwnedConstThorRow row; { + LookAheadTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); row.setown(inputStream->nextRow()); + } + if (!row) + { + { + LookAheadTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); + row.setown(inputStream->nextRow()); + } if (!row) break; else @@ -138,7 +145,11 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf { while (requiredLeft&&running) { - OwnedConstThorRow row = inputStream->ungroupedNextRow(); + OwnedConstThorRow row; + { + LookAheadTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); + row.setown(inputStream->ungroupedNextRow()); + } if (!row) break; ++count; @@ -234,7 +245,15 @@ class CRowStreamLookAhead : public CSimpleInterfaceOf // IEngineRowStream virtual const void *nextRow() override { - OwnedConstThorRow row = smartbuf->nextRow(); + OwnedConstThorRow row; + { + // smartbuf->nextRow should return immediately if a row is available. + // smartbuf->nextRow will take time if blocked, so record time taken as blocked time. + // N.b. smartbuf->nextRow may take a trivial amount of time if row is available but + // for the purposes of stats this will still be considered blocked. 
+ BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); + row.setown(smartbuf->nextRow()); + } if (getexception) throw getexception.getClear(); if (!row) diff --git a/thorlcr/graph/thgraphslave.cpp b/thorlcr/graph/thgraphslave.cpp index e39c45909d5..35facee097c 100644 --- a/thorlcr/graph/thgraphslave.cpp +++ b/thorlcr/graph/thgraphslave.cpp @@ -586,14 +586,14 @@ unsigned __int64 CSlaveActivity::queryLocalCycles() const break; } } - unsigned __int64 localCycles = queryTotalCycles(); - if (localCycles < inputCycles) // not sure how/if possible, but guard against + unsigned __int64 processCycles = queryTotalCycles() + queryLookAheadCycles(); + if (processCycles < inputCycles) // not sure how/if possible, but guard against return 0; - localCycles -= inputCycles; + processCycles -= inputCycles; const unsigned __int64 blockedCycles = queryBlockedCycles(); - if (localCycles < blockedCycles) + if (processCycles < blockedCycles) return 0; - return localCycles-blockedCycles; + return processCycles-blockedCycles; } void CSlaveActivity::serializeStats(MemoryBuffer &mb) @@ -618,6 +618,8 @@ void CSlaveActivity::serializeStats(MemoryBuffer &mb) serializedStats.setStatistic(StTimeLocalExecute, (unsigned __int64)cycle_to_nanosec(queryLocalCycles())); serializedStats.setStatistic(StTimeTotalExecute, (unsigned __int64)cycle_to_nanosec(queryTotalCycles())); serializedStats.setStatistic(StTimeBlocked, (unsigned __int64)cycle_to_nanosec(queryBlockedCycles())); + serializedStats.setStatistic(StTimeLookAhead, (unsigned __int64)cycle_to_nanosec(queryLookAheadCycles())); + serializedStats.serialize(mb); ForEachItemIn(i, outputs) { diff --git a/thorlcr/graph/thgraphslave.hpp b/thorlcr/graph/thgraphslave.hpp index 0d758113c51..0695c83d9a8 100644 --- a/thorlcr/graph/thgraphslave.hpp +++ b/thorlcr/graph/thgraphslave.hpp @@ -46,10 +46,11 @@ class COutputTiming COutputTiming() { } void resetTiming() { slaveTimerStats.reset(); } - 
ActivityTimeAccumulator &getTotalCyclesRef() { return slaveTimerStats; } + ActivityTimeAccumulator &getActivityTimerAccumulator() { return slaveTimerStats; } unsigned __int64 queryTotalCycles() const { return slaveTimerStats.totalCycles; } unsigned __int64 queryEndCycles() const { return slaveTimerStats.endCycles; } unsigned __int64 queryBlockedCycles() const { return slaveTimerStats.blockedCycles; } + unsigned __int64 queryLookAheadCycles() const { return slaveTimerStats.lookAheadCycles; } }; class CEdgeProgress @@ -289,8 +290,9 @@ class graphslave_decl CSlaveActivity : public CActivityBase, public CEdgeProgres return consumerOrdered; } virtual unsigned __int64 queryTotalCycles() const { return COutputTiming::queryTotalCycles(); } - virtual unsigned __int64 queryBlockedCycles() const { return COutputTiming::queryBlockedCycles();} + virtual unsigned __int64 queryBlockedCycles() const { return COutputTiming::queryBlockedCycles(); } virtual unsigned __int64 queryEndCycles() const { return COutputTiming::queryEndCycles(); } + virtual unsigned __int64 queryLookAheadCycles() const { return COutputTiming::queryLookAheadCycles(); } virtual void debugRequest(MemoryBuffer &msg) override; // IThorDataLink diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index 25e62473115..275669abfdc 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -76,7 +76,7 @@ static Owned ClusterMPAllocator; const StatisticsMapping spillStatistics({StTimeSpillElapsed, StTimeSortElapsed, StNumSpills, StSizeSpillFile, StSizePeakTempDisk}); const StatisticsMapping executeStatistics({StTimeTotalExecute, StTimeLocalExecute, StTimeBlocked}); const StatisticsMapping soapcallStatistics({StTimeSoapcall}); -const StatisticsMapping basicActivityStatistics({StNumParallelExecute}, executeStatistics, spillStatistics); +const StatisticsMapping basicActivityStatistics({StNumParallelExecute, StTimeLookAhead}, executeStatistics, spillStatistics); const 
StatisticsMapping groupActivityStatistics({StNumGroups, StNumGroupMax}, basicActivityStatistics); const StatisticsMapping indexReadFileStatistics({}, diskReadRemoteStatistics, jhtreeCacheStatistics); const StatisticsMapping indexReadActivityStatistics({StNumRowsProcessed}, indexReadFileStatistics, basicActivityStatistics); From 1f17a0f8499fbf5c5696974a1f6acedccb371b8d Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Mon, 16 Sep 2024 16:29:06 +0100 Subject: [PATCH 2/3] HPCC-32479 Changes following review Signed-off-by: Shamser Ahmed --- common/thorhelper/thorcommon.hpp | 51 +++++++------------------------- thorlcr/graph/thgraphslave.cpp | 3 ++ 2 files changed, 14 insertions(+), 40 deletions(-) diff --git a/common/thorhelper/thorcommon.hpp b/common/thorhelper/thorcommon.hpp index eff2964464a..c61ed99f218 100644 --- a/common/thorhelper/thorcommon.hpp +++ b/common/thorhelper/thorcommon.hpp @@ -246,7 +246,7 @@ class THORHELPER_API ActivityTimeAccumulator unsigned __int64 firstRow; // Timestamp of first row (nanoseconds since epoch) cycle_t firstExitCycles; // Wall clock time of first exit from this activity cycle_t blockedCycles; // Time spent blocked - cycle_t lookAheadCycles; + cycle_t lookAheadCycles; // Time spent by lookahead thread // Return the total amount of time (in nanoseconds) spent in this activity (first entry to last exit) inline unsigned __int64 elapsed() const { return cycle_to_nanosec(endCycles-startCycles); } @@ -338,53 +338,18 @@ class SimpleActivityTimer } }; -class BlockedActivityTimer +class BlockedActivityTimer : public SimpleActivityTimer { - unsigned __int64 startCycles; - ActivityTimeAccumulator &accumulator; -protected: - const bool enabled; public: BlockedActivityTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled) - : accumulator(_accumulator), enabled(_enabled) - { - if (enabled) - startCycles = get_cycles_now(); - else - startCycles = 0; - } - - ~BlockedActivityTimer() - { - if (enabled) - { - cycle_t elapsedCycles = 
get_cycles_now() - startCycles; - accumulator.blockedCycles += elapsedCycles; - } - } + : SimpleActivityTimer(_accumulator.blockedCycles, _enabled) { } }; -class LookAheadTimer +class LookAheadTimer : public SimpleActivityTimer { - cycle_t startCycles; - ActivityTimeAccumulator &accumulator; -protected: - const bool enabled; public: inline LookAheadTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled) - : accumulator(_accumulator), enabled(_enabled) - { - if (likely(enabled)) - startCycles = get_cycles_now(); - else - startCycles = 0; - } - - inline ~LookAheadTimer() - { - if (likely(enabled)) - accumulator.lookAheadCycles += (get_cycles_now() - startCycles); - } + : SimpleActivityTimer(_accumulator.lookAheadCycles, _enabled) { } }; #else @@ -399,7 +364,13 @@ struct SimpleActivityTimer struct BlockedActivityTimer { inline BlockedActivityTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled) { } + +}; +struct LookAheadTimer +{ + inline LookAheadTimer(ActivityTimeAccumulator &_accumulator, const bool _enabled){ } }; + #endif class THORHELPER_API IndirectCodeContextEx : public IndirectCodeContext diff --git a/thorlcr/graph/thgraphslave.cpp b/thorlcr/graph/thgraphslave.cpp index 35facee097c..09bf63fd50b 100644 --- a/thorlcr/graph/thgraphslave.cpp +++ b/thorlcr/graph/thgraphslave.cpp @@ -592,7 +592,10 @@ unsigned __int64 CSlaveActivity::queryLocalCycles() const processCycles -= inputCycles; const unsigned __int64 blockedCycles = queryBlockedCycles(); if (processCycles < blockedCycles) + { + IWARNLOG("CSlaveActivity::queryLocalCycles - processCycles %" I64F "u < blockedCycles %" I64F "u", processCycles, blockedCycles); return 0; + } return processCycles-blockedCycles; } From feed3d78dffb87b522c9b3f809212e072546802e Mon Sep 17 00:00:00 2001 From: Shamser Ahmed Date: Fri, 27 Sep 2024 16:33:31 +0100 Subject: [PATCH 3/3] HPCC-32480 Capture "look ahead" timings for unordered concat (parallel funnel) Signed-off-by: Shamser Ahmed --- 
common/thorhelper/thorcommon.hpp | 7 ++++++- thorlcr/activities/funnel/thfunnelslave.cpp | 12 ++++++++++-- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/common/thorhelper/thorcommon.hpp b/common/thorhelper/thorcommon.hpp index c61ed99f218..d5245aaaa9a 100644 --- a/common/thorhelper/thorcommon.hpp +++ b/common/thorhelper/thorcommon.hpp @@ -316,7 +316,7 @@ class SimpleActivityTimer cycle_t startCycles; cycle_t &accumulator; protected: - const bool enabled; + bool enabled; public: inline SimpleActivityTimer(cycle_t &_accumulator, const bool _enabled) : accumulator(_accumulator), enabled(_enabled) @@ -328,12 +328,17 @@ class SimpleActivityTimer } inline ~SimpleActivityTimer() + { + leave(); + } + inline void leave() { if (likely(enabled)) { cycle_t nowCycles = get_cycles_now(); cycle_t elapsedCycles = nowCycles - startCycles; accumulator += elapsedCycles; + enabled = false; } } }; diff --git a/thorlcr/activities/funnel/thfunnelslave.cpp b/thorlcr/activities/funnel/thfunnelslave.cpp index 32962951c52..e2773c3c28d 100644 --- a/thorlcr/activities/funnel/thfunnelslave.cpp +++ b/thorlcr/activities/funnel/thfunnelslave.cpp @@ -85,6 +85,7 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface inputStream = funnel.activity.queryInputStream(inputIndex); while (!stopping) { + LookAheadTimer timer(funnel.activity.getActivityTimerAccumulator(), funnel.activity.queryTimeActivities()); numRows = 0; for (;numRows < chunkSize; numRows++) { @@ -141,14 +142,15 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface Linked serializer; void push(const void *row) - { + { size32_t rowSize = thorRowMemoryFootprint(serializer, row); bool waitForSpace = false; // only allow a single writer at a time, so only a single thread is waiting on the semaphore - otherwise signal() takes a very long time { - + BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); CriticalBlock b(crit); // will mean first 
'push' could block on fullSem, others on this crit. + timer.leave(); if (stopped) { ReleaseThorRow(row); @@ -179,7 +181,9 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface bool waitForSpace = false; // only allow a single writer at a time, so only a single thread is waiting on the semaphore - otherwise signal() takes a very long time { + BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); CriticalBlock b(crit); // will mean first 'push' could block on fullSem, others on this crit. + timer.leave(); if (stopped) { for (unsigned i=0; i < numRows; i++) @@ -266,7 +270,9 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface } { + BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); CriticalBlock b(crit); + timer.leave(); stopped = true; // ensure any pending push()'s don't enqueue and if big row potentially block again. if (waiting) { @@ -301,7 +307,9 @@ class CParallelFunnel : implements IRowStream, public CSimpleInterface size32_t sz = thorRowMemoryFootprint(serializer, row.get()); unsigned numToSignal = 0; { + BlockedActivityTimer timer(activity.getActivityTimerAccumulator(), activity.queryTimeActivities()); CriticalBlock b(crit); + timer.leave(); assertex(totSize>=sz); totSize -= sz; if (waiting && (totSize <= FUNNEL_MIN_BUFF_SIZE))