Skip to content

Commit

Permalink
Merge pull request #5425 from johnhaddon/threadMonitor
Browse files Browse the repository at this point in the history
ThreadMonitor : Add class for monitoring threads used for processes
  • Loading branch information
johnhaddon authored Aug 18, 2023
2 parents 5ce96bc + a5f94ac commit 1ec40b0
Show file tree
Hide file tree
Showing 8 changed files with 438 additions and 2 deletions.
5 changes: 5 additions & 0 deletions Changes.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ Fixes

- Viewer : Fixed crash when visualising lights with a light filter intended for a different renderer.

API
---

- ThreadMonitor : Added new class for tracking the threads used to perform processes.

Documentation
-------------

Expand Down
108 changes: 108 additions & 0 deletions include/Gaffer/ThreadMonitor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
//////////////////////////////////////////////////////////////////////////
//
// Copyright (c) 2023, Cinesite VFX Ltd. All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are
// met:
//
// * Redistributions of source code must retain the above
// copyright notice, this list of conditions and the following
// disclaimer.
//
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following
// disclaimer in the documentation and/or other materials provided with
// the distribution.
//
// * Neither the name of John Haddon nor the names of
// any other contributors to this software may be used to endorse or
// promote products derived from this software without specific prior
// written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
// IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
// THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
// LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
// NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
//
//////////////////////////////////////////////////////////////////////////

#pragma once

#include "Gaffer/Monitor.h"

#include "tbb/enumerable_thread_specific.h"

#include <unordered_map>

namespace Gaffer
{

IE_CORE_FORWARDDECLARE( Plug )

/// A monitor which collects information about which threads
/// initiated processes on each plug.
class GAFFER_API ThreadMonitor : public Monitor
{

public :

ThreadMonitor( const std::vector<IECore::InternedString> &processMask = { "computeNode:compute" } );
~ThreadMonitor() override;

IE_CORE_DECLAREMEMBERPTR( ThreadMonitor )

/// Numeric identifier for a thread. Using our own identifier rather
/// than `std::thread::id` so that we can bind it to Python (and assign
/// human-readable contiguous values).
using ThreadId = int;
/// Returns the `ThreadId` for the calling thread.
static ThreadId thisThreadId();
/// Maps from `ThreadId` to the number of times a process has been
/// invoked on that thread.
using ProcessesPerThread = std::unordered_map<ThreadId, size_t>;
/// Stores per-thread process counts per-plug.
using PlugMap = std::unordered_map<ConstPlugPtr, ProcessesPerThread>;

/// Query functions. These are not thread-safe, and must be called
/// only when the Monitor is not active (as defined by `Monitor::Scope`).
const PlugMap &allStatistics() const;
const ProcessesPerThread &plugStatistics( const Plug *plug ) const;
const ProcessesPerThread &combinedStatistics() const;

protected :

void processStarted( const Process *process ) override;
void processFinished( const Process *process ) override;

private :

const std::vector<IECore::InternedString> m_processMask;

// We collect statistics into a per-thread data structure to avoid contention.
struct ThreadData
{
ThreadData();
using ProcessesPerPlug = std::unordered_map<ConstPlugPtr, size_t>;
ThreadId id;
ProcessesPerPlug processesPerPlug;
};
mutable tbb::enumerable_thread_specific<ThreadData> m_threadData;

// Then when we want to query it, we collate it into `m_statistics`.
void collate() const;
mutable PlugMap m_statistics;
mutable ProcessesPerThread m_combinedStatistics;

};

IE_CORE_DECLAREPTR( ThreadMonitor )

} // namespace Gaffer
143 changes: 143 additions & 0 deletions python/GafferTest/ThreadMonitorTest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
##########################################################################
#
# Copyright (c) 2023, Cinesite VFX Ltd. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are
# met:
#
# * Redistributions of source code must retain the above
# copyright notice, this list of conditions and the following
# disclaimer.
#
# * Redistributions in binary form must reproduce the above
# copyright notice, this list of conditions and the following
# disclaimer in the documentation and/or other materials provided with
# the distribution.
#
# * Neither the name of John Haddon nor the names of
# any other contributors to this software may be used to endorse or
# promote products derived from this software without specific prior
# written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
# IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
# LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
# SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
##########################################################################

