From f25798a07d6fd5a9822291497febb24990ff88a1 Mon Sep 17 00:00:00 2001
From: Shamser Ahmed
Date: Fri, 2 Jun 2023 15:37:46 +0100
Subject: [PATCH] HPCC-29634 Aggregate spill stats to graph/wf scope

Signed-off-by: Shamser Ahmed
---
 common/workunit/workunit.cpp      | 45 +++++++++++++++++++++++++++++++
 common/workunit/workunit.hpp      |  1 +
 ecl/eclagent/eclagent.cpp         |  3 ++-
 thorlcr/master/thgraphmanager.cpp |  2 +-
 4 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/common/workunit/workunit.cpp b/common/workunit/workunit.cpp
index 6546f100e83..577a04332c6 100644
--- a/common/workunit/workunit.cpp
+++ b/common/workunit/workunit.cpp
@@ -2696,6 +2696,51 @@ cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope)
     return totalCost;
 }
 
+// Finds the largest StSizeGraphSpill value among the scopes selected beneath 'scope' and
+// returns it via peakSizeSpill (0 when no visited scope has the statistic).  When 'scope'
+// is empty the filter is instead given the "" scope and the "global" source.
+void gatherSpillSize(const IConstWorkUnit * wu, const char *scope, stat_type & peakSizeSpill)
+{
+    WuScopeFilter filter;
+    if (!isEmptyString(scope))
+        filter.addScope(scope);
+    else
+    {
+        filter.addScope("");
+        filter.addSource("global");
+    }
+    filter.setIncludeNesting(1);
+    filter.addOutputStatistic(StSizeGraphSpill);
+    filter.addRequiredStat(StSizeGraphSpill);
+    filter.finishedFilter();
+    Owned<IConstWUScopeIterator> it = &wu->getScopeIterator(filter);
+    peakSizeSpill = 0;
+    for (it->first(); it->isValid(); )
+    {
+        stat_type value = 0;
+        if (it->getStat(StSizeGraphSpill, value))
+        {
+            if (value>peakSizeSpill)
+                peakSizeSpill = value;
+            // A value was found at this scope: advance to its sibling rather than descending
+            it->nextSibling();
+        }
+        else
+        {
+            it->next();
+        }
+    }
+}
+
+// Stores the peak spill size gathered for 'scope' as that scope's StSizeGraphSpill statistic
+// (merged using StatsMergeMax).  Nothing is written when the gathered peak is zero.
+void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType)
+{
+    stat_type peakSizeSpill = 0;
+    gatherSpillSize(wu, scope, peakSizeSpill);
+    if (peakSizeSpill)
+        wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StSizeGraphSpill, nullptr, peakSizeSpill, 1, 0, StatsMergeMax);
+}
 
 //---------------------------------------------------------------------------------------------------------------------
 
diff --git a/common/workunit/workunit.hpp b/common/workunit/workunit.hpp
index 
adb79dd614e..73f4921bef0 100644 --- a/common/workunit/workunit.hpp +++ b/common/workunit/workunit.hpp @@ -1722,6 +1722,7 @@ extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeTyp extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search); extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false); extern WORKUNIT_API cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope); +extern WORKUNIT_API void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType); extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name); extern WORKUNIT_API void descheduleWorkunit(char const * wuid); #if 0 diff --git a/ecl/eclagent/eclagent.cpp b/ecl/eclagent/eclagent.cpp index 6d513f88983..0b0610c3adb 100644 --- a/ecl/eclagent/eclagent.cpp +++ b/ecl/eclagent/eclagent.cpp @@ -1985,7 +1985,7 @@ void EclAgent::doProcess() const cost_type diskAccessCost = aggregateDiskAccessCost(w, nullptr); if (diskAccessCost) w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace); - + updateSpillSize(w, nullptr, SSTglobal); addTimings(w); switch (w->getState()) @@ -2534,6 +2534,7 @@ void EclAgentWorkflowMachine::noteTiming(unsigned wfid, timestamp_type startTime const cost_type diskAccessCost = aggregateDiskAccessCost(wu, scope); if (diskAccessCost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace); + updateSpillSize(wu, scope, SSTworkflow); } void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item) diff --git a/thorlcr/master/thgraphmanager.cpp 
b/thorlcr/master/thgraphmanager.cpp index 2b76701c9e2..550924cbbcc 100644 --- a/thorlcr/master/thgraphmanager.cpp +++ b/thorlcr/master/thgraphmanager.cpp @@ -1101,7 +1101,7 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName, cost_type cost = money2cost_type(calculateThorCost(nanoToMilli(graphTimeNs), numberOfMachines)); if (cost) wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace); - + updateSpillSize(wu, graphScope, SSTgraph); removeJob(*job); } catch (IException *e)