Skip to content

Commit f9998f5

Browse files
author
Vincent Royer
committed
Add source metrics
1 parent 4686a63 commit f9998f5

File tree

3 files changed

+68
-28
lines changed

3 files changed

+68
-28
lines changed

docs/modules/ROOT/pages/monitor.adoc

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ The CDC producer JVM agent running in Cassandra nodes provides a dedicated MBean
5656
== Source Connector metrics
5757

5858
The Cassandra Source Connector also publishes per-message metrics:
59-
[cols="2,1,3"]
59+
[cols="2,3"]
6060
|===
6161
|Metric |Description
6262

@@ -65,25 +65,36 @@ The Cassandra Source Connector also publish per message metrics:
6565

6666
|query_latency
6767
|The CQL query latency in milliseconds to fetch the updated row. This is 0 when hitting the memory cache.
68+
69+
|replication_latency
70+
|The replication latency in milliseconds (the time at which the source connector processes the mutation minus the Cassandra mutation writetime).
71+
6872
|===
6973

7074
Here is an example of those user-defined metrics aggregated by Pulsar when processing 20000 mutations:
7175

7276
[source,bash]
7377
----
74-
$ curl http://localhost:8080/metrics/ 2>/dev/null | grep user_metric
78+
curl http://localhost:8080/metrics/ 2>/dev/null | grep user
79+
# HELP process_cpu_seconds_total Total user and system CPU time spent in seconds.
7580
# HELP pulsar_source_user_metric_ User defined metric.
7681
# TYPE pulsar_source_user_metric_ summary
82+
pulsar_source_user_metric_{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="replication_latency",quantile="0.5",} 71683.0
83+
pulsar_source_user_metric_{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="replication_latency",quantile="0.9",} 99667.0
84+
pulsar_source_user_metric_{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="replication_latency",quantile="0.99",} 106717.0
85+
pulsar_source_user_metric_{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="replication_latency",quantile="0.999",} 106763.0
86+
pulsar_source_user_metric__count{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="replication_latency",} 20000.0
87+
pulsar_source_user_metric__sum{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="replication_latency",} 1.3355407E9
7788
pulsar_source_user_metric_{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="cache_hit",quantile="0.5",} 1.0
7889
pulsar_source_user_metric_{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="cache_hit",quantile="0.9",} 1.0
7990
pulsar_source_user_metric_{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="cache_hit",quantile="0.99",} 1.0
8091
pulsar_source_user_metric_{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="cache_hit",quantile="0.999",} 1.0
81-
pulsar_source_user_metric__count{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="cache_hit",} 2000.0
82-
pulsar_source_user_metric__sum{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="cache_hit",} 1000.0
83-
pulsar_source_user_metric_{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="query_latency",quantile="0.5",} 6.0
84-
pulsar_source_user_metric_{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="query_latency",quantile="0.9",} 32.0
85-
pulsar_source_user_metric_{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="query_latency",quantile="0.99",} 395.0
86-
pulsar_source_user_metric_{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="query_latency",quantile="0.999",} 509.0
87-
pulsar_source_user_metric__count{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="query_latency",} 2000.0
88-
pulsar_source_user_metric__sum{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="query_latency",} 26068.0
92+
pulsar_source_user_metric__count{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="cache_hit",} 20000.0
93+
pulsar_source_user_metric__sum{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="cache_hit",} 10000.0
94+
pulsar_source_user_metric_{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="query_latency",quantile="0.5",} 2.0
95+
pulsar_source_user_metric_{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="query_latency",quantile="0.9",} 9.0
96+
pulsar_source_user_metric_{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="query_latency",quantile="0.99",} 104.0
97+
pulsar_source_user_metric_{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="query_latency",quantile="0.999",} 1035.0
98+
pulsar_source_user_metric__count{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="query_latency",} 20000.0
99+
pulsar_source_user_metric__sum{tenant="public",namespace="public/default",name="data-table1",instance_id="0",cluster="standalone",fqfn="public/default/data-table1",metric="query_latency",} 83886.0
89100
----

