Skip to content

Commit

Permalink
Merge pull request #17398 from shamser/issue29634
Browse files Browse the repository at this point in the history
HPCC-29634 Aggregate spill stats to graph/wf scope

Reviewed-by: Jake Smith <[email protected]>
Reviewed-by: Gavin Halliday <[email protected]>
Merged-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday authored Jul 5, 2023
2 parents 906a296 + f25798a commit f1526ab
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 2 deletions.
39 changes: 39 additions & 0 deletions common/workunit/workunit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2696,6 +2696,45 @@ cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope)
return totalCost;
}

void gatherSpillSize(const IConstWorkUnit * wu, const char *scope, stat_type & peakSizeSpill)
{
WuScopeFilter filter;
if (!isEmptyString(scope))
filter.addScope(scope);
else
{
filter.addScope("");
filter.addSource("global");
}
filter.setIncludeNesting(1);
filter.addOutputStatistic(StSizeGraphSpill);
filter.addRequiredStat(StSizeGraphSpill);
filter.finishedFilter();
Owned<IConstWUScopeIterator> it = &wu->getScopeIterator(filter);
peakSizeSpill = 0;
for (it->first(); it->isValid(); )
{
stat_type value = 0;
if (it->getStat(StSizeGraphSpill, value))
{
if (value>peakSizeSpill)
peakSizeSpill = value;
it->nextSibling();
}
else
{
it->next();
}
}
}

void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType)
{
stat_type peakSizeSpill = 0;
gatherSpillSize(wu, scope, peakSizeSpill);
if (peakSizeSpill)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), scopeType, scope, StSizeGraphSpill, nullptr, peakSizeSpill, 1, 0, StatsMergeMax);
}
//---------------------------------------------------------------------------------------------------------------------


Expand Down
1 change: 1 addition & 0 deletions common/workunit/workunit.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1722,6 +1722,7 @@ extern WORKUNIT_API void updateWorkunitTimings(IWorkUnit * wu, StatisticScopeTyp
extern WORKUNIT_API void aggregateStatistic(StatsAggregation & result, IConstWorkUnit * wu, const WuScopeFilter & filter, StatisticKind search);
extern WORKUNIT_API cost_type aggregateCost(const IConstWorkUnit * wu, const char *scope=nullptr, bool excludehThor=false);
extern WORKUNIT_API cost_type aggregateDiskAccessCost(const IConstWorkUnit * wu, const char *scope);
extern WORKUNIT_API void updateSpillSize(IWorkUnit * wu, const char * scope, StatisticScopeType scopeType);
extern WORKUNIT_API const char *getTargetClusterComponentName(const char *clustname, const char *processType, StringBuffer &name);
extern WORKUNIT_API void descheduleWorkunit(char const * wuid);
#if 0
Expand Down
3 changes: 2 additions & 1 deletion ecl/eclagent/eclagent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1985,7 +1985,7 @@ void EclAgent::doProcess()
const cost_type diskAccessCost = aggregateDiskAccessCost(w, nullptr);
if (diskAccessCost)
w->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTglobal, "", StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace);

updateSpillSize(w, nullptr, SSTglobal);
addTimings(w);

switch (w->getState())
Expand Down Expand Up @@ -2534,6 +2534,7 @@ void EclAgentWorkflowMachine::noteTiming(unsigned wfid, timestamp_type startTime
const cost_type diskAccessCost = aggregateDiskAccessCost(wu, scope);
if (diskAccessCost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTworkflow, scope, StCostFileAccess, NULL, diskAccessCost, 1, 0, StatsMergeReplace);
updateSpillSize(wu, scope, SSTworkflow);
}

void EclAgentWorkflowMachine::doExecutePersistItem(IRuntimeWorkflowItem & item)
Expand Down
2 changes: 1 addition & 1 deletion thorlcr/master/thgraphmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1098,7 +1098,7 @@ bool CJobManager::executeGraph(IConstWorkUnit &workunit, const char *graphName,
cost_type cost = money2cost_type(calculateThorCost(nanoToMilli(graphTimeNs), numberOfMachines));
if (cost)
wu->setStatistic(queryStatisticsComponentType(), queryStatisticsComponentName(), SSTgraph, graphScope, StCostExecute, NULL, cost, 1, 0, StatsMergeReplace);

updateSpillSize(wu, graphScope, SSTgraph);
removeJob(*job);
}
catch (IException *e)
Expand Down

0 comments on commit f1526ab

Please sign in to comment.