diff --git a/completable-reactor-runtime/src/main/java/ru/fix/completable/reactor/runtime/ProfilerNames.java b/completable-reactor-runtime/src/main/java/ru/fix/completable/reactor/runtime/ProfilerNames.java deleted file mode 100644 index cc7dcd25..00000000 --- a/completable-reactor-runtime/src/main/java/ru/fix/completable/reactor/runtime/ProfilerNames.java +++ /dev/null @@ -1,13 +0,0 @@ -package ru.fix.completable.reactor.runtime; - -/** - * Created by Kamil Asfandiyarov - */ -public final class ProfilerNames { - public static final String PROFILER = CompletableReactor.class.getPackage().getName(); - public static final String PAYLOAD = "pld"; - public static final String EXECUTION = "exc"; - public static final String HANDLE = "hndl"; - public static final String MERGE = "mrg"; - public static final String PENDING_REQUEST = "pendingRequest"; -} diff --git a/completable-reactor-runtime/src/main/kotlin/ru/fix/completable/reactor/runtime/ProfilerMetrics.kt b/completable-reactor-runtime/src/main/kotlin/ru/fix/completable/reactor/runtime/ProfilerMetrics.kt index 1deaaea1..d8eff183 100644 --- a/completable-reactor-runtime/src/main/kotlin/ru/fix/completable/reactor/runtime/ProfilerMetrics.kt +++ b/completable-reactor-runtime/src/main/kotlin/ru/fix/completable/reactor/runtime/ProfilerMetrics.kt @@ -3,7 +3,7 @@ package ru.fix.completable.reactor.runtime import ru.fix.aggregating.profiler.Identity object Metrics { - const val PENDING_REQUEST = "pendingRequest" + const val PENDING_REQUEST = "pending_request" const val REACTOR_EXECUTION = "completable_reactor" @@ -11,13 +11,13 @@ object Metrics { } object Tags { - const val MERGE = "merger" + const val MERGE = "merge" - const val HANDLE = "handler" + const val HANDLE = "handle" - const val PAYLOAD = "payload_execution" + const val REACHED_TERMINAL_STEP = "payload_reached_terminal_step" - const val EXECUTION = "execution" + const val FULLY_COMPLETED = "execution_graph_fully_completed" } object ProfilerIdentity { @@ -45,20 +45,19 @@ object ProfilerIdentity { fun payloadIdentity(payload: String?) = Identity( Metrics.REACTOR_EXECUTION, - tags(payload, Tags.PAYLOAD) + tags(payload, Tags.REACHED_TERMINAL_STEP) ) @JvmStatic - fun executionIdentity(payload: String?): Identity { - return Identity( - Metrics.REACTOR_EXECUTION, - tags(payload, Tags.EXECUTION) - ) - } + fun executionIdentity(payload: String?) = + Identity( + Metrics.REACTOR_EXECUTION, + tags(payload, Tags.FULLY_COMPLETED) + ) private fun tags(payload: String?, operation: String) = mapOf( "payload" to payload, - "operation" to operation + "execution_cycle" to operation ) } \ No newline at end of file diff --git a/completable-reactor-runtime/src/main/kotlin/ru/fix/completable/reactor/runtime/execution/HandleByExecutionBuilder.kt b/completable-reactor-runtime/src/main/kotlin/ru/fix/completable/reactor/runtime/execution/HandleByExecutionBuilder.kt index 23ce2604..fb9a9ef7 100644 --- a/completable-reactor-runtime/src/main/kotlin/ru/fix/completable/reactor/runtime/execution/HandleByExecutionBuilder.kt +++ b/completable-reactor-runtime/src/main/kotlin/ru/fix/completable/reactor/runtime/execution/HandleByExecutionBuilder.kt @@ -1,10 +1,8 @@ package ru.fix.completable.reactor.runtime.execution import mu.KotlinLogging -import ru.fix.aggregating.profiler.Identity import ru.fix.completable.reactor.graph.runtime.RuntimeVertex import ru.fix.completable.reactor.runtime.ProfilerIdentity -import ru.fix.completable.reactor.runtime.ProfilerNames import ru.fix.completable.reactor.runtime.execution.ExecutionBuilder.Companion.INVALID_TRANSITION_PAYLOAD_CONTEXT import ru.fix.completable.reactor.runtime.execution.ExecutionBuilder.HandlePayloadContext import java.util.* diff --git a/completable-reactor-runtime/src/main/kotlin/ru/fix/completable/reactor/runtime/execution/MergeByExecutionBuilder.kt b/completable-reactor-runtime/src/main/kotlin/ru/fix/completable/reactor/runtime/execution/MergeByExecutionBuilder.kt index ff413a0e..f77f5664 100644 --- a/completable-reactor-runtime/src/main/kotlin/ru/fix/completable/reactor/runtime/execution/MergeByExecutionBuilder.kt +++ b/completable-reactor-runtime/src/main/kotlin/ru/fix/completable/reactor/runtime/execution/MergeByExecutionBuilder.kt @@ -1,11 +1,9 @@ package ru.fix.completable.reactor.runtime.execution import mu.KotlinLogging -import ru.fix.aggregating.profiler.Identity import ru.fix.completable.reactor.graph.runtime.RuntimeTransition import ru.fix.completable.reactor.graph.runtime.RuntimeVertex import ru.fix.completable.reactor.runtime.ProfilerIdentity -import ru.fix.completable.reactor.runtime.ProfilerNames import ru.fix.completable.reactor.runtime.execution.ExecutionBuilder.Companion.INVALID_HANDLE_PAYLOAD_CONTEXT import ru.fix.completable.reactor.runtime.execution.ExecutionBuilder.Companion.INVALID_MERGE_PAYLOAD_CONTEXT import ru.fix.completable.reactor.runtime.execution.ExecutionBuilder.HandlePayloadContext @@ -266,20 +264,6 @@ class MergeByExecutionBuilder( } } - -// object ProfiledIdentity { -// fun mergeIdentity(payload: String?, vertexName: String?): Identity { -// return Identity( -// "completable_reactor", -// mapOf( -// "payload" to payload, -// "operation" to ProfilerNames.MERGE, -// "vertex" to vertexName -// ) -// ) -// } -// } - /** * @param pvx * @param handlingResult empty in case of detached merge point @@ -295,7 +279,6 @@ class MergeByExecutionBuilder( try { - val mergeCall = builder.profiler .profiledCall(ProfilerIdentity.mergeIdentity(payload?.javaClass?.name, pvx.vertex.name)) .start() diff --git a/completable-reactor-runtime/src/test/java/ru/fix/completable/reactor/runtime/tests/ProfilingTest.java b/completable-reactor-runtime/src/test/java/ru/fix/completable/reactor/runtime/tests/ProfilingTest.java index 114f3400..eb4d31d2 100644 --- a/completable-reactor-runtime/src/test/java/ru/fix/completable/reactor/runtime/tests/ProfilingTest.java +++ b/completable-reactor-runtime/src/test/java/ru/fix/completable/reactor/runtime/tests/ProfilingTest.java @@ -22,7 +22,7 @@ import static java.util.concurrent.CompletableFuture.completedFuture; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; +import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.mockito.Mockito.*; @@ -43,9 +43,6 @@ public TracablePayload setNumber(int number) { } } - enum Status {OK} - - static class SimpleGraph extends Graph { Vertex processor1 = @@ -84,24 +81,27 @@ public void trace_payload_if_payload_contain_special_id() throws Exception { Profiler profiler = mock(Profiler.class); ProfiledCall profiledCall = mock(ProfiledCall.class); + + doNothing().when(profiler).attachIndicator(eq(Metrics.PENDING_REQUEST), any()); + when(profiler.profiledCall(ArgumentMatchers.any())).thenReturn(profiledCall); when(profiler.profiledCall(ArgumentMatchers.any())).thenReturn(profiledCall); when(profiledCall.start()).thenReturn(profiledCall); - final CompletableReactor completableReactor = new CompletableReactor(profiler); - - + CompletableReactor completableReactor = new CompletableReactor(profiler); SimpleGraph graph = new SimpleGraph(); completableReactor.registerGraph(graph); - for (int num = 0; num < 10; num++) { completableReactor.submit(new TracablePayload().setNumber(num)); } completableReactor.close(); - ArgumentCaptor identityCapture = ArgumentCaptor.forClass(Identity.class); + + verify(profiler, times(1)).attachIndicator(eq(Metrics.PENDING_REQUEST), any()); + + ArgumentCaptor identityCapture = ArgumentCaptor.forClass(Identity.class); verify(profiler, times(80)).profiledCall(identityCapture.capture()); List identities = identityCapture.getAllValues(); @@ -114,8 +114,8 @@ public void trace_payload_if_payload_contain_special_id() throws Exception { expected.addAll(vertexIdentities(Tags.MERGE, "processor1", 10)); expected.addAll(vertexIdentities(Tags.MERGE,"processor2", 10)); expected.addAll(vertexIdentities(Tags.MERGE,"processor3", 10)); - expected.addAll(executionIdentity(Tags.PAYLOAD, 10)); - expected.addAll(executionIdentity(Tags.EXECUTION, 10)); + expected.addAll(executionIdentity(Tags.REACHED_TERMINAL_STEP, 10)); + expected.addAll(executionIdentity(Tags.FULLY_COMPLETED, 10)); assertThat(identities, containsInAnyOrder(expected.toArray())); } @@ -137,7 +137,7 @@ private static List executionIdentity(String operation, int number) { .mapToObj(i -> { Map tags = new HashMap<>(); tags.put("payload", TracablePayload.class.getName()); - tags.put("operation", operation); + tags.put("execution_cycle", operation); return new Identity(Metrics.REACTOR_EXECUTION, tags); }) .collect(Collectors.toList());