source-pulsar/src/main/java/com/datastax/oss/pulsar/source/CassandraSource.java

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import edu.umd.cs.findbugs.annotations.NonNull;
3636
import io.vavr.Tuple2;
3737
import io.vavr.Tuple3;
38-
import jdk.nashorn.internal.ir.annotations.Immutable;
3938
import lombok.SneakyThrows;
4039
import lombok.extern.slf4j.Slf4j;
4140
import org.apache.avro.Conversions;
@@ -71,6 +70,11 @@
7170
@Slf4j
7271
public class CassandraSource implements Source<GenericRecord>, SchemaChangeListener {
7372

73+
public static final String CACHE_HIT = "cache_hit";
74+
public static final String QUERY_LATENCY = "query_latency";
75+
public static final String REPLICATION_LATENCY = "replication_latency";
76+
77+
SourceContext sourceContext;
7478
CassandraSourceConnectorConfig config;
7579
CassandraClient cassandraClient;
7680
Consumer<KeyValue<GenericRecord, MutationValue>> consumer = null;
@@ -113,6 +117,7 @@ public CassandraSource() {
113117
@Override
114118
public void open(Map<String, Object> config, SourceContext sourceContext) throws Exception {
115119
try {
120+
this.sourceContext = sourceContext;
116121
Map<String, String> processorConfig = ConfigUtil.flatString(config);
117122
log.info("openCassandraSource {}", config);
118123
this.config = new CassandraSourceConnectorConfig(processorConfig);
@@ -301,7 +306,7 @@ private List<MyKVRecord> fillBuffer() throws Exception {
301306
final GenericRecord mutationKey = kv.getKey();
302307
final MutationValue mutationValue = kv.getValue();
303308

304-
log.info("Message from producer={} msgId={} key={} value={} schema {}\n",
309+
log.debug("Message from producer={} msgId={} key={} value={} schema {}\n",
305310
msg.getProducerName(), msg.getMessageId(), kv.getKey(), kv.getValue(), msg.getReaderSchema().orElse(null));
306311

307312
try {
@@ -316,24 +321,34 @@ private List<MyKVRecord> fillBuffer() throws Exception {
316321
executeOrdered(msg.getKey(), () -> {
317322
try {
318323
if (mutationCache.isMutationProcessed(msg.getKey(), mutationValue.getMd5Digest())) {
319-
log.info("message key={} md5={} already processed", msg.getKey(), mutationValue.getMd5Digest());
324+
log.debug("message key={} md5={} already processed", msg.getKey(), mutationValue.getMd5Digest());
320325
// discard duplicated mutation
321326
consumer.acknowledge(msg);
322327
queryResult.complete(null);
328+
sourceContext.recordMetric(CACHE_HIT, 1);
329+
sourceContext.recordMetric(QUERY_LATENCY, 0);
330+
if (msg.getProperty(Constants.WRITETIME) != null)
331+
sourceContext.recordMetric(REPLICATION_LATENCY, System.currentTimeMillis() - (Long.parseLong(msg.getProperty(Constants.WRITETIME)) / 1000));
323332
return null;
324333
}
325334

326335
List<Object> nonNullPkValues = pk.stream().filter(e -> e != null).collect(Collectors.toList());
336+
long start = System.currentTimeMillis();
327337
Tuple3<Row, ConsistencyLevel, UUID> tuple = cassandraClient.selectRow(
328338
nonNullPkValues,
329339
mutationValue.getNodeId(),
330340
Lists.newArrayList(ConsistencyLevel.LOCAL_QUORUM, ConsistencyLevel.LOCAL_ONE),
331341
getSelectStatement(converterAndQueryFinal, nonNullPkValues.size()),
332342
mutationValue.getMd5Digest());
343+
long end = System.currentTimeMillis();
344+
sourceContext.recordMetric(CACHE_HIT, 0);
345+
sourceContext.recordMetric(QUERY_LATENCY, end - start);
346+
if (msg.getProperty(Constants.WRITETIME) != null)
347+
sourceContext.recordMetric(REPLICATION_LATENCY, end - (Long.parseLong(msg.getProperty(Constants.WRITETIME)) / 1000));
333348
Object value = tuple._1 == null ? null : converterAndQueryFinal.getConverter().toConnectData(tuple._1);
334349
if (!config.getCacheOnlyIfCoordinatorMatch()
335350
|| (tuple._3 != null && tuple._3.equals(mutationValue.getNodeId()))) {
336-
log.info("addMutation key={} md5={} pk={}", msg.getKey(), mutationValue.getMd5Digest(), nonNullPkValues);
351+
log.debug("addMutation key={} md5={} pk={}", msg.getKey(), mutationValue.getMd5Digest(), nonNullPkValues);
337352
// cache the mutation digest if the coordinator is the source of this event.
338353
mutationCache.addMutationMd5(msg.getKey(), mutationValue.getMd5Digest());
339354
}
@@ -375,7 +390,7 @@ private List<MyKVRecord> fillBuffer() throws Exception {
375390
}
376391
long duration = System.currentTimeMillis() - start;
377392
long throughput = duration > 0 ? (1000 * newBuffer.size()) / duration : 0;
378-
log.info("Query time for {} msgs in {} ms throughput={} msgs/s discarded={} duplicate mutations", newBuffer.size(), duration, throughput, countDiscarded);
393+
log.debug("Query time for {} msgs in {} ms throughput={} msgs/s discarded={} duplicate mutations", newBuffer.size(), duration, throughput, countDiscarded);
379394
} catch (Exception e) {
380395
log.error("error", e);
381396
// fail every message in the buffer
@@ -421,7 +436,7 @@ public void onTableDropped(@NonNull TableMetadata table) {
421436
@SneakyThrows
422437
@Override
423438
public void onTableUpdated(@NonNull TableMetadata current, @NonNull TableMetadata previous) {
424-
log.info("onTableUpdated {} {}", current, previous);
439+
log.debug("onTableUpdated {} {}", current, previous);
425440
if (current.getKeyspace().asInternal().equals(config.getKeyspaceName())
426441
&& current.getName().asInternal().equals(config.getTableName())) {
427442
KeyspaceMetadata ksm = cassandraClient.getCqlSession().getMetadata().getKeyspace(current.getKeyspace()).get();
@@ -432,7 +447,7 @@ public void onTableUpdated(@NonNull TableMetadata current, @NonNull TableMetadat
432447
@SneakyThrows
433448
@Override
434449
public void onUserDefinedTypeCreated(@NonNull UserDefinedType type) {
435-
log.info("onUserDefinedTypeCreated {} {}", type);
450+
log.debug("onUserDefinedTypeCreated {} {}", type);
436451
if (type.getKeyspace().asInternal().equals(config.getKeyspaceName())) {
437452
KeyspaceMetadata ksm = cassandraClient.getCqlSession().getMetadata().getKeyspace(type.getKeyspace()).get();
438453
setValueConverterAndQuery(ksm, ksm.getTable(config.getTableName()).get());
@@ -441,13 +456,13 @@ public void onUserDefinedTypeCreated(@NonNull UserDefinedType type) {
441456

442457
@Override
443458
public void onUserDefinedTypeDropped(@NonNull UserDefinedType type) {
444-
log.info("onUserDefinedTypeDropped {} {}", type);
459+
log.debug("onUserDefinedTypeDropped {} {}", type);
445460
}
446461

447462
@SneakyThrows
448463
@Override
449464
public void onUserDefinedTypeUpdated(@NonNull UserDefinedType userDefinedType, @NonNull UserDefinedType userDefinedType1) {
450-
log.info("onUserDefinedTypeUpdated {} {}", userDefinedType, userDefinedType1);
465+
log.debug("onUserDefinedTypeUpdated {} {}", userDefinedType, userDefinedType1);
451466
if (userDefinedType.getKeyspace().asCql(true).equals(config.getKeyspaceName())) {
452467
KeyspaceMetadata ksm = cassandraClient.getCqlSession().getMetadata().getKeyspace(userDefinedType.getKeyspace()).get();
453468
setValueConverterAndQuery(ksm, ksm.getTable(config.getTableName()).get());

source-pulsar/src/test/java/com/datastax/oss/pulsar/source/PulsarCassandraSourceTests.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.datastax.oss.pulsar.source;
1717

1818
import com.datastax.cassandra.cdc.CqlLogicalTypes;
19+
import com.datastax.cassandra.cdc.ProducerTestUtil;
1920
import com.datastax.oss.cdc.CassandraSourceConnectorConfig;
2021
import com.datastax.oss.driver.api.core.CqlSession;
2122
import com.datastax.oss.driver.api.core.cql.*;
@@ -51,8 +52,10 @@
5152
import java.time.LocalDateTime;
5253
import java.time.ZoneId;
5354
import java.util.*;
55+
import java.util.concurrent.Executors;
5456
import java.util.concurrent.TimeUnit;
5557

58+
import static com.datastax.cassandra.cdc.ProducerTestUtil.randomizeBuffer;
5659
import static org.junit.jupiter.api.Assertions.assertEquals;
5760
import static org.junit.jupiter.api.Assertions.assertNull;
5861

@@ -570,15 +573,26 @@ public void testBatchInsert(String ksName,
570573
cqlSession.execute("CREATE KEYSPACE IF NOT EXISTS " + ksName +
571574
" WITH replication = {'class':'SimpleStrategy','replication_factor':'2'};");
572575
cqlSession.execute("CREATE TABLE IF NOT EXISTS " + ksName + ".table1 (id text, a int, b int, PRIMARY KEY (id, a)) WITH cdc=true");
573-
PreparedStatement statement = cqlSession.prepare("INSERT INTO " + ksName + ".table1 (id, a, b) VALUES (?,?,?)");
574-
BatchStatementBuilder batchBuilder = BatchStatement.builder(BatchType.UNLOGGED);
575-
for(int i = 0; i < 1000; i++) {
576-
batchBuilder.addStatement(statement.bind("a", i, i));
577-
}
578-
cqlSession.execute(batchBuilder.build());
579576
}
580577
deployConnector(ksName, "table1", keyConverter, valueConverter);
581578

579+
// run batch insert in parallel
580+
Executors.newSingleThreadExecutor().submit(() -> {
581+
try (CqlSession cqlSession = cassandraContainer1.getCqlSession()) {
582+
PreparedStatement statement = cqlSession.prepare("INSERT INTO " + ksName + ".table1 (id, a, b) VALUES (?,?,?)");
583+
for(int batch = 0; batch < 10; batch++) {
584+
BatchStatementBuilder batchBuilder = BatchStatement.builder(BatchType.UNLOGGED);
585+
for (int i = 0; i < 1000; i++) {
586+
batchBuilder.addStatement(statement.bind("a" + batch, i, i));
587+
}
588+
cqlSession.execute(batchBuilder.build());
589+
}
590+
// no drain, test use NRT CDC
591+
} catch (Exception e) {
592+
log.error("error:", e);
593+
}
594+
});
595+
582596
try (PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(pulsarContainer.getPulsarBrokerUrl()).build()) {
583597
try (Consumer<GenericRecord> consumer = pulsarClient.newConsumer(org.apache.pulsar.client.api.Schema.AUTO_CONSUME())
584598
.topic(String.format(Locale.ROOT, "data-%s.table1", ksName))
@@ -589,16 +603,16 @@ public void testBatchInsert(String ksName,
589603
.subscribe()) {
590604
Message<GenericRecord> msg;
591605
int msgCount = 0;
592-
while ((msg = consumer.receive(90, TimeUnit.SECONDS)) != null && msgCount < 1000) {
606+
while ((msg = consumer.receive(90, TimeUnit.SECONDS)) != null && msgCount < 10000) {
593607
msgCount++;
594608
GenericObject genericObject = msg.getValue();
595609
assertEquals(SchemaType.KEY_VALUE, genericObject.getSchemaType());
596610
KeyValue<GenericRecord, GenericRecord> kv = (KeyValue<GenericRecord, GenericRecord>) genericObject.getNativeObject();
597611
GenericRecord key = kv.getKey();
598-
assertEquals("a", key.getField("id"));
612+
Assert.assertTrue(((String)key.getField("id")).startsWith("a"));
599613
consumer.acknowledge(msg);
600614
}
601-
assertEquals(1000, msgCount);
615+
assertEquals(10000, msgCount);
602616
}
603617
}
604618
} finally {

0 commit comments

Comments
 (0)