import threading
import unittest

import IECore

import Gaffer
import GafferTest

class ThreadMonitorTest( GafferTest.TestCase ) :

def testConstruction( self ) :

monitor = Gaffer.ThreadMonitor()
self.assertEqual( monitor.allStatistics(), {} )
self.assertEqual( monitor.plugStatistics( Gaffer.IntPlug() ), {} )
self.assertEqual( monitor.combinedStatistics(), {} )

def testThisThreadId( self ) :

id = Gaffer.ThreadMonitor.thisThreadId()
self.assertEqual( id, Gaffer.ThreadMonitor.thisThreadId() )

ids = { id }
lock = threading.Lock()

def storeId() :
id = Gaffer.ThreadMonitor.thisThreadId()
self.assertEqual( id, Gaffer.ThreadMonitor.thisThreadId() )
with lock :
ids.add( id )

threads = []
for i in range( 0, 5 ) :
thread = threading.Thread( target = storeId )
threads.append( thread )
thread.start()

for thread in threads :
thread.join()

self.assertEqual( len( ids ), 6 )

def testMonitoring( self ) :

random = Gaffer.Random()
monitor = Gaffer.ThreadMonitor()

with monitor :
random["outFloat"].getValue()

self.assertEqual(
monitor.allStatistics(),
{
random["outFloat"] : {
monitor.thisThreadId() : 1
}
}
)
self.assertEqual(
monitor.plugStatistics( random["outFloat"] ),
{ monitor.thisThreadId() : 1 }
)
self.assertEqual(
monitor.combinedStatistics(),
{ monitor.thisThreadId() : 1 }
)

random["seedVariable"].setValue( "test" )
with monitor :
GafferTest.parallelGetValue( random["outFloat"], 100000, "test" )

s = monitor.plugStatistics( random["outFloat"] )
self.assertEqual( len( s ), IECore.tbb_global_control.active_value( IECore.tbb_global_control.parameter.max_allowed_parallelism ) )
self.assertEqual( sum( s.values() ), 100001 )

self.assertEqual( monitor.allStatistics(), { random["outFloat"] : s } )
self.assertEqual( monitor.combinedStatistics(), s )

def testProcessMask( self ) :

for processType in [ "computeNode:hash", "computeNode:compute" ] :

with self.subTest( processType = processType ) :

Gaffer.ValuePlug.clearCache()
Gaffer.ValuePlug.clearHashCache()

random = Gaffer.Random()
threadMonitor = Gaffer.ThreadMonitor( processMask = { processType } )
performanceMonitor = Gaffer.PerformanceMonitor()
context = Gaffer.Context()

with threadMonitor, performanceMonitor, context :
for i in range( 0, 5 ) :
context["i"] = i # Unique context to force hashing
random["outFloat"].getValue()

self.assertEqual( performanceMonitor.plugStatistics( random["outFloat"] ).computeCount, 1 )
self.assertEqual( performanceMonitor.plugStatistics( random["outFloat"] ).hashCount, 5 )

self.assertEqual(
sum( threadMonitor.plugStatistics( random["outFloat"] ).values() ),
1 if processType == "computeNode:compute" else 5
)

if __name__ == "__main__":
unittest.main()
1 change: 1 addition & 0 deletions python/GafferTest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ def inCI( platforms = set() ) :
from .HiddenFilePathFilterTest import HiddenFilePathFilterTest
from .ContextVariableTweaksTest import ContextVariableTweaksTest
from .OptionalValuePlugTest import OptionalValuePlugTest
from .ThreadMonitorTest import ThreadMonitorTest

from .IECorePreviewTest import *

Expand Down
2 changes: 0 additions & 2 deletions src/Gaffer/PerformanceMonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,6 @@

using namespace Gaffer;

/// \todo If we expose ValuePlug::HashProcess and ValuePlug::ComputeProcess
/// then we can use the types defined there directly.
static IECore::InternedString g_hashType( "computeNode:hash" );
static IECore::InternedString g_computeType( "computeNode:compute" );
static PerformanceMonitor::Statistics g_emptyStatistics;
Expand Down
Loading

0 comments on commit 1ec40b0

Please sign in to comment.