cassandra: cleans up code and tests (#3709)
This cleans up deprecation and style issues found in IntelliJ analysis.
It also migrates docker tests to junit-jupiter and fixes the logging
configuration which broke when we updated to SLF4J 2.x.

This also updates every docker test image reference to the latest version, in
an attempt to share base image layers.

Signed-off-by: Adrian Cole <[email protected]>
codefromthecrypt authored Jan 25, 2024
1 parent 29044e2 commit 4f263de
Showing 31 changed files with 114 additions and 127 deletions.
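The commit message mentions migrating the docker tests to junit-jupiter. As context for the diffs below, here is a minimal sketch of that test pattern, assuming the standard Testcontainers junit-jupiter extension; the class name and test body are illustrative, not code from this commit:

```java
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import static org.testcontainers.utility.DockerImageName.parse;

@Testcontainers // starts @Container fields before each test and stops them after
class CassandraContainerSketchTest {
  @Container GenericContainer<?> cassandra =
      new GenericContainer<>(parse("ghcr.io/openzipkin/zipkin-cassandra:3.0.5"))
          .withExposedPorts(9042);

  @Test void containerIsListening() {
    // e.g. connect via cassandra.getHost() and cassandra.getMappedPort(9042)
  }
}
```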
@@ -91,7 +91,7 @@ class ServerIntegratedBenchmark {

@Test void elasticsearch() throws Exception {
GenericContainer<?> elasticsearch =
- new GenericContainer<>(parse("ghcr.io/openzipkin/zipkin-elasticsearch7:3.0.4"))
+ new GenericContainer<>(parse("ghcr.io/openzipkin/zipkin-elasticsearch7:3.0.5"))
.withNetwork(Network.SHARED)
.withNetworkAliases("elasticsearch")
.withLabel("name", "elasticsearch")
@@ -105,7 +105,7 @@ class ServerIntegratedBenchmark {

@Test void cassandra3() throws Exception {
GenericContainer<?> cassandra =
- new GenericContainer<>(parse("ghcr.io/openzipkin/zipkin-cassandra:3.0.4"))
+ new GenericContainer<>(parse("ghcr.io/openzipkin/zipkin-cassandra:3.0.5"))
.withNetwork(Network.SHARED)
.withNetworkAliases("cassandra")
.withLabel("name", "cassandra")
@@ -119,7 +119,7 @@ class ServerIntegratedBenchmark {

@Test void mysql() throws Exception {
GenericContainer<?> mysql =
- new GenericContainer<>(parse("ghcr.io/openzipkin/zipkin-mysql:3.0.4"))
+ new GenericContainer<>(parse("ghcr.io/openzipkin/zipkin-mysql:3.0.5"))
.withNetwork(Network.SHARED)
.withNetworkAliases("mysql")
.withLabel("name", "mysql")
@@ -64,7 +64,7 @@ String brokerURL() {
// mostly waiting for https://github.com/testcontainers/testcontainers-java/issues/3537
static final class ActiveMQContainer extends GenericContainer<ActiveMQContainer> {
ActiveMQContainer() {
- super(parse("ghcr.io/openzipkin/zipkin-activemq:3.0.4"));
+ super(parse("ghcr.io/openzipkin/zipkin-activemq:3.0.5"));
withExposedPorts(ACTIVEMQ_PORT);
waitStrategy = Wait.forListeningPorts(ACTIVEMQ_PORT);
withStartupTimeout(Duration.ofSeconds(60));
@@ -92,7 +92,7 @@ KafkaCollector.Builder newCollectorBuilder(String topic, int streams) {
// mostly waiting for https://github.com/testcontainers/testcontainers-java/issues/3537
static final class KafkaContainer extends GenericContainer<KafkaContainer> {
KafkaContainer() {
- super(parse("ghcr.io/openzipkin/zipkin-kafka:3.0.4"));
+ super(parse("ghcr.io/openzipkin/zipkin-kafka:3.0.5"));
waitStrategy = Wait.forHealthcheck();
// 19092 is for connections from the Docker host and needs to be used as a fixed port.
// TODO: someone who knows Kafka well, make ^^ comment better!
@@ -83,7 +83,7 @@ int port() {
// mostly waiting for https://github.com/testcontainers/testcontainers-java/issues/3537
static final class RabbitMQContainer extends GenericContainer<RabbitMQContainer> {
RabbitMQContainer() {
- super(parse("ghcr.io/openzipkin/zipkin-rabbitmq:3.0.4"));
+ super(parse("ghcr.io/openzipkin/zipkin-rabbitmq:3.0.5"));
withExposedPorts(RABBIT_PORT);
waitStrategy = Wait.forLogMessage(".*Server startup complete.*", 1);
withStartupTimeout(Duration.ofSeconds(60));
@@ -141,7 +141,7 @@ static final class EurekaContainer extends GenericContainer<EurekaContainer> {
static final int EUREKA_PORT = 8761;

EurekaContainer(Map<String, String> env) {
- super(parse("ghcr.io/openzipkin/zipkin-eureka:3.0.4"));
+ super(parse("ghcr.io/openzipkin/zipkin-eureka:3.0.5"));
withEnv(env);
withExposedPorts(EUREKA_PORT);
waitStrategy = Wait.forHealthcheck();
4 changes: 2 additions & 2 deletions zipkin-storage/cassandra/README.md
@@ -150,7 +150,7 @@ As you'll notice, the duration component is optional, and stored in
millisecond resolution as opposed to microsecond (which the query represents).
The final query shows that the input is rounded up to the nearest millisecond.

- The reason we can query on `duration` is due to a SASI index. Eventhough the
+ The reason we can query on `duration` is due to a SASI index. Even though the
search granularity is millisecond, original duration data remains microsecond
granularity. Meanwhile, write performance is dramatically better than writing
discrete values, due to fewer distinct writes.
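To make the round-up above concrete, a one-line sketch of the ceiling division from microseconds to milliseconds (the helper name is hypothetical, not the README's code):

```java
static long ceilToMillis(long durationMicros) {
  return (durationMicros + 999L) / 1000L; // e.g. 1001us -> 2ms: rounded up to the nearest ms
}
```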
@@ -175,6 +175,6 @@ optimised for queries within a single day. The penalty of reading multiple days
otherwise overhead of reading a significantly larger amount of data.

### Benchmarking
- Benchmarking the new datamodel demonstrates a significant performance improvement on reads. How much of this translates to the
+ Benchmarking the new data model demonstrates a significant performance improvement on reads. How much of this translates to the
Zipkin UI is hard to tell due to the complexity of CassandraSpanConsumer and how searches are possible. Benchmarking stress
profiles are found in traces-stress.yaml and trace_by_service_span-stress.yaml and span_by_service-stress.yaml.
12 changes: 11 additions & 1 deletion zipkin-storage/cassandra/pom.xml
@@ -73,8 +73,8 @@
<groupId>org.apache.cassandra</groupId>
<artifactId>java-driver-core</artifactId>
<version>${java-driver.version}</version>
- <!-- Exclude unused graph and geo dependencies -->
<exclusions>
+ <!-- Exclude unused graph and geo dependencies -->
<exclusion>
<groupId>com.esri.geometry</groupId>
<artifactId>*</artifactId>
@@ -92,6 +92,16 @@
Mockito dies trying to mock CqlSession without it. -->
</exclusions>
</dependency>

+ <!-- Set SLF4J version to what's used by the server, without excluding the version
+ of cassandra, which is used in the zipkin-dependencies spark job. -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>${slf4j.version}</version>
+ <scope>provided</scope>
+ </dependency>

<!-- temporary until https://github.com/apache/cassandra-java-driver/pull/1904 -->
<dependency>
<groupId>com.github.jnr</groupId>
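For readers unfamiliar with the `provided` scope added above: a provided dependency is on the compile classpath but is neither packaged nor propagated to consumers, so it pins `slf4j-api` to the server's version without overriding the version the cassandra driver brings to the zipkin-dependencies spark job, which is how the comment reads.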
@@ -1,5 +1,5 @@
/*
- * Copyright 2015-2020 The OpenZipkin Authors
+ * Copyright 2015-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -13,8 +13,6 @@
*/
package zipkin2.storage.cassandra;

- import java.util.ArrayList;
- import java.util.Collections;
import java.util.List;
import zipkin2.Call;
import zipkin2.storage.AutocompleteTags;
@@ -28,7 +26,7 @@ class CassandraAutocompleteTags implements AutocompleteTags {
enabled = storage.searchEnabled
&& !storage.autocompleteKeys.isEmpty()
&& storage.metadata().hasAutocompleteTags;
- keysCall = Call.create(Collections.unmodifiableList(new ArrayList<>(storage.autocompleteKeys)));
+ keysCall = Call.create(List.copyOf(storage.autocompleteKeys));
valuesCallFactory = enabled ? new SelectAutocompleteValues.Factory(storage.session()) : null;
}

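The `keysCall` change above swaps a copy-and-wrap idiom for `List.copyOf`, available since Java 10. A self-contained sketch of the equivalence (class and method names hypothetical):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

class CopyOfSketch {
  // old idiom: defensive copy, then wrap to make it unmodifiable
  static List<String> legacy(Set<String> keys) {
    return Collections.unmodifiableList(new ArrayList<>(keys));
  }

  // new idiom: one call, also unmodifiable; additionally rejects null elements
  static List<String> modern(Set<String> keys) {
    return List.copyOf(keys);
  }
}
```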
@@ -1,5 +1,5 @@
/*
- * Copyright 2015-2020 The OpenZipkin Authors
+ * Copyright 2015-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -36,8 +36,7 @@
import static zipkin2.storage.cassandra.Schema.TABLE_SERVICE_SPANS;

class CassandraSpanConsumer implements SpanConsumer { // not final for testing
final CqlSession session;
- final boolean strictTraceId, searchEnabled;
+ final boolean searchEnabled;
final InsertSpan.Factory insertSpan;
final Set<String> autocompleteKeys;

@@ -62,12 +61,9 @@ void clear() {
);
}

- // Exposed to allow tests to switch from strictTraceId to not
CassandraSpanConsumer(CqlSession session, Schema.Metadata metadata, boolean strictTraceId,
boolean searchEnabled, Set<String> autocompleteKeys, int autocompleteTtl,
int autocompleteCardinality) {
this.session = session;
- this.strictTraceId = strictTraceId;
this.searchEnabled = searchEnabled;
this.autocompleteKeys = autocompleteKeys;

@@ -1,5 +1,5 @@
/*
- * Copyright 2015-2023 The OpenZipkin Authors
+ * Copyright 2015-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -166,7 +166,7 @@ static SelectTraceIdsFromSpan.Factory initialiseSelectTraceIdsFromSpan(CqlSessio
* Creates a call representing one or more queries against {@link Schema#TABLE_TRACE_BY_SERVICE_SPAN}
* and possibly {@link Schema#TABLE_TRACE_BY_SERVICE_REMOTE_SERVICE}.
*
- * <p>The result will be an aggregate if the input requests's serviceName is null, both span name
+ * <p>The result will be an aggregate if the input request serviceName is null, both span name
* and remote service name are supplied, or there's more than one day of data in the timestamp
* range.
*
@@ -226,7 +226,7 @@ Call<Map<String, Long>> newBucketedTraceIdCall(
traceIndexFetchSize));
}

if ("".equals(serviceName)) {
if (serviceName.isEmpty()) {
// If we have no service name, we have to lookup service names before running trace ID queries
Call<List<String>> serviceNames = getServiceNames();
if (serviceRemoteServices.isEmpty()) {
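One subtlety of the `isEmpty()` rewrite above: `"".equals(serviceName)` was null-safe, while `serviceName.isEmpty()` throws a `NullPointerException` on null, so the change assumes `serviceName` is non-null by this point.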
@@ -206,7 +206,7 @@ Schema.Metadata metadata() {
return ResultSetFutureCall.isOverCapacity(e);
}

- @Override public final String toString() {
+ @Override public String toString() {
return "CassandraStorage{contactPoints=" + contactPoints + ", keyspace=" + keyspace + "}";
}

@@ -1,5 +1,5 @@
/*
- * Copyright 2015-2023 The OpenZipkin Authors
+ * Copyright 2015-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -60,7 +60,7 @@ public static int durationIndexBucket(long ts_micro) {
* <p>Values over {@link RecyclableBuffers#SHORT_STRING_LENGTH} are not considered. Zipkin's
* {@link QueryRequest#annotationQuery()} are equals match. Not all values are lookup values. For
* example, {@code sql.query} isn't something that is likely to be looked up by value and indexing
- * that could add a potentially kilobyte partition key on {@link Schema#TABLE_SPAN}
+ * that could add a kilobyte partition key on {@link Schema#TABLE_SPAN}
*
* @see QueryRequest#annotationQuery()
*/
@@ -1,5 +1,5 @@
/*
- * Copyright 2015-2020 The OpenZipkin Authors
+ * Copyright 2015-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -129,7 +129,7 @@ Call<Void> create(Input span) {
* <p>If there's consistently 8 tombstones (nulls) per row, then we'll only need 125 spans in a
* trace (rows in a partition) to trigger the `tombstone_warn_threshold warnings being logged in
* the C* nodes. And if we go to 12500 spans in a trace then that whole trace partition would
- * become unreadable. Cassandra warns at a 1000 tombstones in any query, and fails on 100000
+ * become unreadable. Cassandra warns at 1000 tombstones in any query, and fails on 100000
* tombstones.
*
* <p>There's also a small question about disk usage efficiency. Each tombstone is a cell name
@@ -143,8 +143,8 @@ Call<Void> create(Input span) {
* <p>Another popular practice is to insert those potentially null columns as separate statements
* (and optionally put them together into UNLOGGED batches). This works as multiple writes to the
* same partition has little overhead, and here we're not worried about lack of isolation between
- * those writes, as the write is asynchronous anyway. An example of this approach is in the
- * cassandra-reaper project here: https://github.com/thelastpickle/cassandra-reaper/blob/master/src/server/src/main/java/io/cassandrareaper/storage/CassandraStorage.java#L622-L642
+ * writes, as it is asynchronous anyway. An example of this approach is in the cassandra-reaper
+ * project here: https://github.com/thelastpickle/cassandra-reaper/blob/master/src/server/src/main/java/io/cassandrareaper/storage/CassandraStorage.java#L622-L642
*/
@Override protected CompletionStage<AsyncResultSet> newCompletionStage() {
BoundStatementBuilder bound = factory.preparedStatement.boundStatementBuilder()
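A minimal sketch of the separate-statements alternative the comment above describes, assuming the java-driver 4.x batch API; the table, columns, and statements are hypothetical, not Zipkin's actual schema:

```java
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BatchStatement;
import com.datastax.oss.driver.api.core.cql.BatchStatementBuilder;
import com.datastax.oss.driver.api.core.cql.DefaultBatchType;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;

class UnloggedInsertSketch {
  /** Writes each optional column only when present, so no tombstones are created. */
  static void insert(CqlSession session, String traceId, Long durationOrNull) {
    BatchStatementBuilder batch = BatchStatement.builder(DefaultBatchType.UNLOGGED);
    batch.addStatement(SimpleStatement.newInstance(
        "INSERT INTO span (trace_id) VALUES (?)", traceId));
    if (durationOrNull != null) { // skip the column entirely, rather than bind null
      batch.addStatement(SimpleStatement.newInstance(
          "INSERT INTO span (trace_id, duration) VALUES (?, ?)", traceId, durationOrNull));
    }
    session.execute(batch.build());
  }
}
```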
@@ -1,5 +1,5 @@
/*
- * Copyright 2015-2020 The OpenZipkin Authors
+ * Copyright 2015-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -15,7 +15,6 @@

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
- import com.datastax.oss.driver.api.core.cql.ResultSet;
import zipkin2.storage.cassandra.CassandraStorage.SessionFactory;

import static zipkin2.storage.cassandra.Schema.TABLE_SPAN;
@@ -51,9 +50,9 @@ Schema.Metadata metadata() {
return metadata;
}

- ResultSet healthCheck() {
+ void healthCheck() {
get();
- return session.execute(healthCheck.bind());
+ session.execute(healthCheck.bind());
}

void close() {
@@ -1,5 +1,5 @@
/*
- * Copyright 2015-2023 The OpenZipkin Authors
+ * Copyright 2015-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -19,7 +19,6 @@
import com.datastax.oss.driver.api.core.metadata.schema.KeyspaceMetadata;
import java.util.Map;
import java.util.UUID;
- import java.util.regex.Pattern;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -105,7 +104,7 @@ static Version ensureVersion(com.datastax.oss.driver.api.core.metadata.Metadata
return version;
}

- static KeyspaceMetadata ensureExists(String keyspace, boolean searchEnabled, CqlSession session) {
+ static void ensureExists(String keyspace, boolean searchEnabled, CqlSession session) {
KeyspaceMetadata result = session.getMetadata().getKeyspace(keyspace).orElse(null);
if (result == null || result.getTable(Schema.TABLE_SPAN).isEmpty()) {
LOG.info("Installing schema {} for keyspace {}", SCHEMA_RESOURCE, keyspace);
@@ -125,7 +124,6 @@ static KeyspaceMetadata ensureExists(String keyspace, boolean searchEnabled, Cql
LOG.info("Upgrading schema {}", UPGRADE_2);
applyCqlFile(keyspace, session, UPGRADE_2);
}
- return result;
}

static boolean hasUpgrade1_autocompleteTags(KeyspaceMetadata keyspaceMetadata) {
@@ -1,5 +1,5 @@
/*
- * Copyright 2015-2020 The OpenZipkin Authors
+ * Copyright 2015-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -32,7 +32,7 @@ static final class Factory {
Factory(CqlSession session) {
this.session = session;
this.preparedStatement = session.prepare("SELECT DISTINCT service"
+ " FROM " + TABLE_SERVICE_SPANS);
+ " FROM " + TABLE_SERVICE_SPANS);
}

Call<List<String>> create() {
@@ -35,7 +35,7 @@
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.REQUEST_LOGGER_SUCCESS_ENABLED;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.REQUEST_LOGGER_VALUES;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.REQUEST_TIMEOUT;
- import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.REQUEST_TRACKER_CLASS;
+ import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.REQUEST_TRACKER_CLASSES;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.REQUEST_WARN_IF_SET_KEYSPACE;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.SSL_ENGINE_FACTORY_CLASS;
import static com.datastax.oss.driver.api.core.config.DefaultDriverOption.SSL_HOSTNAME_VALIDATION;
Expand Down Expand Up @@ -86,7 +86,7 @@ public static CqlSession buildSession(
// Log categories can enable query logging
Logger requestLogger = LoggerFactory.getLogger(SessionBuilder.class);
if (requestLogger.isDebugEnabled()) {
- config = config.withClass(REQUEST_TRACKER_CLASS, RequestLogger.class);
+ config = config.withClassList(REQUEST_TRACKER_CLASSES, List.of(RequestLogger.class));
config = config.withBoolean(REQUEST_LOGGER_SUCCESS_ENABLED, true);
// Only show bodies when TRACE is enabled
config = config.withBoolean(REQUEST_LOGGER_VALUES, requestLogger.isTraceEnabled());
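The import change above appears to track newer java-driver versions, where the single `REQUEST_TRACKER_CLASS` option gave way to a `REQUEST_TRACKER_CLASSES` list, hence the `withClassList` call. Per the surrounding code, enabling DEBUG on the `SessionBuilder` logger activates the `RequestLogger`, and TRACE additionally logs statement values.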
@@ -1,5 +1,5 @@
/*
- * Copyright 2015-2020 The OpenZipkin Authors
+ * Copyright 2015-2024 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
@@ -60,7 +60,7 @@ protected DeduplicatingInsert(DelayLimiter<I> delayLimiter, I input) {
}

@Override protected final void doEnqueue(Callback<Void> callback) {
- super.doEnqueue(new Callback<Void>() {
+ super.doEnqueue(new Callback<>() {
@Override public void onSuccess(Void value) {
callback.onSuccess(value);
}