diff --git a/module/src/main/java/zipkin2/module/storage/kafka/ZipkinKafkaStorageProperties.java b/module/src/main/java/zipkin2/module/storage/kafka/ZipkinKafkaStorageProperties.java index 0ff235cd..678df310 100644 --- a/module/src/main/java/zipkin2/module/storage/kafka/ZipkinKafkaStorageProperties.java +++ b/module/src/main/java/zipkin2/module/storage/kafka/ZipkinKafkaStorageProperties.java @@ -15,15 +15,24 @@ import java.io.Serializable; import java.time.Duration; +import java.util.Collection; import java.util.LinkedHashMap; import java.util.Map; +import java.util.function.Predicate; import org.springframework.boot.context.properties.ConfigurationProperties; +import zipkin2.Span; import zipkin2.storage.kafka.KafkaStorage; import zipkin2.storage.kafka.KafkaStorageBuilder; import zipkin2.storage.kafka.KafkaStorageBuilder.DependencyStorageBuilder; import zipkin2.storage.kafka.KafkaStorageBuilder.SpanAggregationBuilder; import zipkin2.storage.kafka.KafkaStorageBuilder.SpanPartitioningBuilder; import zipkin2.storage.kafka.KafkaStorageBuilder.TraceStorageBuilder; +import zipkin2.storage.kafka.streams.samplers.CompositeTracePredicate; +import zipkin2.storage.kafka.streams.samplers.ErrorTracePredicate; +import zipkin2.storage.kafka.streams.samplers.SlowTracePredicate; +import zipkin2.storage.kafka.streams.samplers.TraceSamplingPredicate; + +import static java.util.Arrays.asList; @ConfigurationProperties("zipkin.storage.kafka") public class ZipkinKafkaStorageProperties implements Serializable { @@ -38,11 +47,13 @@ public class ZipkinKafkaStorageProperties implements Serializable { private SpanAggregationProperties spanAggregation = new SpanAggregationProperties(); private TraceStorageProperties traceStorage = new TraceStorageProperties(); private DependencyStorageProperties dependencyStorage = new DependencyStorageProperties(); + private TraceSamplingProperties traceSampling = new TraceSamplingProperties(); KafkaStorageBuilder toBuilder() { KafkaStorageBuilder builder = KafkaStorage.newBuilder(); builder.spanPartitioningBuilder(spanPartitioning.toBuilder()); - builder.spanAggregationBuilder(spanAggregation.toBuilder()); + builder.spanAggregationBuilder( + spanAggregation.toBuilder().tracePredicate(traceSampling.tracePredicate())); builder.traceStorageBuilder(traceStorage.toBuilder()); builder.dependencyStorageBuilder(dependencyStorage.toBuilder()); if (hostname != null) builder.hostname(hostname); @@ -226,6 +237,35 @@ SpanAggregationBuilder toBuilder() { } } + static class TraceSamplingProperties { + private long maxResponseTime = 1_000_000L; + private float sampleRate = 0.1f; + private boolean enabled = false; + + public void setMaxResponseTime(long maxResponseTime) { + this.maxResponseTime = maxResponseTime; + } + + public void setSampleRate(float sampleRate) { + this.sampleRate = sampleRate; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + Predicate> tracePredicate() { + if (enabled) { + return new CompositeTracePredicate(asList( + new ErrorTracePredicate(), + new SlowTracePredicate(maxResponseTime), + new TraceSamplingPredicate(sampleRate)) + ); + } + return spans -> true; + } + } + static class TraceStorageProperties { private Boolean enabled; private String spansTopic; diff --git a/storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java b/storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java index 02c3ca4e..88d33500 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java +++ b/storage/src/main/java/zipkin2/storage/kafka/KafkaStorage.java @@ -123,7 +123,8 @@ public static KafkaStorageBuilder newBuilder() { builder.spanAggregation.traceTopic, builder.spanAggregation.dependencyTopic, builder.spanAggregation.traceTimeout, - builder.spanAggregation.enabled).get(); + builder.spanAggregation.enabled, + builder.spanAggregation.tracePredicate).get(); traceStoreTopology = new TraceStorageTopology( builder.traceStorage.spansTopic, autocompleteKeys, diff --git a/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageBuilder.java b/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageBuilder.java index 50372f62..b1659a02 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageBuilder.java +++ b/storage/src/main/java/zipkin2/storage/kafka/KafkaStorageBuilder.java @@ -15,10 +15,12 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.function.BiFunction; +import java.util.function.Predicate; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.ByteArraySerializer; @@ -26,6 +28,7 @@ import org.apache.kafka.common.serialization.Serdes.StringSerde; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.streams.StreamsConfig; +import zipkin2.Span; import zipkin2.storage.StorageComponent; import static zipkin2.storage.kafka.KafkaStorage.HTTP_PATH_PREFIX; @@ -230,6 +233,7 @@ public final SpanPartitioningBuilder overrides(Map overrides) { } public static class SpanAggregationBuilder { + Predicate> tracePredicate = spans -> true; boolean enabled = true; Duration traceTimeout = Duration.ofMinutes(1); String spansTopic = "zipkin-spans"; @@ -246,6 +250,14 @@ public SpanAggregationBuilder() { streamConfig.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); } + /** + * Filter traces based on sampling rules. + */ + public SpanAggregationBuilder tracePredicate(Predicate> tracePredicate) { + this.tracePredicate = tracePredicate; + return this; + } + /** * Enable aggregation stream application to run. When disabled spans will not be consumed to * produce traces and dependencies. diff --git a/storage/src/main/java/zipkin2/storage/kafka/streams/SpanAggregationTopology.java b/storage/src/main/java/zipkin2/storage/kafka/streams/SpanAggregationTopology.java index 7cafa94b..e60738eb 100644 --- a/storage/src/main/java/zipkin2/storage/kafka/streams/SpanAggregationTopology.java +++ b/storage/src/main/java/zipkin2/storage/kafka/streams/SpanAggregationTopology.java @@ -15,7 +15,9 @@ import java.time.Duration; import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.function.Predicate; import java.util.function.Supplier; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.StreamsBuilder; @@ -54,6 +56,7 @@ public final class SpanAggregationTopology implements Supplier { // SerDes final SpansSerde spansSerde; final DependencyLinkSerde dependencyLinkSerde; + final Predicate> tracePredicate; public SpanAggregationTopology( String spansTopic, @@ -62,11 +65,17 @@ public SpanAggregationTopology( Duration traceTimeout, boolean aggregationEnabled ) { + this(spansTopic, traceTopic, dependencyTopic, traceTimeout, aggregationEnabled, spans -> true); + } + + public SpanAggregationTopology(String spansTopic, String traceTopic, String dependencyTopic, + Duration traceTimeout, boolean aggregationEnabled, Predicate> tracePredicate) { this.spansTopic = spansTopic; this.traceTopic = traceTopic; this.dependencyTopic = dependencyTopic; this.traceTimeout = traceTimeout; this.aggregationEnabled = aggregationEnabled; + this.tracePredicate = tracePredicate; spansSerde = new SpansSerde(); dependencyLinkSerde = new DependencyLinkSerde(); } @@ -92,6 +101,7 @@ public SpanAggregationTopology( // hold until a new record tells that a window is closed and we can process it further .suppress(untilWindowCloses(unbounded())) .toStream() + .filter((traceId, spans) -> tracePredicate.test(spans)) .selectKey((windowed, spans) -> windowed.key()); // Downstream to traces topic tracesStream.to(traceTopic, Produced.with(Serdes.String(), spansSerde)); diff --git a/storage/src/main/java/zipkin2/storage/kafka/streams/samplers/CompositeTracePredicate.java b/storage/src/main/java/zipkin2/storage/kafka/streams/samplers/CompositeTracePredicate.java new file mode 100644 index 00000000..c8538ce7 --- /dev/null +++ b/storage/src/main/java/zipkin2/storage/kafka/streams/samplers/CompositeTracePredicate.java @@ -0,0 +1,37 @@ +/* + * Copyright 2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package zipkin2.storage.kafka.streams.samplers; + +import java.util.Collection; +import java.util.function.Predicate; +import zipkin2.Span; + +public class CompositeTracePredicate implements Predicate> { + private final Collection>> predicates; + + public CompositeTracePredicate(Collection>> predicates) { + this.predicates = predicates; + } + + @Override public boolean test(Collection spans) { + for (Predicate> predicate : predicates) { + if (predicate.test(spans)) { + return true; + } + } + return false; + } +} diff --git a/storage/src/main/java/zipkin2/storage/kafka/streams/samplers/ErrorTracePredicate.java b/storage/src/main/java/zipkin2/storage/kafka/streams/samplers/ErrorTracePredicate.java new file mode 100644 index 00000000..386550fb --- /dev/null +++ b/storage/src/main/java/zipkin2/storage/kafka/streams/samplers/ErrorTracePredicate.java @@ -0,0 +1,26 @@ +/* + * Copyright 2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package zipkin2.storage.kafka.streams.samplers; + +import java.util.Collection; +import java.util.function.Predicate; +import zipkin2.Span; + +public class ErrorTracePredicate implements Predicate> { + @Override public boolean test(Collection spans) { + return spans.stream().anyMatch(s -> s.tags().containsKey("error")); + } +} diff --git a/storage/src/main/java/zipkin2/storage/kafka/streams/samplers/SlowTracePredicate.java b/storage/src/main/java/zipkin2/storage/kafka/streams/samplers/SlowTracePredicate.java new file mode 100644 index 00000000..5944f901 --- /dev/null +++ b/storage/src/main/java/zipkin2/storage/kafka/streams/samplers/SlowTracePredicate.java @@ -0,0 +1,33 @@ +/* + * Copyright 2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package zipkin2.storage.kafka.streams.samplers; + +import java.util.Collection; +import java.util.function.Predicate; +import zipkin2.Span; + +public class SlowTracePredicate implements Predicate> { + private final long maxResponseTime; + + public SlowTracePredicate(long maxResponseTime) { + this.maxResponseTime = maxResponseTime; + } + + @Override public boolean test(Collection spans) { + return spans.stream() + .anyMatch(span -> span.durationAsLong() >= maxResponseTime); + } +} diff --git a/storage/src/main/java/zipkin2/storage/kafka/streams/samplers/TraceSamplingPredicate.java b/storage/src/main/java/zipkin2/storage/kafka/streams/samplers/TraceSamplingPredicate.java new file mode 100644 index 00000000..ba1dcaed --- /dev/null +++ b/storage/src/main/java/zipkin2/storage/kafka/streams/samplers/TraceSamplingPredicate.java @@ -0,0 +1,44 @@ +/* + * Copyright 2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package zipkin2.storage.kafka.streams.samplers; + +import java.util.Collection; +import java.util.function.Predicate; +import zipkin2.Span; +import zipkin2.internal.HexCodec; + +public class TraceSamplingPredicate implements Predicate> { + private final float rate; + + public TraceSamplingPredicate(float rate) { + if (rate < 0.f || rate > 1.f) + throw new IllegalArgumentException("rate should be between 0 and 1: was " + rate); + + this.rate = rate; + } + + @Override public boolean test(Collection spans) { + return spans.stream() + .anyMatch(span -> { + if (Boolean.TRUE.equals(span.debug())) return true; + long traceId = HexCodec.lowerHexToUnsignedLong(span.traceId()); + // The absolute value of Long.MIN_VALUE is larger than a long, so Math.abs returns identity. + // This converts to MAX_VALUE to avoid always dropping when traceId == Long.MIN_VALUE + long t = traceId == Long.MIN_VALUE ? Long.MAX_VALUE : Math.abs(traceId); + return t <= (long) (Long.MAX_VALUE * rate); + }); + } +} diff --git a/storage/src/test/java/zipkin2/storage/kafka/streams/SpanAggregationTopologyTest.java b/storage/src/test/java/zipkin2/storage/kafka/streams/SpanAggregationTopologyTest.java index 99b9fb31..ae59377d 100644 --- a/storage/src/test/java/zipkin2/storage/kafka/streams/SpanAggregationTopologyTest.java +++ b/storage/src/test/java/zipkin2/storage/kafka/streams/SpanAggregationTopologyTest.java @@ -15,9 +15,12 @@ import java.time.Duration; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Properties; +import java.util.function.BiConsumer; +import java.util.function.Predicate; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; @@ -27,6 +30,7 @@ import org.apache.kafka.streams.TopologyTestDriver; import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.streams.test.OutputVerifier; +import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import zipkin2.DependencyLink; import zipkin2.Endpoint; @@ -43,6 +47,18 @@ class SpanAggregationTopologyTest { Properties props = new Properties(); + private final SpansSerde spansSerde = new SpansSerde(); + private final DependencyLinkSerde dependencyLinkSerde = new DependencyLinkSerde(); + private TopologyTestDriver testDriver; + + @AfterEach + void tearDown() { + //Finally close resources + testDriver.close(); + spansSerde.close(); + dependencyLinkSerde.close(); + } + SpanAggregationTopologyTest() { props.put(StreamsConfig.APPLICATION_ID_CONFIG, "test"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); @@ -59,27 +75,65 @@ class SpanAggregationTopologyTest { TopologyDescription description = topology.describe(); // Then: single threaded topology assertThat(description.subtopologies()).hasSize(0); - TopologyTestDriver testDriver = new TopologyTestDriver(topology, props); - testDriver.close(); + testDriver = new TopologyTestDriver(topology, props); } @Test void should_aggregateSpans_and_mapDependencies() { + // accepts all traces + final Predicate> tracePredicate = spans -> true; + + assertTrace(tracePredicate, (a, b) -> { + // Then: a trace is aggregated.1 + ProducerRecord> trace = + testDriver.readOutput(traceTopic, new StringDeserializer(), spansSerde.deserializer()); + assertThat(trace).isNotNull(); + OutputVerifier.compareKeyValue(trace, a.traceId(), Arrays.asList(a, b)); + // Then: a dependency link is created + ProducerRecord linkRecord = + testDriver.readOutput(dependencyTopic, new StringDeserializer(), + dependencyLinkSerde.deserializer()); + assertThat(linkRecord).isNotNull(); + DependencyLink link = DependencyLink.newBuilder() + .parent("svc_a").child("svc_b").callCount(1).errorCount(0) + .build(); + OutputVerifier.compareKeyValue(linkRecord, "svc_a:svc_b", link); + }); + } + + @Test void should_discardTrace_ifSpecifiedPredicateFails() { + // discard all traces + final Predicate> tracePredicate = spans -> false; + + assertTrace(tracePredicate, (a, b) -> { + // Then: a trace is aggregated.1 + ProducerRecord> trace = + testDriver.readOutput(traceTopic, new StringDeserializer(), spansSerde.deserializer()); + assertThat(trace).isNull(); + // Then: a dependency link is created + ProducerRecord linkRecord = + testDriver.readOutput(dependencyTopic, new StringDeserializer(), + dependencyLinkSerde.deserializer()); + assertThat(linkRecord).isNull(); + }); + } + + private void assertTrace(Predicate> tracePredicate, + BiConsumer spanAssertion) { // Given: configuration Duration traceTimeout = Duration.ofSeconds(1); - SpansSerde spansSerde = new SpansSerde(); - DependencyLinkSerde dependencyLinkSerde = new DependencyLinkSerde(); // When: topology built Topology topology = new SpanAggregationTopology( spansTopic, traceTopic, dependencyTopic, traceTimeout, - true).get(); + true, + tracePredicate).get(); TopologyDescription description = topology.describe(); // Then: single threaded topology assertThat(description.subtopologies()).hasSize(1); // Given: test driver - TopologyTestDriver testDriver = new TopologyTestDriver(topology, props); + testDriver = new TopologyTestDriver(topology, props); // When: two related spans coming on the same Session window ConsumerRecordFactory> factory = new ConsumerRecordFactory<>(spansTopic, new StringSerializer(), spansSerde.serializer()); @@ -97,23 +151,6 @@ class SpanAggregationTopologyTest { Span c = Span.newBuilder().traceId("c").id("c").build(); testDriver.pipeInput(factory.create(spansTopic, c.traceId(), Collections.singletonList(c), traceTimeout.toMillis() + 1)); - // Then: a trace is aggregated.1 - ProducerRecord> trace = - testDriver.readOutput(traceTopic, new StringDeserializer(), spansSerde.deserializer()); - assertThat(trace).isNotNull(); - OutputVerifier.compareKeyValue(trace, a.traceId(), Arrays.asList(a, b)); - // Then: a dependency link is created - ProducerRecord linkRecord = - testDriver.readOutput(dependencyTopic, new StringDeserializer(), - dependencyLinkSerde.deserializer()); - assertThat(linkRecord).isNotNull(); - DependencyLink link = DependencyLink.newBuilder() - .parent("svc_a").child("svc_b").callCount(1).errorCount(0) - .build(); - OutputVerifier.compareKeyValue(linkRecord, "svc_a:svc_b", link); - //Finally close resources - testDriver.close(); - spansSerde.close(); - dependencyLinkSerde.close(); + spanAssertion.accept(a, b); } } diff --git a/storage/src/test/java/zipkin2/storage/kafka/streams/samplers/CompositeTracePredicateTest.java b/storage/src/test/java/zipkin2/storage/kafka/streams/samplers/CompositeTracePredicateTest.java new file mode 100644 index 00000000..93af1dfb --- /dev/null +++ b/storage/src/test/java/zipkin2/storage/kafka/streams/samplers/CompositeTracePredicateTest.java @@ -0,0 +1,45 @@ +/* + * Copyright 2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package zipkin2.storage.kafka.streams.samplers; + +import org.junit.Test; + +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; + +public class CompositeTracePredicateTest extends TracePredicateTestBase { + @Test + public void acceptIfAnyTestPassed() { + final CompositeTracePredicate predicate = new CompositeTracePredicate( + asList(spans -> false, spans -> true) + ); + + final boolean sampled = predicate.test(asList(errorSpanOfTrace1, normalSpanOfTrace1)); + + assertThat(sampled).isTrue(); + } + + @Test + public void discardIfAllTestFailed() { + final CompositeTracePredicate predicate = new CompositeTracePredicate( + asList(spans -> false, spans -> false) + ); + + final boolean sampled = predicate.test(asList(errorSpanOfTrace1, normalSpanOfTrace1)); + + assertThat(sampled).isFalse(); + } +} diff --git a/storage/src/test/java/zipkin2/storage/kafka/streams/samplers/ErrorTracePredicateTest.java b/storage/src/test/java/zipkin2/storage/kafka/streams/samplers/ErrorTracePredicateTest.java new file mode 100644 index 00000000..450f7176 --- /dev/null +++ b/storage/src/test/java/zipkin2/storage/kafka/streams/samplers/ErrorTracePredicateTest.java @@ -0,0 +1,40 @@ +/* + * Copyright 2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package zipkin2.storage.kafka.streams.samplers; + +import org.junit.Test; + +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; + +public class ErrorTracePredicateTest extends TracePredicateTestBase { + + private final ErrorTracePredicate predicate = new ErrorTracePredicate(); + + @Test + public void acceptTraceId() { + final boolean sampled = predicate.test(asList(errorSpanOfTrace1, normalSpanOfTrace1)); + + assertThat(sampled).isTrue(); + } + + @Test + public void discardTraceId() { + final boolean sampled = predicate.test(asList(slowSpan("1"), normalSpanOfTrace1)); + + assertThat(sampled).isFalse(); + } +} diff --git a/storage/src/test/java/zipkin2/storage/kafka/streams/samplers/SlowTracePredicateTest.java b/storage/src/test/java/zipkin2/storage/kafka/streams/samplers/SlowTracePredicateTest.java new file mode 100644 index 00000000..bf7a10b5 --- /dev/null +++ b/storage/src/test/java/zipkin2/storage/kafka/streams/samplers/SlowTracePredicateTest.java @@ -0,0 +1,40 @@ +/* + * Copyright 2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package zipkin2.storage.kafka.streams.samplers; + +import org.junit.Test; + +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; + +public class SlowTracePredicateTest extends TracePredicateTestBase { + + private final SlowTracePredicate predicate = new SlowTracePredicate(1_000_000L); + + @Test + public void acceptTraceId() { + final boolean sampled = predicate.test(asList(slowSpan("1"), normalSpanOfTrace1)); + + assertThat(sampled).isTrue(); + } + + @Test + public void discardTraceId() { + final boolean sampled = predicate.test(asList(errorSpanOfTrace1, normalSpanOfTrace1)); + + assertThat(sampled).isFalse(); + } +} diff --git a/storage/src/test/java/zipkin2/storage/kafka/streams/samplers/TracePredicateTestBase.java b/storage/src/test/java/zipkin2/storage/kafka/streams/samplers/TracePredicateTestBase.java new file mode 100644 index 00000000..a1baf1d6 --- /dev/null +++ b/storage/src/test/java/zipkin2/storage/kafka/streams/samplers/TracePredicateTestBase.java @@ -0,0 +1,50 @@ +/* + * Copyright 2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package zipkin2.storage.kafka.streams.samplers; + +import org.junit.Ignore; +import zipkin2.Span; + +@Ignore +class TracePredicateTestBase { + final Span errorSpanOfTrace1 = errorSpan("1"); + final Span normalSpanOfTrace1 = normalSpan("1"); + + private Span errorSpan(String traceId) { + return Span.newBuilder() + .traceId(traceId).id("1").name("get").duration(0L) + .putTag("error", "500") + .build(); + } + + private Span normalSpan(String traceId) { + return Span.newBuilder() + .traceId(traceId).id("2").name("get").duration(0L) + .build(); + } + + Span debugSpan(String traceId) { + return Span.newBuilder() + .traceId(traceId).id("3").name("get").duration(0L).debug(true) + .build(); + } + + Span slowSpan(String traceId) { + return Span.newBuilder() + .traceId(traceId).id("4").name("get").duration(10_000_000L) + .build(); + } +} diff --git a/storage/src/test/java/zipkin2/storage/kafka/streams/samplers/TraceSamplingPredicateTest.java b/storage/src/test/java/zipkin2/storage/kafka/streams/samplers/TraceSamplingPredicateTest.java new file mode 100644 index 00000000..25339ee6 --- /dev/null +++ b/storage/src/test/java/zipkin2/storage/kafka/streams/samplers/TraceSamplingPredicateTest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2019 The OpenZipkin Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except + * in compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + * + */ + +package zipkin2.storage.kafka.streams.samplers; + +import org.junit.Test; + +import static java.util.Arrays.asList; +import static org.assertj.core.api.Assertions.assertThat; + +public class TraceSamplingPredicateTest extends TracePredicateTestBase { + + @Test + public void acceptTraceId() { + final TraceSamplingPredicate predicate = new TraceSamplingPredicate(1.f); + + final boolean sampled = predicate.test(asList(errorSpanOfTrace1, normalSpanOfTrace1)); + + assertThat(sampled).isTrue(); + } + + @Test + public void acceptDebugTrace() { + final TraceSamplingPredicate predicate = new TraceSamplingPredicate(0.f); + + final boolean sampled = predicate.test(asList(debugSpan("1"), normalSpanOfTrace1)); + + assertThat(sampled).isTrue(); + } + + @Test + public void discardTraceId() { + final TraceSamplingPredicate predicate = new TraceSamplingPredicate(0.f); + + final boolean sampled = predicate.test(asList(errorSpanOfTrace1, normalSpanOfTrace1)); + + assertThat(sampled).isFalse(); + } + + @Test(expected = IllegalArgumentException.class) + public void throwsOnInvalidRate() { + new TraceSamplingPredicate(10.f); + } + + @Test(expected = IllegalArgumentException.class) + public void throwsOnNegativeRate() { + new TraceSamplingPredicate(-10.f); + } +}