Skip to content

Commit

Permalink
CollectTest : Add various TaskCollaboration performance tests
Browse files Browse the repository at this point in the history
  • Loading branch information
johnhaddon committed Aug 31, 2023
1 parent e26da05 commit d9f9f66
Showing 1 changed file with 209 additions and 0 deletions.
209 changes: 209 additions & 0 deletions python/GafferTest/CollectTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -383,5 +383,214 @@ def assertPostconditions() :
script.redo()
assertPostconditions()

@GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:performance" } )
@GafferTest.TestRunner.PerformanceTestMethod()
def testFanOutPerformance( self ) :

    # Times the evaluation of a stack of nested Collect nodes, which
    # produces a fan-out task pattern. The value gathered at the bottom
    # of the stack holds every product that can be formed from
    # `numCollects` integers, each drawn from the range
    # `[ 0, collectLength )`.
    #
    #  Node graph        Compute graph (for `collectLength = 2`)
    #  ----------------------------------------------------------
    #
    #  collect0          o   o o   o
    #     |               \ /   \ /
    #  collect1            o     o
    #     |                 \   /
    #  collect2               o

    numCollects = 3
    collectLength = 64

    script = Gaffer.ScriptNode()

    # Build the product chain. There is one ContextQuery per collect,
    # each reading that collect's index variable, and the query values
    # are multiplied together pairwise by a chain of MultiplyNodes.

    upstream = None
    for i in range( numCollects ) :

        query = Gaffer.ContextQuery( f"query{i}" )
        query.addQuery( Gaffer.IntPlug(), f"collect{i}:index" )
        script.addChild( query )

        if upstream is None :
            # First query : nothing to multiply it with yet.
            upstream = query
            continue

        multiply = GafferTest.MultiplyNode( f"multiply{i}" )
        script.addChild( multiply )

        # The head of the chain is a bare query; everything after it is
        # a MultiplyNode, whose result lives on a differently named plug.
        upstreamIsQuery = isinstance( upstream, Gaffer.ContextQuery )
        multiply["op1"].setInput(
            upstream["out"][0]["value"] if upstreamIsQuery else upstream["product"]
        )
        multiply["op2"].setInput( query["out"][0]["value"] )

        upstream = multiply

    # Stack the collects beneath the product chain, each one gathering
    # the output of whatever node was created before it.

    for i in range( numCollects ) :

        collect = Gaffer.Collect( f"collect{i}" )
        script.addChild( collect )

        collect["contextVariable"].setValue( f"collect{i}:value" )
        collect["indexContextVariable"].setValue( f"collect{i}:index" )
        collect["contextValues"].setValue(
            IECore.StringVectorData( list( map( str, range( collectLength ) ) ) )
        )

        # Choose an input plug type to match what is being collected :
        # an int from the multiply chain, or the vector output of the
        # previous collect.
        if isinstance( upstream, GafferTest.MultiplyNode ) :
            prototype = Gaffer.IntPlug( "value" )
            source = upstream["product"]
        else :
            source = upstream["out"][0]
            if isinstance( source, Gaffer.IntVectorDataPlug ) :
                prototype = Gaffer.IntVectorDataPlug( "value", defaultValue = IECore.IntVectorData() )
            else :
                prototype = Gaffer.ObjectVectorPlug( "value", defaultValue = IECore.ObjectVector() )

        collect.addInput( prototype ).setInput( source )

        upstream = collect

    # Only the evaluation itself is timed.

    with GafferTest.TestRunner.PerformanceScope() :
        result = upstream["out"]["value"].getValue()

    # Confirm that the nested result really does hold the product of
    # the three indices at every position.

    for i, outer in enumerate( result ) :
        for j, inner in enumerate( outer ) :
            for k, product in enumerate( inner ) :
                self.assertEqual( product, i * j * k )

@GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:performance" } )
@GafferTest.TestRunner.PerformanceTestMethod( repeat = 1 )
def testFanOutGatherPerformance( self ) :

    # This node graph just splits and recollects a `vector<int>`
    # repeatedly. Each expression extracts the element at the index
    # given by the context, and each collect gathers from its
    # expression to rebuild the vector.
    #
    #  Node graph          Compute graph (for `collectLength = 2`)
    #  ----------------------------------------------------------
    #
    #  expression0         o   o
    #      |                \ /
    #  collect0              o
    #      |                / \
    #  expression1         o   o
    #      |                \ /
    #  collect1              o
    #      |                / \
    #  expression2         o   o
    #      |                \ /
    #  collect2              o
    #
    # The primary purpose of this test is to check the performance
    # of our cycle-detection code, since it presents huge numbers
    # of unique paths through the downstream dependencies of each
    # compute. A naive cycle detector could scale incredibly badly
    # here.

    numCollects = 10
    collectLength = 64

    script = Gaffer.ScriptNode()

    lastCollect = None
    for i in range( 0, numCollects ) :

        # The collect's context variable names are left untouched. The
        # expressions below read `collect:index`, so that is presumably
        # the default index variable name - confirm against Collect.
        collect = Gaffer.Collect( f"collect{i}" )
        collect["contextValues"].setValue( IECore.StringVectorData( [ str( j ) for j in range( 0, collectLength ) ] ) )
        # The returned plug isn't needed here : the expression drives
        # it by name via `parent["collect<i>"]["in"]["value"]`.
        collect.addInput( Gaffer.IntPlug( "value" ) )
        script.addChild( collect )

        ## \todo Ideally we'd have something like an ArrayToScalar node we
        # could use instead of Python expressions here, so that there's less
        # overhead in the computes themselves and we're more sensitive to
        # performance improvements in the underlying collaboration mechanism
        # itself.
        expression = Gaffer.Expression( f"expression{i}" )
        script.addChild( expression )

        if lastCollect is None :
            # First level : seed the vector with the index itself, so
            # that the final result is `[ 0, collectLength )`.
            expressionText = f'parent["collect{i}"]["in"]["value"] = context["collect:index"]'
        else :
            # Later levels : pick the current index's element out of
            # the vector rebuilt by the previous collect.
            expressionText = f'index = context["collect:index"]; array = parent["collect{i-1}"]["out"]["value"]; parent["collect{i}"]["in"]["value"] = array[index]'

        expression.setExpression( expressionText )

        lastCollect = collect

    # Only the evaluation itself is timed.

    with GafferTest.TestRunner.PerformanceScope() :
        result = lastCollect["out"]["value"].getValue()

    # Every level passes the values through unchanged, so the vector
    # seeded by the first expression should arrive intact.

    self.assertEqual( result, IECore.IntVectorData( range( 0, collectLength ) ) )

@GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:performance" } )
@GafferTest.TestRunner.PerformanceTestMethod( repeat = 1 )
def testLoop( self ) :

    # `collect1` evaluates the output of `loop` for a range of iteration
    # counts in parallel. Every iteration depends on the one before it,
    # so iteration `n + 1` ends up waiting for an in-flight iteration `n`
    # owned by a different thread. Depending on timings, that can build
    # increasingly long chains of waiting processes.
    #
    #  Node graph            Compute graph (for `maxIterations = 3`)
    #  ----------------------------------------------------------
    #
    #  loop <------          o
    #   |         |          | \
    #   |      collect0      |  o
    #   |         |          |  | \
    #   |---------           |  /  o
    #   |                    |/   /
    #   |                    |   /
    #   |                    |  /
    #   |                    | /
    #   |                    |/
    #  collect1              o

    maxIterations = 200

    script = Gaffer.ScriptNode()

    # The loop's iteration count is driven by `collect1`'s index variable.

    contextQuery = Gaffer.ContextQuery()
    script["contextQuery"] = contextQuery
    contextQuery.addQuery( Gaffer.IntPlug(), "collect1:index" )

    loop = Gaffer.Loop()
    script["loop"] = loop
    loop.setup( Gaffer.ObjectVectorPlug( defaultValue = IECore.ObjectVector() ) )
    loop["iterations"].setInput( contextQuery["out"][0]["value"] )

    # Loop body : wrap the previous iteration's result one level deeper
    # by collecting it for a single context value.

    collect0 = Gaffer.Collect()
    script["collect0"] = collect0
    collect0["contextValues"].setValue( IECore.StringVectorData( [ "one" ] ) )
    collect0.addInput( loop["previous"] )
    collect0["in"][0].setInput( loop["previous"] )
    loop["next"].setInput( collect0["out"][0] )

    # Gather the loop output for each of `maxIterations` context values.

    collect1 = Gaffer.Collect()
    script["collect1"] = collect1
    collect1["contextValues"].setValue( IECore.StringVectorData( [ str( x ) for x in range( 0, maxIterations ) ] ) )
    collect1["contextVariable"].setValue( "collect1:value" )
    collect1["indexContextVariable"].setValue( "collect1:index" )
    collect1.addInput( loop["out"] )
    collect1["in"][0].setInput( loop["out"] )

    # Only the evaluation itself is timed.

    with GafferTest.TestRunner.PerformanceScope() :
        result = collect1["out"][0].getValue()

    # Confirm that element `i` of the result is nested exactly `i`
    # levels deep, with a single child at every level.

    def depth( v ) :
        levels = 0
        while len( v ) != 0 :
            self.assertEqual( len( v ), 1 )
            v = v[0]
            levels += 1
        return levels

    for i in range( maxIterations ) :
        self.assertEqual( depth( result[i] ), i )

# Run this file's tests when it is executed directly as a script.
if __name__ == "__main__" :
    unittest.main()

0 comments on commit d9f9f66

Please sign in to comment.