diff --git a/.gitignore b/.gitignore index 41f03c02..781a8710 100644 --- a/.gitignore +++ b/.gitignore @@ -1,6 +1,7 @@ .DS_Store **/.gradle/ **/build/ +**/bin/ ## From https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore diff --git a/krystex/src/main/java/com/flipkart/krystal/krystex/LogicDefinitionRegistry.java b/krystex/src/main/java/com/flipkart/krystal/krystex/LogicDefinitionRegistry.java index da4478bd..432665e9 100644 --- a/krystex/src/main/java/com/flipkart/krystal/krystex/LogicDefinitionRegistry.java +++ b/krystex/src/main/java/com/flipkart/krystal/krystex/LogicDefinitionRegistry.java @@ -3,17 +3,17 @@ import com.flipkart.krystal.krystex.kryon.KryonLogicId; import com.flipkart.krystal.krystex.resolution.MultiResolver; import com.flipkart.krystal.krystex.resolution.ResolverLogic; -import java.util.HashMap; import java.util.Map; import java.util.NoSuchElementException; +import java.util.concurrent.ConcurrentHashMap; public final class LogicDefinitionRegistry { private final Map> outputLogicDefinitions = - new HashMap<>(); + new ConcurrentHashMap<>(); private final Map> resolverLogicDefinitions = - new HashMap<>(); + new ConcurrentHashMap<>(); private final Map> multiResolverDefinitions = - new HashMap<>(); + new ConcurrentHashMap<>(); @SuppressWarnings("unchecked") public OutputLogicDefinition getOutputLogic(KryonLogicId kryonLogicId) { diff --git a/krystex/src/main/java/com/flipkart/krystal/krystex/kryon/KryonDefinitionRegistry.java b/krystex/src/main/java/com/flipkart/krystal/krystex/kryon/KryonDefinitionRegistry.java index 1c99f975..6faddf44 100644 --- a/krystex/src/main/java/com/flipkart/krystal/krystex/kryon/KryonDefinitionRegistry.java +++ b/krystex/src/main/java/com/flipkart/krystal/krystex/kryon/KryonDefinitionRegistry.java @@ -10,14 +10,14 @@ import com.flipkart.krystal.krystex.resolution.Resolver; import com.flipkart.krystal.tags.ElementTags; import com.google.common.collect.ImmutableMap; -import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; public final class KryonDefinitionRegistry { private final LogicDefinitionRegistry logicDefinitionRegistry; - private final Map kryonDefinitions = new LinkedHashMap<>(); + private final Map kryonDefinitions = new ConcurrentHashMap<>(); private final DependantChainStart dependantChainStart = new DependantChainStart(); public KryonDefinitionRegistry(LogicDefinitionRegistry logicDefinitionRegistry) { diff --git a/vajram/vajram-krystex/src/main/java/com/flipkart/krystal/vajramexecutor/krystex/VajramKryonGraph.java b/vajram/vajram-krystex/src/main/java/com/flipkart/krystal/vajramexecutor/krystex/VajramKryonGraph.java index cd4b55b7..b2241e5a 100644 --- a/vajram/vajram-krystex/src/main/java/com/flipkart/krystal/vajramexecutor/krystex/VajramKryonGraph.java +++ b/vajram/vajram-krystex/src/main/java/com/flipkart/krystal/vajramexecutor/krystex/VajramKryonGraph.java @@ -81,12 +81,16 @@ public final class VajramKryonGraph implements VajramExecutableGraph vajramDefinitions = new LinkedHashMap<>(); - private final ConcurrentHashMap>, VajramDefinition> vajramDataByClass = + private final Map vajramDefinitions = new ConcurrentHashMap<>(); + private final Map>, VajramDefinition> vajramDataByClass = new ConcurrentHashMap<>(); - /** These are those call graphs of a vajram where no other vajram depends on this. */ - private final Map vajramExecutables = new LinkedHashMap<>(); + /** + * Maps every vajramId to its corresponding kryonId all of whose dependencies have also been + * loaded recursively. The mapped kryon id represents the complete executable sub-graph of the + * vajram. + */ + private final Map vajramExecutables = new ConcurrentHashMap<>(); /** LogicDecorator Id -> LogicDecoratorConfig */ private final ImmutableMap sessionScopedDecoratorConfigs; @@ -171,7 +175,7 @@ public void registerInputBatchers(VajramID vajramID, InputBatcherConfig... input */ public DependantChain computeDependantChain( String firstVajramId, Dependency firstDependencyId, Dependency... subsequentDependencyIds) { - KryonId firstKryonId = _getVajramExecutionGraph(vajramID(firstVajramId)); + KryonId firstKryonId = getKryonId(vajramID(firstVajramId)); KryonDefinition currentKryon = kryonDefinitionRegistry.get(firstKryonId); DependantChain currentDepChain = kryonDefinitionRegistry.getDependantChainsStart().extend(firstKryonId, firstDependencyId); @@ -221,57 +225,65 @@ private void registerVajram(Vajram vajram) { * @return {@link KryonId} of the {@link KryonDefinition} corresponding to this given vajramId */ KryonId getKryonId(VajramID vajramId) { - return _getVajramExecutionGraph(vajramId); - } - - private KryonId _getVajramExecutionGraph(VajramID vajramId) { KryonId kryonId = vajramExecutables.get(vajramId); if (kryonId != null) { return kryonId; } else { + return loadKryonSubgraph(vajramId, new LinkedHashMap<>()); + } + } + + private KryonId loadKryonSubgraph(VajramID vajramId, Map loadingInProgress) { + synchronized (vajramExecutables) { + KryonId kryonId; + if ((kryonId = vajramExecutables.get(vajramId)) != null) { + // This means the subgraph is already loaded. + return kryonId; + } else if ((kryonId = loadingInProgress.get(vajramId)) != null) { + // This means the subgraph is still being loaded, but there is a cyclic dependency. Just + // return the kryon to prevent infinite recursion. + return kryonId; + } kryonId = new KryonId(vajramId.vajramId()); + // add to loadingInProgress so that this can be used to prevent infinite recursion in the + // cases where a vajram depends on itself in a cyclic dependency. + loadingInProgress.put(vajramId, kryonId); + VajramDefinition vajramDefinition = + getVajramDefinition(vajramId) + .orElseThrow( + () -> + new NoSuchElementException( + "Could not find vajram with id: %s".formatted(vajramId))); + InputResolverCreationResult inputResolverCreationResult = + createKryonLogicsForInputResolvers(vajramDefinition); + ImmutableMap depIdToProviderKryon = + createKryonDefinitionsForDependencies(vajramDefinition, loadingInProgress); + OutputLogicDefinition outputLogicDefinition = + createKryonOutputLogic(kryonId, vajramDefinition); + ImmutableSet inputIds = vajramDefinition.facetSpecs(); + LogicDefinition createNewRequest = + new LogicDefinition<>( + new KryonLogicId(kryonId, "%s:newRequest"), + ImmutableSet.of(), + emptyTags(), + vajramDefinition.vajram()::newRequestBuilder); + KryonDefinition kryonDefinition = + kryonDefinitionRegistry.newKryonDefinition( + kryonId.value(), + inputIds, + outputLogicDefinition.kryonLogicId(), + depIdToProviderKryon, + inputResolverCreationResult.resolversByDefinition(), + createNewRequest, + new LogicDefinition<>( + new KryonLogicId(kryonId, "%s:facetsFromRequest"), + ImmutableSet.of(), + emptyTags(), + r -> vajramDefinition.vajram().facetsFromRequest(r)), + vajramDefinition.vajramTags()); + vajramExecutables.put(vajramId, kryonId); + return kryonDefinition.kryonId(); } - vajramExecutables.put(vajramId, kryonId); - - VajramDefinition vajramDefinition = - getVajramDefinition(vajramId) - .orElseThrow( - () -> - new NoSuchElementException( - "Could not find vajram with id: %s".formatted(vajramId))); - - InputResolverCreationResult inputResolverCreationResult = - createKryonLogicsForInputResolvers(vajramDefinition); - - ImmutableMap depIdToProviderKryon = - createKryonDefinitionsForDependencies(vajramDefinition); - - OutputLogicDefinition outputLogicDefinition = - createKryonOutputLogic(kryonId, vajramDefinition); - - ImmutableSet inputIds = vajramDefinition.facetSpecs(); - - LogicDefinition createNewRequest = - new LogicDefinition<>( - new KryonLogicId(kryonId, "%s:newRequest"), - ImmutableSet.of(), - emptyTags(), - vajramDefinition.vajram()::newRequestBuilder); - KryonDefinition kryonDefinition = - kryonDefinitionRegistry.newKryonDefinition( - kryonId.value(), - inputIds, - outputLogicDefinition.kryonLogicId(), - depIdToProviderKryon, - inputResolverCreationResult.resolversByDefinition(), - createNewRequest, - new LogicDefinition<>( - new KryonLogicId(kryonId, "%s:facetsFromRequest"), - ImmutableSet.of(), - emptyTags(), - r -> vajramDefinition.vajram().facetsFromRequest(r)), - vajramDefinition.vajramTags()); - return kryonDefinition.kryonId(); } private InputResolverCreationResult createKryonLogicsForInputResolvers( @@ -448,7 +460,7 @@ private OutputLogicDefinition createKryonOutputLogic( } private ImmutableMap createKryonDefinitionsForDependencies( - VajramDefinition vajramDefinition) { + VajramDefinition vajramDefinition, Map loadingInProgress) { List dependencies = new ArrayList<>(); for (Facet facet : vajramDefinition.facetSpecs()) { if (facet instanceof DependencySpec definition) { @@ -464,7 +476,8 @@ private ImmutableMap createKryonDefinitionsForDependencies( throw new VajramDefinitionException( "Unable to find vajram for vajramId %s".formatted(accessSpec)); } - depIdToProviderKryon.put(dependency, _getVajramExecutionGraph(dependencyVajram.vajramId())); + depIdToProviderKryon.put( + dependency, loadKryonSubgraph(dependencyVajram.vajramId(), loadingInProgress)); } return ImmutableMap.copyOf(depIdToProviderKryon); } diff --git a/vajram/vajram-krystex/src/test/java/com/flipkart/krystal/vajramexecutor/krystex/KrystexVajramExecutorTest.java b/vajram/vajram-krystex/src/test/java/com/flipkart/krystal/vajramexecutor/krystex/KrystexVajramExecutorTest.java index f6a1dde1..35f9f094 100644 --- a/vajram/vajram-krystex/src/test/java/com/flipkart/krystal/vajramexecutor/krystex/KrystexVajramExecutorTest.java +++ b/vajram/vajram-krystex/src/test/java/com/flipkart/krystal/vajramexecutor/krystex/KrystexVajramExecutorTest.java @@ -135,15 +135,14 @@ void tearDown() { @ParameterizedTest @MethodSource("executorConfigsToTest") - void executeCompute_noDependencies_success( - KryonExecStrategy kryonExecStrategy, GraphTraversalStrategy graphTraversalStrategy) { + void executeCompute_noDependencies_success(GraphTraversalStrategy graphTraversalStrategy) { graph = loadFromClasspath("com.flipkart.krystal.vajramexecutor.krystex.test_vajrams.hello").build(); CompletableFuture result; requestContext.requestId("vajramWithNoDependencies"); try (KrystexVajramExecutor krystexVajramExecutor = graph.createExecutor( - getExecutorConfig(kryonExecStrategy, graphTraversalStrategy) + getExecutorConfig(graphTraversalStrategy) .requestId("vajramWithNoDependencies") .build())) { result = @@ -155,15 +154,14 @@ void executeCompute_noDependencies_success( @ParameterizedTest @MethodSource("executorConfigsToTest") - void executeCompute_optionalInputProvided_success( - KryonExecStrategy kryonExecStrategy, GraphTraversalStrategy graphTraversalStrategy) { + void executeCompute_optionalInputProvided_success(GraphTraversalStrategy graphTraversalStrategy) { graph = loadFromClasspath("com.flipkart.krystal.vajramexecutor.krystex.test_vajrams.hello").build(); CompletableFuture result; requestContext.requestId("vajramWithNoDependencies"); try (KrystexVajramExecutor krystexVajramExecutor = graph.createExecutor( - getExecutorConfig(kryonExecStrategy, graphTraversalStrategy) + getExecutorConfig(graphTraversalStrategy) .requestId(requestContext.requestId()) .build())) { result = @@ -176,8 +174,7 @@ void executeCompute_optionalInputProvided_success( @ParameterizedTest @MethodSource("executorConfigsToTest") - void executeIo_singleRequestNoBatcher_success( - KryonExecStrategy kryonExecStrategy, GraphTraversalStrategy graphTraversalStrategy) { + void executeIo_singleRequestNoBatcher_success(GraphTraversalStrategy graphTraversalStrategy) { graph = loadFromClasspath("com.flipkart.krystal.vajramexecutor.krystex.test_vajrams.userservice") .build(); @@ -185,7 +182,7 @@ void executeIo_singleRequestNoBatcher_success( requestContext.requestId("ioVajramSingleRequestNoBatcher"); try (KrystexVajramExecutor krystexVajramExecutor = graph.createExecutor( - getExecutorConfig(kryonExecStrategy, graphTraversalStrategy) + getExecutorConfig(graphTraversalStrategy) .requestId(requestContext.requestId()) .build())) { userInfo123 = @@ -201,8 +198,7 @@ void executeIo_singleRequestNoBatcher_success( @ParameterizedTest @MethodSource("executorConfigsToTest") - void executeIo_withBatcherMultipleRequests_calledOnlyOnce( - KryonExecStrategy kryonExecStrategy, GraphTraversalStrategy graphTraversalStrategy) { + void executeIo_withBatcherMultipleRequests_calledOnlyOnce(GraphTraversalStrategy graphTraversalStrategy) { graph = loadFromClasspath( "com.flipkart.krystal.vajramexecutor.krystex.test_vajrams.userservice", @@ -213,7 +209,7 @@ void executeIo_withBatcherMultipleRequests_calledOnlyOnce( requestContext.requestId("ioVajramWithBatcherMultipleRequests"); try (KrystexVajramExecutor krystexVajramExecutor = graph.createExecutor( - getExecutorConfig(kryonExecStrategy, graphTraversalStrategy) + getExecutorConfig(graphTraversalStrategy) .requestId(requestContext.requestId()) .build())) { helloString = @@ -231,8 +227,7 @@ void executeIo_withBatcherMultipleRequests_calledOnlyOnce( @ParameterizedTest @MethodSource("executorConfigsToTest") - void executeCompute_sequentialDependency_success( - KryonExecStrategy kryonExecStrategy, GraphTraversalStrategy graphTraversalStrategy) { + void executeCompute_sequentialDependency_success(GraphTraversalStrategy graphTraversalStrategy) { graph = loadFromClasspath( "com.flipkart.krystal.vajramexecutor.krystex.test_vajrams.userservice", @@ -246,7 +241,7 @@ void executeCompute_sequentialDependency_success( requestContext.requestId("sequentialDependency"); try (KrystexVajramExecutor krystexVajramExecutor = graph.createExecutor( - getExecutorConfig(kryonExecStrategy, graphTraversalStrategy) + getExecutorConfig(graphTraversalStrategy) .requestId(requestContext.requestId()) .build())) { helloString = @@ -262,8 +257,7 @@ void executeCompute_sequentialDependency_success( @ParameterizedTest @MethodSource("executorConfigsToTest") - void executeWithFacets_success( - KryonExecStrategy kryonExecStrategy, GraphTraversalStrategy graphTraversalStrategy) { + void executeWithFacets_success(GraphTraversalStrategy graphTraversalStrategy) { graph = loadFromClasspath( "com.flipkart.krystal.vajramexecutor.krystex.test_vajrams.userservice", @@ -277,7 +271,7 @@ void executeWithFacets_success( requestContext.requestId("sequentialDependency"); try (KrystexVajramExecutor krystexVajramExecutor = graph.createExecutor( - getExecutorConfig(kryonExecStrategy, graphTraversalStrategy) + getExecutorConfig(graphTraversalStrategy) .requestId(requestContext.requestId()) .build())) { helloString = @@ -295,15 +289,14 @@ void executeWithFacets_success( @ParameterizedTest @MethodSource("executorConfigsToTest") - void executeCompute_missingMandatoryInput_throwsException( - KryonExecStrategy kryonExecStrategy, GraphTraversalStrategy graphTraversalStrategy) { + void executeCompute_missingMandatoryInput_throwsException(GraphTraversalStrategy graphTraversalStrategy) { graph = loadFromClasspath("com.flipkart.krystal.vajramexecutor.krystex.test_vajrams.hello").build(); CompletableFuture result; requestContext.requestId("vajramWithNoDependencies"); try (KrystexVajramExecutor krystexVajramExecutor = graph.createExecutor( - getExecutorConfig(kryonExecStrategy, graphTraversalStrategy) + getExecutorConfig(graphTraversalStrategy) .requestId(requestContext.requestId()) .build())) { result = @@ -322,8 +315,7 @@ void executeCompute_missingMandatoryInput_throwsException( @ParameterizedTest @MethodSource("executorConfigsToTest") - void execute_multiRequestNoInputBatcher_cacheHitSuccess( - KryonExecStrategy kryonExecStrategy, GraphTraversalStrategy graphTraversalStrategy) { + void execute_multiRequestNoInputBatcher_cacheHitSuccess(GraphTraversalStrategy graphTraversalStrategy) { graph = loadFromClasspath( "com.flipkart.krystal.vajramexecutor.krystex.test_vajrams.userservice", @@ -334,7 +326,7 @@ void execute_multiRequestNoInputBatcher_cacheHitSuccess( requestContext.requestId("multiRequestNoInputBatcher_cacheHitSuccess"); try (KrystexVajramExecutor krystexVajramExecutor = graph.createExecutor( - getExecutorConfig(kryonExecStrategy, graphTraversalStrategy) + getExecutorConfig(graphTraversalStrategy) .requestId(requestContext.requestId()) .build())) { userInfo = @@ -360,8 +352,7 @@ void execute_multiRequestNoInputBatcher_cacheHitSuccess( @ParameterizedTest @MethodSource("executorConfigsToTest") - void execute_multiRequestWithBatcher_cacheHitSuccess( - KryonExecStrategy kryonExecStrategy, GraphTraversalStrategy graphTraversalStrategy) { + void execute_multiRequestWithBatcher_cacheHitSuccess(GraphTraversalStrategy graphTraversalStrategy) { graph = loadFromClasspath( "com.flipkart.krystal.vajramexecutor.krystex.test_vajrams.userservice", @@ -372,7 +363,7 @@ void execute_multiRequestWithBatcher_cacheHitSuccess( requestContext.requestId("ioVajramSingleRequestNoBatcher"); try (KrystexVajramExecutor krystexVajramExecutor = graph.createExecutor( - getExecutorConfig(kryonExecStrategy, graphTraversalStrategy) + getExecutorConfig(graphTraversalStrategy) .requestId(requestContext.requestId()) .build())) { userInfo = @@ -404,8 +395,7 @@ void execute_multiRequestWithBatcher_cacheHitSuccess( @ParameterizedTest @MethodSource("executorConfigsToTest") - void execute_multiResolverFanouts_permutesTheFanouts( - KryonExecStrategy kryonExecStrategy, GraphTraversalStrategy graphTraversalStrategy) + void execute_multiResolverFanouts_permutesTheFanouts(GraphTraversalStrategy graphTraversalStrategy) throws Exception { graph = loadFromClasspath( @@ -427,7 +417,7 @@ void execute_multiResolverFanouts_permutesTheFanouts( .kryonExecutorConfigBuilder( KryonExecutorConfig.builder() .singleThreadExecutor(executorLease.get()) - .kryonExecStrategy(kryonExecStrategy) + .kryonExecStrategy(BATCH) .graphTraversalStrategy(graphTraversalStrategy) .requestScopedLogicDecoratorConfigs( ImmutableMap.of( @@ -462,7 +452,6 @@ Hello Friends of Firstname Lastname (user_id_2)! Firstname Lastname (user_id_2:f @ParameterizedTest @MethodSource("executorConfigsToTest") void flush_singleDepthParallelDependencyDefaultInputBatcherConfig_flushes2Batchers( - KryonExecStrategy kryonExecStrategy, GraphTraversalStrategy graphTraversalStrategy, TestInfo testInfo) { graph = @@ -478,7 +467,7 @@ void flush_singleDepthParallelDependencyDefaultInputBatcherConfig_flushes2Batche requestContext.requestId(testInfo.getDisplayName()); try (KrystexVajramExecutor krystexVajramExecutor = graph.createExecutor( - getExecutorConfig(kryonExecStrategy, graphTraversalStrategy) + getExecutorConfig(graphTraversalStrategy) .requestId(requestContext.requestId()) .build())) { multiHellos = @@ -508,7 +497,6 @@ Hello Friends of Firstname Lastname (user_id_2)! Firstname Lastname (user_id_2:f @ParameterizedTest @MethodSource("executorConfigsToTest") void flush_singleDepthParallelDependencySharedInputBatcherConfig_flushes1Batcher( - KryonExecStrategy kryonExecStrategy, GraphTraversalStrategy graphTraversalStrategy, TestInfo testInfo) { graph = @@ -522,7 +510,7 @@ void flush_singleDepthParallelDependencySharedInputBatcherConfig_flushes1Batcher requestContext.requestId(testInfo.getDisplayName()); try (KrystexVajramExecutor krystexVajramExecutor = graph.createExecutor( - getExecutorConfig(kryonExecStrategy, graphTraversalStrategy) + getExecutorConfig(graphTraversalStrategy) .requestId(requestContext.requestId()) .build())) { multiHellos = @@ -553,34 +541,33 @@ Hello Friends of Firstname Lastname (user_id_2)! Firstname Lastname (user_id_2:f 1); } - private KrystexVajramExecutorConfigBuilder getExecutorConfig( - KryonExecStrategy kryonExecStrategy, GraphTraversalStrategy graphTraversalStrategy) { + private KrystexVajramExecutorConfigBuilder getExecutorConfig(GraphTraversalStrategy graphTraversalStrategy) { KryonExecutorConfigBuilder kryonExecutorConfigBuilder = KryonExecutorConfig.builder() - .kryonExecStrategy(kryonExecStrategy) + .kryonExecStrategy(BATCH) .graphTraversalStrategy(graphTraversalStrategy) .logicDecorationOrdering(logicDecorationOrdering); - if (BATCH.equals(kryonExecStrategy)) { - RequestLevelCache requestLevelCache = new RequestLevelCache(); - kryonExecutorConfigBuilder.requestScopedKryonDecoratorConfig( - RequestLevelCache.DECORATOR_TYPE, - new KryonDecoratorConfig( - RequestLevelCache.DECORATOR_TYPE, - executionContext -> true, - executionContext -> RequestLevelCache.DECORATOR_TYPE, - kryonDecoratorContext -> { - return requestLevelCache; - })); - } + + RequestLevelCache requestLevelCache = new RequestLevelCache(); + kryonExecutorConfigBuilder.requestScopedKryonDecoratorConfig( + RequestLevelCache.DECORATOR_TYPE, + new KryonDecoratorConfig( + RequestLevelCache.DECORATOR_TYPE, + executionContext -> true, + executionContext -> RequestLevelCache.DECORATOR_TYPE, + kryonDecoratorContext -> { + return requestLevelCache; + })); + return KrystexVajramExecutorConfig.builder() .kryonExecutorConfigBuilder( kryonExecutorConfigBuilder.singleThreadExecutor(executorLease.get())); } + @ParameterizedTest @MethodSource("executorConfigsToTest") void flush_singleDepthSkipParallelDependencySharedInputBatcherConfig_flushes1Batcher( - KryonExecStrategy kryonExecStrategy, GraphTraversalStrategy graphTraversalStrategy, TestInfo testInfo) { graph = @@ -595,7 +582,7 @@ void flush_singleDepthSkipParallelDependencySharedInputBatcherConfig_flushes1Bat requestContext.requestId(testInfo.getDisplayName()); try (KrystexVajramExecutor krystexVajramExecutor = graph.createExecutor( - getExecutorConfig(kryonExecStrategy, graphTraversalStrategy) + getExecutorConfig(graphTraversalStrategy) .requestId(requestContext.requestId()) .build())) { multiHellos = @@ -622,7 +609,6 @@ void flush_singleDepthSkipParallelDependencySharedInputBatcherConfig_flushes1Bat @ParameterizedTest @MethodSource("executorConfigsToTest") void close_sequentialDependency_flushesBatcher( - KryonExecStrategy kryonExecStrategy, GraphTraversalStrategy graphTraversalStrategy, TestInfo testInfo) { graph = @@ -642,7 +628,7 @@ void close_sequentialDependency_flushesBatcher( requestContext.requestId(testInfo.getDisplayName()); try (KrystexVajramExecutor krystexVajramExecutor = graph.createExecutor( - getExecutorConfig(kryonExecStrategy, graphTraversalStrategy) + getExecutorConfig(graphTraversalStrategy) .requestId(requestContext.requestId()) .build())) { multiHellos = @@ -664,7 +650,6 @@ void close_sequentialDependency_flushesBatcher( @ParameterizedTest @MethodSource("executorConfigsToTest") void flush_sequentialDependency_flushesSharedBatchers( - KryonExecStrategy kryonExecStrategy, GraphTraversalStrategy graphTraversalStrategy, TestInfo testInfo) { graph = @@ -679,7 +664,7 @@ void flush_sequentialDependency_flushesSharedBatchers( requestContext.requestId(testInfo.getDisplayName()); try (KrystexVajramExecutor krystexVajramExecutor = graph.createExecutor( - getExecutorConfig(kryonExecStrategy, graphTraversalStrategy) + getExecutorConfig(graphTraversalStrategy) .requestId(requestContext.requestId()) .build())) { multiHellos = @@ -699,7 +684,6 @@ void flush_sequentialDependency_flushesSharedBatchers( @ParameterizedTest @MethodSource("executorConfigsToTest") void flush_sequentialSkipDependency_flushesSharedBatchers( - KryonExecStrategy kryonExecStrategy, GraphTraversalStrategy graphTraversalStrategy, TestInfo testInfo) { graph = @@ -714,7 +698,7 @@ void flush_sequentialSkipDependency_flushesSharedBatchers( requestContext.requestId(testInfo.getDisplayName()); try (KrystexVajramExecutor krystexVajramExecutor = graph.createExecutor( - getExecutorConfig(kryonExecStrategy, graphTraversalStrategy) + getExecutorConfig(graphTraversalStrategy) .requestId(requestContext.requestId()) .build())) { multiHellos = @@ -729,7 +713,6 @@ void flush_sequentialSkipDependency_flushesSharedBatchers( @ParameterizedTest @MethodSource("executorConfigsToTest") void flush_skippingADependency_flushesCompleteCallGraph( - KryonExecStrategy kryonExecStrategy, GraphTraversalStrategy graphTraversalStrategy, TestInfo testInfo) { CompletableFuture friendServiceFlushCommand = new CompletableFuture<>(); @@ -797,7 +780,7 @@ public String getId() { requestContext.requestId(testInfo.getDisplayName()); try (KrystexVajramExecutor krystexVajramExecutor = graph.createExecutor( - getExecutorConfig(kryonExecStrategy, graphTraversalStrategy) + getExecutorConfig(graphTraversalStrategy) .requestId(requestContext.requestId()) .build())) { multiHellos = @@ -888,6 +871,6 @@ private static VajramKryonGraphBuilder loadFromClasspath(String... packagePrefix } public static Stream executorConfigsToTest() { - return Stream.of(Arguments.of(BATCH, DEPTH), Arguments.of(BATCH, BREADTH)); + return Stream.of(Arguments.of(DEPTH), Arguments.of(BREADTH)); } } diff --git a/vajram/vajram-samples/src/test/java/com/flipkart/krystal/vajram/samples/calculator/FormulaTest.java b/vajram/vajram-samples/src/test/java/com/flipkart/krystal/vajram/samples/calculator/FormulaTest.java index 733c1178..d9e727fb 100644 --- a/vajram/vajram-samples/src/test/java/com/flipkart/krystal/vajram/samples/calculator/FormulaTest.java +++ b/vajram/vajram-samples/src/test/java/com/flipkart/krystal/vajram/samples/calculator/FormulaTest.java @@ -9,9 +9,12 @@ import static com.google.inject.Guice.createInjector; import static com.google.inject.name.Names.named; import static java.time.Duration.ofSeconds; +import static java.util.Arrays.stream; import static java.util.concurrent.CompletableFuture.allOf; import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.runAsync; import static java.util.concurrent.TimeUnit.SECONDS; +import static java.util.stream.IntStream.range; import static org.assertj.core.api.Assertions.assertThat; import com.flipkart.krystal.concurrent.SingleThreadExecutor; @@ -42,20 +45,33 @@ import com.google.inject.AbstractModule; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.LongAdder; import org.checkerframework.checker.nullness.qual.NonNull; -import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; class FormulaTest { + public static final int MAX_THREADS = Runtime.getRuntime().availableProcessors(); + private static final Lease[] EXECUTOR_LEASES = new Lease[MAX_THREADS]; private static SingleThreadExecutorsPool EXEC_POOL; @BeforeAll - static void beforeAll() { - EXEC_POOL = new SingleThreadExecutorsPool("Test", Runtime.getRuntime().availableProcessors()); + static void beforeAll() throws LeaseUnavailableException { + EXEC_POOL = new SingleThreadExecutorsPool("Test", MAX_THREADS); + for (int i = 0; i < MAX_THREADS; i++) { + EXECUTOR_LEASES[i] = EXEC_POOL.lease(); + } + } + + @AfterAll + static void afterAll() { + stream(EXECUTOR_LEASES).forEach(Lease::close); } private VajramKryonGraphBuilder graph; @@ -65,17 +81,12 @@ static void beforeAll() { private Lease executorLease; @BeforeEach - void setUp() throws LeaseUnavailableException { - this.executorLease = EXEC_POOL.lease(); + void setUp() { + this.executorLease = EXECUTOR_LEASES[0]; this.graph = Util.loadFromClasspath(Formula.class.getPackageName()); Adder.CALL_COUNTER.reset(); } - @AfterEach - void tearDown() { - executorLease.close(); - } - @Test void formula_success() { CompletableFuture future; @@ -152,41 +163,126 @@ void formula_ioDepFails_failsWithSameException() { .withMessageContaining("Adder failed because fail flag was set"); } - @Disabled("Long running benchmark (~40s)") - @Test - void millionExecutors_oneCallEach_singleCore_benchmark() throws Exception { + /* + * This test case is designed to catch race condition issues in graph loading + * (Ex: https://github.com/flipkart-incubator/Krystal/issues/328) + */ + @ParameterizedTest + @ValueSource(ints = {1, 2, 4}) // test with different values of parallelism + void parallelExecuteVajrams_success(int executorCount) throws Exception { + int loopCount = 100; + SingleThreadExecutor[] executors = getExecutors(executorCount); + VajramKryonGraph graph = this.graph.build(); + long javaNativeTimeNs = javaMethodBenchmark(FormulaTest::syncFormula, loopCount); + long javaFuturesTimeNs = Util.javaFuturesBenchmark(FormulaTest::asyncFormula, loopCount); + CompletableFuture[] submissionFutures = new CompletableFuture[executorCount]; + @SuppressWarnings("unchecked") + CompletableFuture[] futures = new CompletableFuture[loopCount]; + KryonExecutorMetrics[] metrics = new KryonExecutorMetrics[loopCount]; + LongAdder timeToCreateExecutors = new LongAdder(); + LongAdder timeToEnqueueVajram = new LongAdder(); + long startTime = System.nanoTime(); + int loopCountPerExecutor = loopCount / executorCount; + + for (int currentExecutor : range(0, executorCount).toArray()) { + SingleThreadExecutor executor = executors[currentExecutor]; + int coreCountStart = currentExecutor * loopCountPerExecutor; + submissionFutures[currentExecutor] = + runAsync( + () -> { + FormulaRequestContext requestContext = + new FormulaRequestContext(100, 20, 5, "formulaTest"); + for (int currentLoopCount : + range(coreCountStart, coreCountStart + loopCountPerExecutor).toArray()) { + long iterStartTime = System.nanoTime(); + try (KrystexVajramExecutor krystexVajramExecutor = + graph.createExecutor( + KrystexVajramExecutorConfig.builder() + .kryonExecutorConfigBuilder( + KryonExecutorConfig.builder().singleThreadExecutor(executor)) + .requestId("formulaTest") + .build())) { + timeToCreateExecutors.add(System.nanoTime() - iterStartTime); + metrics[currentLoopCount] = + ((KryonExecutor) krystexVajramExecutor.getKrystalExecutor()) + .getKryonMetrics(); + long enqueueStart = System.nanoTime(); + futures[currentLoopCount] = + executeVajram( + graph, krystexVajramExecutor, currentLoopCount, requestContext); + timeToEnqueueVajram.add(System.nanoTime() - enqueueStart); + } + } + }, + executor); + } + allOf(submissionFutures).join(); + allOf(futures).join(); + long vajramTimeNs = System.nanoTime() - startTime; + assertThat( + allOf(futures) + .whenComplete( + (unused, throwable) -> { + for (int i = 0, futuresLength = futures.length; i < futuresLength; i++) { + CompletableFuture future = futures[i]; + assertThat(future.getNow(0)).isEqualTo((100 + i) / (20 + i + 5 + i)); + } + })) + .succeedsWithin(ofSeconds(1)); + assertThat(Adder.CALL_COUNTER.sum()).isEqualTo(loopCount); + } + + @Disabled("Long running benchmarks. 1 core: ~21sec, 2 cores: ~17sec, 4 cores: ~13sec") + @ParameterizedTest + @ValueSource(ints = {1, 2, 4}) // test with different values of parallelism + void millionExecutors_oneCallEach_NExecutors_benchmark(int executorCount) throws Exception { int loopCount = 1_000_000; - SingleThreadExecutor executor = getExecutors(1)[0]; - VajramKryonGraph graph = - this.graph - // .maxParallelismPerCore(10) - .build(); + SingleThreadExecutor[] executors = getExecutors(executorCount); + VajramKryonGraph graph = this.graph.build(); long javaNativeTimeNs = javaMethodBenchmark(FormulaTest::syncFormula, loopCount); long javaFuturesTimeNs = Util.javaFuturesBenchmark(FormulaTest::asyncFormula, loopCount); + CompletableFuture[] submissionFutures = new CompletableFuture[executorCount]; @SuppressWarnings("unchecked") CompletableFuture[] futures = new CompletableFuture[loopCount]; KryonExecutorMetrics[] metrics = new KryonExecutorMetrics[loopCount]; - long timeToCreateExecutors = 0; - long timeToEnqueueVajram = 0; + LongAdder timeToCreateExecutors = new LongAdder(); + LongAdder timeToEnqueueVajram = new LongAdder(); long startTime = System.nanoTime(); - for (int value = 0; value < loopCount; value++) { - long iterStartTime = System.nanoTime(); - FormulaRequestContext requestContext = new FormulaRequestContext(100, 20, 5, "formulaTest"); - try (KrystexVajramExecutor krystexVajramExecutor = - graph.createExecutor( - KrystexVajramExecutorConfig.builder() - .kryonExecutorConfigBuilder( - KryonExecutorConfig.builder().singleThreadExecutor(executor)) - .requestId("formulaTest") - .build())) { - timeToCreateExecutors += System.nanoTime() - iterStartTime; - metrics[value] = - ((KryonExecutor) krystexVajramExecutor.getKrystalExecutor()).getKryonMetrics(); - long enqueueStart = System.nanoTime(); - futures[value] = executeVajram(graph, krystexVajramExecutor, value, requestContext); - timeToEnqueueVajram += System.nanoTime() - enqueueStart; - } + int loopCountPerExecutor = loopCount / executorCount; + + for (int currentExecutor : range(0, executorCount).toArray()) { + SingleThreadExecutor executor = executors[currentExecutor]; + int coreCountStart = currentExecutor * loopCountPerExecutor; + submissionFutures[currentExecutor] = + runAsync( + () -> { + FormulaRequestContext requestContext = + new FormulaRequestContext(100, 20, 5, "formulaTest"); + for (int currentLoopCount : + range(coreCountStart, coreCountStart + loopCountPerExecutor).toArray()) { + long iterStartTime = System.nanoTime(); + try (KrystexVajramExecutor krystexVajramExecutor = + graph.createExecutor( + KrystexVajramExecutorConfig.builder() + .kryonExecutorConfigBuilder( + KryonExecutorConfig.builder().singleThreadExecutor(executor)) + .requestId("formulaTest") + .build())) { + timeToCreateExecutors.add(System.nanoTime() - iterStartTime); + metrics[currentLoopCount] = + ((KryonExecutor) krystexVajramExecutor.getKrystalExecutor()) + .getKryonMetrics(); + long enqueueStart = System.nanoTime(); + futures[currentLoopCount] = + executeVajram( + graph, krystexVajramExecutor, currentLoopCount, requestContext); + timeToEnqueueVajram.add(System.nanoTime() - enqueueStart); + } + } + }, + executor); } + allOf(submissionFutures).join(); allOf(futures).join(); long vajramTimeNs = System.nanoTime() - startTime; assertThat( @@ -201,51 +297,19 @@ void millionExecutors_oneCallEach_singleCore_benchmark() throws Exception { .succeedsWithin(ofSeconds(1)); assertThat(Adder.CALL_COUNTER.sum()).isEqualTo(loopCount); - /* - * Old Benchmark results (Unable to reproduce :( ) : - * loopCount = 1_000_000 - * maxParallelismPerCore = 10 - * Processor: 2.6 GHz 6-Core Intel Core i7 (with hyperthreading - 12 virtual cores) - * Benchmark result: - * platform overhead over reactive code = ~13 µs (13,000 ns) per request - * maxPoolSize = 120 - * maxActiveLeasesPerObject: 170 - * peakAvgActiveLeasesPerObject: 122 - * Avg. time to Enqueue vajrams : 8,486 ns - * Avg. time to execute vajrams : 14,965 ns - * Throughput executions/sec: 71000 - */ - /* - * Processor: Apple M1 Pro - * - * Benchmark results; - * Total java method time: 6,814,167 - * Total java futures time: 80,636,209 - * Outer Loop Count: 1,000,000 - * Inner Loop Count: 1 - * Avg. time to Create Executors:444 ns - * Avg. time to Enqueue vajrams:1,147 ns - * Avg. time to execute vajrams:35,673 ns - * Throughput executions/s: 28031 - * CommandsQueuedCount: 3,000,000 - * CommandQueueBypassedCount: 8,000,000 - * Platform overhead over native code: 35,667 ns per request - * Platform overhead over reactive code: 35,593 ns per request - * maxActiveLeasesPerObject: 1, peakAvgActiveLeasesPerObject: 1.0, maxPoolSize: 1 - */ printStats( loopCount, 1, javaNativeTimeNs, javaFuturesTimeNs, metrics, - timeToCreateExecutors, - timeToEnqueueVajram, + timeToCreateExecutors.sum(), + timeToEnqueueVajram.sum(), vajramTimeNs, EXEC_POOL); } - @Disabled("Long running benchmark (~16s)") + @Disabled("Long running benchmark (~9s)") @Test void thousandExecutors_1000CallsEach_singleCore_benchmark() throws Exception { SingleThreadExecutor executor = getExecutors(1)[0]; @@ -498,10 +562,10 @@ void formula_failure() { .withMessage("java.lang.ArithmeticException: / by zero"); } - private SingleThreadExecutor[] getExecutors(int count) throws LeaseUnavailableException { + private SingleThreadExecutor[] getExecutors(int count) { SingleThreadExecutor[] singleThreadedExecutors = new SingleThreadExecutor[count]; for (int i = 0; i < count; i++) { - singleThreadedExecutors[i] = EXEC_POOL.lease().get(); + singleThreadedExecutors[i] = EXECUTOR_LEASES[i].get(); } return singleThreadedExecutors; diff --git a/vajram/vajram-samples/src/test/java/com/flipkart/krystal/vajram/samples/calculator/adder/SplitAdderTest.java b/vajram/vajram-samples/src/test/java/com/flipkart/krystal/vajram/samples/calculator/adder/SplitAdderTest.java index 8e2ae3f5..8698c17e 100644 --- a/vajram/vajram-samples/src/test/java/com/flipkart/krystal/vajram/samples/calculator/adder/SplitAdderTest.java +++ b/vajram/vajram-samples/src/test/java/com/flipkart/krystal/vajram/samples/calculator/adder/SplitAdderTest.java @@ -152,6 +152,7 @@ void vajram_benchmark() throws Exception { .requestId("splitAdderTest") .kryonExecutorConfigBuilder( KryonExecutorConfig.builder() + .singleThreadExecutor(executorLease.get()) .disabledDependantChains(disabledDepChains(graph))) .build())) { metrics[value] = @@ -223,6 +224,7 @@ void vajram_benchmark_2() throws Exception { .requestId("splitAdderTest") .kryonExecutorConfigBuilder( KryonExecutorConfig.builder() + .singleThreadExecutor(executorLease.get()) .disabledDependantChains(disabledDepChains(graph))) .build())) { timeToCreateExecutors += System.nanoTime() - iterStartTime;