
Commit

Merge pull request #5444 from johnhaddon/collaborationGroundwork
TaskCollaboration groundwork
johnhaddon authored Aug 31, 2023
2 parents 62d6ad1 + 1bff413 commit 5520aa2
Showing 14 changed files with 336 additions and 9 deletions.
5 changes: 5 additions & 0 deletions Changes.md
@@ -1,6 +1,11 @@
1.3.x.x (relative to 1.3.2.0)
=======

API
---

- ThreadState : Added `process()` method.
- Process : Added const overload for `handleException()` method. The non-const version will be removed in future.

1.3.2.0 (relative to 1.3.1.0)
=======
4 changes: 2 additions & 2 deletions include/Gaffer/Process.h
@@ -97,8 +97,8 @@ class GAFFER_API Process : private ThreadState::Scope
/// report the error appropriately via Node::errorSignal()
/// and rethrow the exception for propagation back to
/// the original caller.
/// \todo Consider ways of dealing with this automatically - could
/// we use C++11's current_exception() in our destructor perhaps?
[[noreturn]] void handleException() const;
/// \todo This just exists for ABI compatibility. Remove it.
void handleException();

private :
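
As a hedged illustration of the comment above — `handleException()` reports the error via `Node::errorSignal()` and rethrows it for the original caller — a Process subclass might use the new const overload roughly as follows. Only the `handleException()` declaration comes from this diff; the class, its type name and the assumed base-class constructor arguments are illustrative.

```
// Hypothetical sketch, not code from this commit : only the `handleException()`
// declaration above is real; the class, its type name and the assumed
// `Process( type, plug )` constructor signature are illustrative assumptions.
#include "Gaffer/Process.h"

class MyProcess : public Gaffer::Process
{

	public :

		MyProcess( const Gaffer::Plug *plug )
			:	Process( "myProcess", plug ) // assumed protected base constructor
		{
		}

		void run() const
		{
			try
			{
				// ... perform the real work of the process ...
			}
			catch( ... )
			{
				// Reports the error via `Node::errorSignal()` and rethrows it for
				// the original caller. Because the new overload is `const` and
				// `[[noreturn]]`, it can be called from const methods like this one.
				handleException();
			}
		}

};
```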
8 changes: 3 additions & 5 deletions include/Gaffer/ThreadState.h
@@ -59,7 +59,7 @@ IE_CORE_FORWARDDECLARE( Monitor );
/// Context and Monitor classes. The exception to this is when using
/// task-based TBB algorithms, in which case it is necessary to manually
/// transfer the current ThreadState from the calling code to the tasks
/// running on it's behalf. For example :
/// running on its behalf. For example :
///
/// ```
/// const ThreadState &threadState = ThreadState::current();
@@ -116,10 +116,8 @@ class GAFFER_API ThreadState

static const ThreadState &current();

const Context *context() const
{
return m_context;
}
const Context *context() const { return m_context; }
const Process *process() const { return m_process; }

private :

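
The comment above introduces an example of transferring the current ThreadState to task-based TBB code, but the snippet is truncated in this view. A hedged reconstruction of that pattern — assuming a `ThreadState::Scope` constructor that takes the state to scope, and using the new `process()` accessor added in this commit — might look like:

```
// A hedged reconstruction of the pattern described in the comment above, not
// literal code from the header. Assumes `ThreadState::Scope( const ThreadState & )`.
#include "Gaffer/ThreadState.h"

#include "tbb/blocked_range.h"
#include "tbb/parallel_for.h"

#include <cstddef>

void processItems( std::size_t numItems )
{
	// Capture the state (Context, Monitors and, with this commit, the current
	// Process) on the calling thread...
	const Gaffer::ThreadState &threadState = Gaffer::ThreadState::current();

	// The new accessor added by this commit : the process (if any) running on
	// the calling thread.
	const Gaffer::Process *callingProcess = threadState.process();
	(void)callingProcess;

	tbb::parallel_for(
		tbb::blocked_range<std::size_t>( 0, numItems ),
		[&threadState] ( const tbb::blocked_range<std::size_t> &range )
		{
			// ...and re-establish it on the TBB worker thread before doing any work.
			Gaffer::ThreadState::Scope scope( threadState );
			for( std::size_t i = range.begin(); i != range.end(); ++i )
			{
				// ... per-item work that relies on the scoped ThreadState ...
			}
		}
	);
}
```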
14 changes: 14 additions & 0 deletions python/GafferImageTest/ImageStatsTest.py
@@ -286,6 +286,20 @@ def testView( self ) :
s["view"].setValue( "right" )
self.assertEqual( s["max"].getValue(), imath.Color4f( 0.5, 0.5, 0.5, 0.875 ) )

@GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration" } )
def testTaskCollaboration( self ) :

checker = GafferImage.Checkerboard()

stats = GafferImage.ImageStats()
stats["in"].setInput( checker["out"] )
stats["area"].setValue( stats["in"].format().getDisplayWindow() )

with Gaffer.PerformanceMonitor() as pm :
GafferTest.parallelGetValue( stats["average"]["r"], 10000 )

self.assertEqual( pm.plugStatistics( stats["__allStats" ] ).computeCount, 1 )

def __assertColour( self, colour1, colour2 ) :
for i in range( 0, 4 ):
self.assertEqual( "%.4f" % colour2[i], "%.4f" % colour1[i] )
28 changes: 28 additions & 0 deletions python/GafferSceneTest/FilterResultsTest.py
@@ -169,6 +169,7 @@ def testOutputIntoExpression( self ) :
] )
)

@GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:hashAliasing" } )
def testComputeCacheRecursion( self ) :

script = Gaffer.ScriptNode()
@@ -253,6 +254,33 @@ def testRootMatchVsNoMatch( self ) :
pathFilter["paths"].setValue( IECore.StringVectorData( [ "/" ] ) )
self.assertEqual( filterResults["outStrings"].getValue(), IECore.StringVectorData( [ "/" ] ) )

@GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration" } )
def testTaskCollaboration( self ) :

infiniteScene = GafferScene.ScenePlug()
infiniteScene["childNames"].setValue( IECore.InternedStringVectorData( [ "one", "two" ] ) )

pathFilter = GafferScene.PathFilter()
pathFilter["paths"].setValue( IECore.StringVectorData( [ "/*" * 12 ] ) )

filterResults = GafferScene.FilterResults()
filterResults["scene"].setInput( infiniteScene )
filterResults["filter"].setInput( pathFilter["out"] )

# This checks that we only do a single compute when lots of threads hammer on the same
# result. We repeat it a number of times to increase the chances of exposing bugs that
# allow duplicate computes (we had one during development that only showed up very
# intermittently).

for i in range( 0, 100 ) :

Gaffer.ValuePlug.clearCache()

with Gaffer.PerformanceMonitor() as pm :
GafferTest.parallelGetValue( filterResults["out"], 10000 )

self.assertEqual( pm.plugStatistics( filterResults["__internalOut"] ).computeCount, 1 )

@unittest.skipIf( GafferTest.inCI(), "Performance not relevant on CI platform" )
@GafferTest.TestRunner.PerformanceTestMethod()
def testHashPerformance( self ):
1 change: 1 addition & 0 deletions python/GafferSceneTest/UDIMQueryTest.py
@@ -174,6 +174,7 @@ def dictResult():
self.assertNotEqual( initialHash, udimQuery["out"].hash() )
self.assertEqual( dictResult(), {'1001': {'/test': {}}} )

@GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:performance" } )
@GafferTest.TestRunner.PerformanceTestMethod( repeat = 1)
def testCollaboratePerf( self ) :

209 changes: 209 additions & 0 deletions python/GafferTest/CollectTest.py
@@ -383,5 +383,214 @@ def assertPostconditions() :
script.redo()
assertPostconditions()

@GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:performance" } )
@GafferTest.TestRunner.PerformanceTestMethod()
def testFanOutPerformance( self ) :

# A network of nested collects that produces a fan-out task pattern,
# computing all permutations of the product of `numCollects` integers from
# the range `[ 0, collectLength )`.
#
# Node graph Compute graph (for `collectLength = 2`)
# ----------------------------------------------------------
#
# collect0 o oo o
# | \/ \ /
# collect1 o o
# | \ /
# collect2 o

numCollects = 3
collectLength = 64

script = Gaffer.ScriptNode()

lastNode = None
for i in range( 0, numCollects ) :

query = Gaffer.ContextQuery( f"query{i}" )
query.addQuery( Gaffer.IntPlug(), f"collect{i}:index" )
script.addChild( query )

if lastNode is not None :

multiply = GafferTest.MultiplyNode( f"multiply{i}" )
script.addChild( multiply )

if isinstance( lastNode, Gaffer.ContextQuery ) :
multiply["op1"].setInput( lastNode["out"][0]["value"] )
else :
multiply["op1"].setInput( lastNode["product"] )
multiply["op2"].setInput( query["out"][0]["value"] )

lastNode = multiply

else :

lastNode = query

for i in range( 0, numCollects ) :

collect = Gaffer.Collect( f"collect{i}" )
script.addChild( collect )

collect["contextVariable"].setValue( f"collect{i}:value" )
collect["indexContextVariable"].setValue( f"collect{i}:index" )
collect["contextValues"].setValue( IECore.StringVectorData( [ str( j ) for j in range( 0, collectLength ) ] ) )

if isinstance( lastNode, GafferTest.MultiplyNode ) :
collect.addInput( Gaffer.IntPlug( "value" ) ).setInput( lastNode["product"] )
elif isinstance( lastNode["out"][0], Gaffer.IntVectorDataPlug ) :
collect.addInput( Gaffer.IntVectorDataPlug( "value", defaultValue = IECore.IntVectorData() ) ).setInput( lastNode["out"][0] )
else :
collect.addInput( Gaffer.ObjectVectorPlug( "value", defaultValue = IECore.ObjectVector() ) ).setInput( lastNode["out"][0] )

lastNode = collect

# Measure evaluation performance

with GafferTest.TestRunner.PerformanceScope() :
result = lastNode["out"]["value"].getValue()

# Check we were actually computing what we thought we were.

for i, iData in enumerate( result ) :
for j, jData in enumerate( iData ) :
for k, product in enumerate( jData ) :
self.assertEqual( product, i * j * k )

@GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:performance" } )
@GafferTest.TestRunner.PerformanceTestMethod( repeat = 1 )
def testFanOutGatherPerformance( self ) :

# This node graph just splits and recollects a `vector<int>`
# repeatedly, with the expressions getting a specific index
# determined by the context, and the collects gathering from
# them to rebuild the vector.
#
# Node graph Compute graph (for `collectLength = 2`)
# ----------------------------------------------------------
#
# expression0 o o
# | \ /
# collect0 o
# | / \
# expression1 o o
# | \ /
# collect1 o
# | / \
# expression2 o o
# | \ /
# collect2 o
#
# The primary purpose of this test is to check the performance
# of our cycle-detection code, since it presents huge numbers
# of unique paths through the downstream dependencies of each
# compute. A naive cycle detector could scale incredibly badly
# here.

numCollects = 10
collectLength = 64

script = Gaffer.ScriptNode()

lastCollect = None
for i in range( 0, numCollects ) :

collect = Gaffer.Collect( f"collect{i}" )
collect["contextValues"].setValue( IECore.StringVectorData( [ str( j ) for j in range( 0, collectLength ) ] ) )
collect.addInput( Gaffer.IntPlug( "value" ) ).fullName()
script.addChild( collect )

## \todo Ideally we'd have something like an ArrayToScalar node we
# could use instead of Python expressions here, so that there's less
# overhead in the computes themselves and we're more sensitive to
# performance improvements in the underlying collaboration mechanism
# itself.
expression = Gaffer.Expression( f"expression{i}" )
script.addChild( expression )

if lastCollect is None :
expression.setExpression(
f'parent["collect{i}"]["in"]["value"] = context["collect:index"]'
)
else :
expression.setExpression(
f'index = context["collect:index"]; array = parent["collect{i-1}"]["out"]["value"]; parent["collect{i}"]["in"]["value"] = array[index]'
)

lastCollect = collect

with GafferTest.TestRunner.PerformanceScope() :
result = lastCollect["out"]["value"].getValue()

self.assertEqual( result, IECore.IntVectorData( range( 0, collectLength ) ) )

@GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:performance" } )
@GafferTest.TestRunner.PerformanceTestMethod( repeat = 1 )
def testLoop( self ) :

# `collect1` evaluates the output for `loop` for various iterations
# in parallel. Since each iteration depends on the previous iteration,
# we end up with iteration `n + 1` waiting for an in-flight iteration `n`
# from a different thread, and depending on timings, this can lead to
# increasing-length chains of waiting processes.
#
# Node graph Compute graph (for `maxIterations = 3`)
# ----------------------------------------------------------
#
# loop <---- o
# | | | \
# | collect0 | o
# | | | | \
# |------- | / o
# | |/ /
# | | /
# | | /
# | | /
# | |/
# collect1 o

maxIterations = 200

script = Gaffer.ScriptNode()

script["contextQuery"] = Gaffer.ContextQuery()
script["contextQuery"].addQuery( Gaffer.IntPlug(), "collect1:index" )

script["loop"] = Gaffer.Loop()
script["loop"].setup( Gaffer.ObjectVectorPlug( defaultValue = IECore.ObjectVector() ) )
script["loop"]["iterations"].setInput( script["contextQuery"]["out"][0]["value"] )

script["collect0"] = Gaffer.Collect()
script["collect0"]["contextValues"].setValue( IECore.StringVectorData( [ "one" ] ) )
script["collect0"].addInput(script["loop"]["previous"] )
script["collect0"]["in"][0].setInput( script["loop"]["previous"] )
script["loop"]["next"].setInput( script["collect0"]["out"][0] )

script["collect1"] = Gaffer.Collect()
script["collect1"]["contextValues"].setValue( IECore.StringVectorData( [ str( x ) for x in range( 0, maxIterations ) ] ) )
script["collect1"]["contextVariable"].setValue( "collect1:value" )
script["collect1"]["indexContextVariable"].setValue( "collect1:index" )
script["collect1"].addInput( script["loop"]["out"] )
script["collect1"]["in"][0].setInput( script["loop"]["out"] )

# Measure performance

with GafferTest.TestRunner.PerformanceScope() :
result = script["collect1"]["out"][0].getValue()

# Check we were actually computing what we thought

def depth( v ) :
if len( v ) == 0 :
return 0
else :
self.assertEqual( len( v ), 1 )
return depth( v[0] ) + 1

for i in range( maxIterations ) :
self.assertEqual( depth( result[i] ), i )

if __name__ == "__main__":
unittest.main()
13 changes: 13 additions & 0 deletions python/GafferTest/ComputeNodeTest.py
@@ -749,5 +749,18 @@ def testInterleavedEditsAndComputes( self ) :
n["op1"].setValue( i )
self.assertEqual( n["sum"].getValue(), i )

def testCacheSharedBetweenNodes( self ) :

n1 = GafferTest.AddNode()
n2 = GafferTest.AddNode()

with Gaffer.PerformanceMonitor() as pm :

self.assertEqual( n1["sum"].getValue(), 0 )
self.assertEqual( n2["sum"].getValue(), 0 )

self.assertEqual( pm.plugStatistics( n1["sum"] ).computeCount, 1 )
self.assertEqual( pm.plugStatistics( n2["sum"] ).computeCount, 0 )

if __name__ == "__main__":
unittest.main()
22 changes: 22 additions & 0 deletions python/GafferTest/ExpressionTest.py
@@ -1513,6 +1513,7 @@ def testNoneOutput( self ) :
)

@GafferTest.TestRunner.PerformanceTestMethod()
@GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:performance" } )
def testParallelPerformance( self ):
s = Gaffer.ScriptNode()
s["n"] = Gaffer.Node()
@@ -1531,6 +1532,27 @@ def testParallelPerformance( self ):
with GafferTest.TestRunner.PerformanceScope() :
GafferTest.parallelGetValue( s["n"]["user"]["p"], 100 )

def testParallelGetValueComputesOnce( self ) :

s = Gaffer.ScriptNode()

s["n"] = Gaffer.Node()
s["n"]["user"]["p"] = Gaffer.IntPlug( flags = Gaffer.Plug.Flags.Default | Gaffer.Plug.Flags.Dynamic )

s["e"] = Gaffer.Expression()
s["e"].setExpression( inspect.cleandoc(
"""
import time
time.sleep( 0.1 )
parent["n"]["user"]["p"] = 0
"""
) )

with Gaffer.PerformanceMonitor() as pm :
GafferTest.parallelGetValue( s["n"]["user"]["p"], 10000 )

self.assertEqual( pm.plugStatistics( s["e"]["__execute"] ).computeCount, 1 )

def testRemoveDrivenSpreadsheetRow( self ) :

s = Gaffer.ScriptNode()