Skip to content

Commit

Permalink
#32 rename labels and check indicator in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
vkuznetsov committed Jan 26, 2020
1 parent 7ece2f7 commit 361a4bc
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 57 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,21 @@ package ru.fix.completable.reactor.runtime
import ru.fix.aggregating.profiler.Identity

object Metrics {
const val PENDING_REQUEST = "pendingRequest"
const val PENDING_REQUEST = "pending_request"

const val REACTOR_EXECUTION = "completable_reactor"

const val REACTOR_VERTEX_EXECUTION = "completable_reactor_vertices"
}

object Tags {
const val MERGE = "merger"
const val MERGE = "merge"

const val HANDLE = "handler"
const val HANDLE = "handle"

const val PAYLOAD = "payload_execution"
const val REACHED_TERMINAL_STEP = "payload_reached_terminal_step"

const val EXECUTION = "execution"
const val FULLY_COMPLETED = "execution_graph_fully_completed"
}

object ProfilerIdentity {
Expand Down Expand Up @@ -45,20 +45,19 @@ object ProfilerIdentity {
fun payloadIdentity(payload: String?) =
Identity(
Metrics.REACTOR_EXECUTION,
tags(payload, Tags.PAYLOAD)
tags(payload, Tags.REACHED_TERMINAL_STEP)
)

@JvmStatic
fun executionIdentity(payload: String?): Identity {
return Identity(
Metrics.REACTOR_EXECUTION,
tags(payload, Tags.EXECUTION)
)
}
fun executionIdentity(payload: String?) =
Identity(
Metrics.REACTOR_EXECUTION,
tags(payload, Tags.FULLY_COMPLETED)
)

private fun tags(payload: String?, operation: String) =
mapOf(
"payload" to payload,
"operation" to operation
"execution_cycle" to operation
)
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package ru.fix.completable.reactor.runtime.execution

import mu.KotlinLogging
import ru.fix.aggregating.profiler.Identity
import ru.fix.completable.reactor.graph.runtime.RuntimeVertex
import ru.fix.completable.reactor.runtime.ProfilerIdentity
import ru.fix.completable.reactor.runtime.ProfilerNames
import ru.fix.completable.reactor.runtime.execution.ExecutionBuilder.Companion.INVALID_TRANSITION_PAYLOAD_CONTEXT
import ru.fix.completable.reactor.runtime.execution.ExecutionBuilder.HandlePayloadContext
import java.util.*
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package ru.fix.completable.reactor.runtime.execution

import mu.KotlinLogging
import ru.fix.aggregating.profiler.Identity
import ru.fix.completable.reactor.graph.runtime.RuntimeTransition
import ru.fix.completable.reactor.graph.runtime.RuntimeVertex
import ru.fix.completable.reactor.runtime.ProfilerIdentity
import ru.fix.completable.reactor.runtime.ProfilerNames
import ru.fix.completable.reactor.runtime.execution.ExecutionBuilder.Companion.INVALID_HANDLE_PAYLOAD_CONTEXT
import ru.fix.completable.reactor.runtime.execution.ExecutionBuilder.Companion.INVALID_MERGE_PAYLOAD_CONTEXT
import ru.fix.completable.reactor.runtime.execution.ExecutionBuilder.HandlePayloadContext
Expand Down Expand Up @@ -266,20 +264,6 @@ class MergeByExecutionBuilder<PayloadType>(
}
}


// object ProfiledIdentity {
// fun mergeIdentity(payload: String?, vertexName: String?): Identity {
// return Identity(
// "completable_reactor",
// mapOf(
// "payload" to payload,
// "operation" to ProfilerNames.MERGE,
// "vertex" to vertexName
// )
// )
// }
// }

/**
* @param pvx
* @param handlingResult empty in case of detached merge point
Expand All @@ -295,7 +279,6 @@ class MergeByExecutionBuilder<PayloadType>(

try {


val mergeCall = builder.profiler
.profiledCall(ProfilerIdentity.mergeIdentity(payload?.javaClass?.name, pvx.vertex.name))
.start()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.*;

Expand All @@ -43,9 +43,6 @@ public TracablePayload setNumber(int number) {
}
}

enum Status {OK}


static class SimpleGraph extends Graph<TracablePayload> {

Vertex processor1 =
Expand Down Expand Up @@ -84,24 +81,27 @@ public void trace_payload_if_payload_contain_special_id() throws Exception {

Profiler profiler = mock(Profiler.class);
ProfiledCall profiledCall = mock(ProfiledCall.class);

doNothing().when(profiler).attachIndicator(eq(Metrics.PENDING_REQUEST), any());

when(profiler.profiledCall(ArgumentMatchers.<String>any())).thenReturn(profiledCall);
when(profiler.profiledCall(ArgumentMatchers.<Identity>any())).thenReturn(profiledCall);

when(profiledCall.start()).thenReturn(profiledCall);

final CompletableReactor completableReactor = new CompletableReactor(profiler);


CompletableReactor completableReactor = new CompletableReactor(profiler);
SimpleGraph graph = new SimpleGraph();
completableReactor.registerGraph(graph);

for (int num = 0; num < 10; num++) {
completableReactor.submit(new TracablePayload().setNumber(num));
}
completableReactor.close();

ArgumentCaptor<Identity> identityCapture = ArgumentCaptor.forClass(Identity.class);


verify(profiler, times(1)).attachIndicator(eq(Metrics.PENDING_REQUEST), any());

ArgumentCaptor<Identity> identityCapture = ArgumentCaptor.forClass(Identity.class);
verify(profiler, times(80)).profiledCall(identityCapture.capture());

List<Identity> identities = identityCapture.getAllValues();
Expand All @@ -114,8 +114,8 @@ public void trace_payload_if_payload_contain_special_id() throws Exception {
expected.addAll(vertexIdentities(Tags.MERGE, "processor1", 10));
expected.addAll(vertexIdentities(Tags.MERGE,"processor2", 10));
expected.addAll(vertexIdentities(Tags.MERGE,"processor3", 10));
expected.addAll(executionIdentity(Tags.PAYLOAD, 10));
expected.addAll(executionIdentity(Tags.EXECUTION, 10));
expected.addAll(executionIdentity(Tags.REACHED_TERMINAL_STEP, 10));
expected.addAll(executionIdentity(Tags.FULLY_COMPLETED, 10));

assertThat(identities, containsInAnyOrder(expected.toArray()));
}
Expand All @@ -137,7 +137,7 @@ private static List<Identity> executionIdentity(String operation, int number) {
.mapToObj(i -> {
Map<String, String> tags = new HashMap<>();
tags.put("payload", TracablePayload.class.getName());
tags.put("operation", operation);
tags.put("execution_cycle", operation);
return new Identity(Metrics.REACTOR_EXECUTION, tags);
})
.collect(Collectors.toList());
Expand Down

0 comments on commit 361a4bc

Please sign in to comment.