From d9f9f669cf0a8884f1ca29a352a4a208d082cf2c Mon Sep 17 00:00:00 2001
From: John Haddon
Date: Fri, 18 Aug 2023 10:01:57 +0100
Subject: [PATCH] CollectTest : Add various TaskCollaboration performance tests

---
 python/GafferTest/CollectTest.py | 209 +++++++++++++++++++++++++++++++
 1 file changed, 209 insertions(+)

diff --git a/python/GafferTest/CollectTest.py b/python/GafferTest/CollectTest.py
index 845c1939181..47bc8c48838 100644
--- a/python/GafferTest/CollectTest.py
+++ b/python/GafferTest/CollectTest.py
@@ -383,5 +383,214 @@ def assertPostconditions() :
 		script.redo()
 		assertPostconditions()
 
+	@GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:performance" } )
+	@GafferTest.TestRunner.PerformanceTestMethod()
+	def testFanOutPerformance( self ) :
+
+		# A network of nested collects that produces a fan-out task pattern,
+		# computing all permutations of the product of `numCollects` integers from
+		# the range `[ 0, collectLength )`.
+		#
+		# Node graph             Compute graph (for `collectLength = 2`)
+		# ----------------------------------------------------------
+		#
+		# collect0               o  oo   o
+		#    |                    \/  \ /
+		# collect1                 o   o
+		#    |                      \ /
+		# collect2                   o
+
+		numCollects = 3
+		collectLength = 64
+
+		script = Gaffer.ScriptNode()
+
+		lastNode = None
+		for i in range( 0, numCollects ) :
+
+			query = Gaffer.ContextQuery( f"query{i}" )
+			query.addQuery( Gaffer.IntPlug(), f"collect{i}:index" )
+			script.addChild( query )
+
+			if lastNode is not None :
+
+				multiply = GafferTest.MultiplyNode( f"multiply{i}" )
+				script.addChild( multiply )
+
+				if isinstance( lastNode, Gaffer.ContextQuery ) :
+					multiply["op1"].setInput( lastNode["out"][0]["value"] )
+				else :
+					multiply["op1"].setInput( lastNode["product"] )
+				multiply["op2"].setInput( query["out"][0]["value"] )
+
+				lastNode = multiply
+
+			else :
+
+				lastNode = query
+
+		for i in range( 0, numCollects ) :
+
+			collect = Gaffer.Collect( f"collect{i}" )
+			script.addChild( collect )
+
+			collect["contextVariable"].setValue( f"collect{i}:value" )
+			collect["indexContextVariable"].setValue( f"collect{i}:index" )
+			collect["contextValues"].setValue( IECore.StringVectorData( [ str( j ) for j in range( 0, collectLength ) ] ) )
+
+			if isinstance( lastNode, GafferTest.MultiplyNode ) :
+				collect.addInput( Gaffer.IntPlug( "value" ) ).setInput( lastNode["product"] )
+			elif isinstance( lastNode["out"][0], Gaffer.IntVectorDataPlug ) :
+				collect.addInput( Gaffer.IntVectorDataPlug( "value", defaultValue = IECore.IntVectorData() ) ).setInput( lastNode["out"][0] )
+			else :
+				collect.addInput( Gaffer.ObjectVectorPlug( "value", defaultValue = IECore.ObjectVector() ) ).setInput( lastNode["out"][0] )
+
+			lastNode = collect
+
+		# Measure evaluation performance
+
+		with GafferTest.TestRunner.PerformanceScope() :
+			result = lastNode["out"]["value"].getValue()
+
+		# Check we were actually computing what we thought we were.
+
+		for i, iData in enumerate( result ) :
+			for j, jData in enumerate( iData ) :
+				for k, product in enumerate( jData ) :
+					self.assertEqual( product, i * j * k )
+
+	@GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:performance" } )
+	@GafferTest.TestRunner.PerformanceTestMethod( repeat = 1 )
+	def testFanOutGatherPerformance( self ) :
+
+		# This node graph just splits and recollects a `vector`
+		# repeatedly, with the expressions getting a specific index
+		# determined by the context, and the collects gathering from
+		# them to rebuild the vector.
+		#
+		# Node graph             Compute graph (for `collectLength = 2`)
+		# ----------------------------------------------------------
+		#
+		# expression0            o   o
+		#     |                   \ /
+		# collect0                 o
+		#     |                   / \
+		# expression1            o   o
+		#     |                   \ /
+		# collect1                 o
+		#     |                   / \
+		# expression2            o   o
+		#     |                   \ /
+		# collect2                 o
+		#
+		# The primary purpose of this test is to check the performance
+		# of our cycle-detection code, since it presents huge numbers
+		# of unique paths through the downstream dependencies of each
+		# compute. A naive cycle detector could scale incredibly badly
+		# here.
+
+		numCollects = 10
+		collectLength = 64
+
+		script = Gaffer.ScriptNode()
+
+		lastCollect = None
+		for i in range( 0, numCollects ) :
+
+			collect = Gaffer.Collect( f"collect{i}" )
+			collect["contextValues"].setValue( IECore.StringVectorData( [ str( j ) for j in range( 0, collectLength ) ] ) )
+			collect.addInput( Gaffer.IntPlug( "value" ) )
+			script.addChild( collect )
+
+			## \todo Ideally we'd have something like an ArrayToScalar node we
+			# could use instead of Python expressions here, so that there's less
+			# overhead in the computes themselves and we're more sensitive to
+			# performance improvements in the underlying collaboration mechanism
+			# itself.
+			expression = Gaffer.Expression( f"expression{i}" )
+			script.addChild( expression )
+
+			if lastCollect is None :
+				expression.setExpression(
+					f'parent["collect{i}"]["in"]["value"] = context["collect:index"]'
+				)
+			else :
+				expression.setExpression(
+					f'index = context["collect:index"]; array = parent["collect{i-1}"]["out"]["value"]; parent["collect{i}"]["in"]["value"] = array[index]'
+				)
+
+			lastCollect = collect
+
+		with GafferTest.TestRunner.PerformanceScope() :
+			result = lastCollect["out"]["value"].getValue()
+
+		self.assertEqual( result, IECore.IntVectorData( range( 0, collectLength ) ) )
+
+	@GafferTest.TestRunner.CategorisedTestMethod( { "taskCollaboration:performance" } )
+	@GafferTest.TestRunner.PerformanceTestMethod( repeat = 1 )
+	def testLoop( self ) :
+
+		# `collect1` evaluates the output of `loop` for various iterations
+		# in parallel. Since each iteration depends on the previous iteration,
+		# we end up with iteration `n + 1` waiting for an in-flight iteration `n`
+		# from a different thread, and depending on timings, this can lead to
+		# increasing-length chains of waiting processes.
+		#
+		# Node graph             Compute graph (for `maxIterations = 3`)
+		# ----------------------------------------------------------
+		#
+		# loop <----             o
+		#  |       |             | \
+		#  |    collect0         |  o
+		#  |       |             |  | \
+		#  |-------              | /   o
+		#  |                     |/   /
+		#  |                     |   /
+		#  |                     |  /
+		#  |                     | /
+		#  |                     |/
+		# collect1               o
+
+		maxIterations = 200
+
+		script = Gaffer.ScriptNode()
+
+		script["contextQuery"] = Gaffer.ContextQuery()
+		script["contextQuery"].addQuery( Gaffer.IntPlug(), "collect1:index" )
+
+		script["loop"] = Gaffer.Loop()
+		script["loop"].setup( Gaffer.ObjectVectorPlug( defaultValue = IECore.ObjectVector() ) )
+		script["loop"]["iterations"].setInput( script["contextQuery"]["out"][0]["value"] )
+
+		script["collect0"] = Gaffer.Collect()
+		script["collect0"]["contextValues"].setValue( IECore.StringVectorData( [ "one" ] ) )
+		script["collect0"].addInput( script["loop"]["previous"] )
+		script["collect0"]["in"][0].setInput( script["loop"]["previous"] )
+		script["loop"]["next"].setInput( script["collect0"]["out"][0] )
+
+		script["collect1"] = Gaffer.Collect()
+		script["collect1"]["contextValues"].setValue( IECore.StringVectorData( [ str( x ) for x in range( 0, maxIterations ) ] ) )
+		script["collect1"]["contextVariable"].setValue( "collect1:value" )
+		script["collect1"]["indexContextVariable"].setValue( "collect1:index" )
+		script["collect1"].addInput( script["loop"]["out"] )
+		script["collect1"]["in"][0].setInput( script["loop"]["out"] )
+
+		# Measure performance
+
+		with GafferTest.TestRunner.PerformanceScope() :
+			result = script["collect1"]["out"][0].getValue()
+
+		# Check we were actually computing what we thought
+
+		def depth( v ) :
+			if len( v ) == 0 :
+				return 0
+			else :
+				self.assertEqual( len( v ), 1 )
+				return depth( v[0] ) + 1
+
+		for i in range( maxIterations ) :
+			self.assertEqual( depth( result[i] ), i )
+
 if __name__ == "__main__":
 	unittest.main()