diff --git a/.dockerignore b/.dockerignore
index 498d972aec..3e91132df7 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -55,6 +55,7 @@
!zipkin-collector/kafka/src/main/**
!zipkin-collector/rabbitmq/src/main/**
!zipkin-collector/scribe/src/main/**
+!zipkin-collector/pulsar/src/main/**
!zipkin-junit5/src/main/**
!zipkin-storage/src/main/**
!zipkin-storage/cassandra/src/main/**
diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index 97cc06b054..82907e340f 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -69,6 +69,7 @@ jobs:
- name: zipkin-collector-activemq
- name: zipkin-collector-kafka
- name: zipkin-collector-rabbitmq
+ - name: zipkin-collector-pulsar
- name: zipkin-storage-cassandra
- name: zipkin-storage-elasticsearch
- name: zipkin-storage-mysql-v1
diff --git a/README.md b/README.md
index cf7c54c354..f1cda681e6 100644
--- a/README.md
+++ b/README.md
@@ -24,7 +24,7 @@ aggregate behavior including error paths or calls to deprecated services.
Application’s need to be “instrumented” to report trace data to Zipkin. This
usually means configuration of a [tracer or instrumentation library](https://zipkin.io/pages/tracers_instrumentation.html). The most
popular ways to report data to Zipkin are via http or Kafka, though many other
-options exist, such as Apache ActiveMQ, gRPC and RabbitMQ. The data served to
+options exist, such as Apache ActiveMQ, gRPC, RabbitMQ and Apache Pulsar. The data served to
the UI is stored in-memory, or persistently with a supported backend such as
Apache Cassandra or Elasticsearch.
diff --git a/docker/examples/README.md b/docker/examples/README.md
index 9655c7c43d..967d54360d 100644
--- a/docker/examples/README.md
+++ b/docker/examples/README.md
@@ -118,6 +118,18 @@ $ docker compose -f docker-compose-rabbitmq.yml up
Then configure the [RabbitMQ sender](https://github.com/openzipkin/zipkin-reporter-java/blob/master/amqp-client/src/main/java/zipkin2/reporter/amqp/RabbitMQSender.java)
using a `host` value of `localhost` or a non-local hostname if in docker.
+
+## Pulsar
+
+You can collect traces from [Pulsar](../test-images/zipkin-pulsar/README.md) in addition to HTTP, using the
+`docker-compose-pulsar.yml` file. This configuration starts `zipkin` and `zipkin-pulsar` in their
+own containers.
+
+To add Pulsar configuration, run:
+```bash
+$ docker compose -f docker-compose-pulsar.yml up
+```
+
## Eureka
You can register Zipkin for service discovery in [Eureka](../test-images/zipkin-eureka/README.md)
diff --git a/docker/examples/docker-compose-pulsar.yml b/docker/examples/docker-compose-pulsar.yml
new file mode 100644
index 0000000000..0975d798ae
--- /dev/null
+++ b/docker/examples/docker-compose-pulsar.yml
@@ -0,0 +1,32 @@
+#
+# Copyright The OpenZipkin Authors
+# SPDX-License-Identifier: Apache-2.0
+#
+
+# This file uses the version 2 docker compose file format, described here:
+# https://docs.docker.com/compose/compose-file/#version-2
+#
+# It extends the default configuration from docker-compose.yml to add a test
+# pulsar server, which is used as a span transport.
+
+version: '2.4'
+
+services:
+ pulsar:
+ image: ghcr.io/openzipkin/zipkin-pulsar:${TAG:-latest}
+ container_name: pulsar
+ ports: # expose the pulsar port so apps can publish spans.
+ - "6650:6650"
+ # - "8080:8080" # uncomment to expose the pulsar http port.
+
+ zipkin:
+ extends:
+ file: docker-compose.yml
+ service: zipkin
+ # slim doesn't include Pulsar support, so switch to the larger image
+ image: ghcr.io/openzipkin/zipkin:${TAG:-latest}
+ environment:
+ - PULSAR_SERVICE_URL=pulsar://pulsar:6650
+ depends_on:
+ pulsar:
+ condition: service_healthy
diff --git a/zipkin-collector/pom.xml b/zipkin-collector/pom.xml
index a2ac547490..99eef4babe 100644
--- a/zipkin-collector/pom.xml
+++ b/zipkin-collector/pom.xml
@@ -29,6 +29,7 @@
kafkarabbitmqscribe
+ pulsar
diff --git a/zipkin-collector/pulsar/README.md b/zipkin-collector/pulsar/README.md
new file mode 100644
index 0000000000..98c75a40c0
--- /dev/null
+++ b/zipkin-collector/pulsar/README.md
@@ -0,0 +1,57 @@
+# collector-pulsar
+
+## PulsarCollector
+
+This collector is implemented as a Pulsar consumer supporting Pulsar brokers running
+version 4.x or later, and the default subscription type is `Shared`, in Shared subscription type,
+multiple consumers can attach to the same subscription and messages are delivered
+in a round-robin distribution across consumers.
+
+This collector is implemented as a Pulsar consumer supporting Pulsar brokers running version 4.x or later.
+The default `subscriptionType` is `Shared`, which allows multiple consumers to attach to the same subscription,
+with messages delivered in a round-robin distribution across consumers, the default `subscriptionInitialPosition`
+is `Earliest`, you can modify the consumer settings as needed through the `consumerProps` parameter.
+Also, the client settings can also be modified through the `clientProps` parameter.
+
+For information about running this collector as a module in Zipkin server, see
+the [Zipkin Server README](../../zipkin-server/README.md#pulsar-collector).
+
+When using this collector as a library outside of Zipkin server,
+[zipkin2.collector.pulsar.PulsarCollector.Builder](src/main/java/zipkin2/collector/pulsar/PulsarCollector.java)
+includes defaults that will operate against a Pulsar topic name `zipkin`.
+
+## Encoding spans into Pulsar messages
+
+The message's binary data includes a list of spans. Supported encodings
+are the same as the http [POST /spans](https://zipkin.io/zipkin-api/#/paths/%252Fspans) body.
+
+### Json
+
+The message's binary data is a list of spans in json. The first character must be '[' (decimal 91).
+
+`Codec.JSON.writeSpans(spans)` performs the correct json encoding.
+
+### Thrift
+
+The message's binary data includes a list header followed by N spans serialized in TBinaryProtocol
+
+`Codec.THRIFT.writeSpans(spans)` encodes spans in the following fashion:
+
+```
+write_byte(12) // type of the list elements: 12 == struct
+write_i32(count) // count of spans that will follow
+for (int i = 0; i < count; i++) {
+ writeTBinaryProtocol(spans(i))
+}
+```
+
+### Legacy encoding
+
+Older versions of zipkin accepted a single span per message, as opposed
+to a list per message. This practice is deprecated, but still supported.
+
+## Logging
+
+Zipkin by default suppresses all logging output from Pulsar client operations as they can get quite verbose. Start
+Zipkin
+with `--logging.level.org.apache.pulsar=INFO` or similar to override this during troubleshooting for example.
diff --git a/zipkin-collector/pulsar/pom.xml b/zipkin-collector/pulsar/pom.xml
new file mode 100644
index 0000000000..383bbd2238
--- /dev/null
+++ b/zipkin-collector/pulsar/pom.xml
@@ -0,0 +1,39 @@
+
+
+
+ 4.0.0
+
+
+ io.zipkin.zipkin2
+ zipkin-collector-parent
+ 3.4.5-SNAPSHOT
+
+
+ zipkin-collector-pulsar
+ Collector: Pulsar
+
+
+ ${project.basedir}/../..
+ 4.0.2
+
+
+
+
+ ${project.groupId}
+ zipkin-collector
+ ${project.version}
+
+
+
+ org.apache.pulsar
+ pulsar-client
+ ${pulsar-client.version}
+
+
+
diff --git a/zipkin-collector/pulsar/src/main/java/zipkin2/collector/pulsar/LazyPulsarInit.java b/zipkin-collector/pulsar/src/main/java/zipkin2/collector/pulsar/LazyPulsarInit.java
new file mode 100644
index 0000000000..ed5a76a2b1
--- /dev/null
+++ b/zipkin-collector/pulsar/src/main/java/zipkin2/collector/pulsar/LazyPulsarInit.java
@@ -0,0 +1,80 @@
+/*
+ * Copyright The OpenZipkin Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package zipkin2.collector.pulsar;
+
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import zipkin2.CheckResult;
+import zipkin2.collector.Collector;
+import zipkin2.collector.CollectorMetrics;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+class LazyPulsarInit {
+
+ private final Collector collector;
+ private final CollectorMetrics metrics;
+ private final String topic;
+ private final int concurrency;
+ private final Map clientProps, consumerProps;
+ public volatile PulsarClient result;
+ final AtomicReference failure = new AtomicReference<>();
+
+ LazyPulsarInit(PulsarCollector.Builder builder) {
+ this.collector = builder.delegate.build();
+ this.metrics = builder.metrics;
+ this.topic = builder.topic;
+ this.concurrency = builder.concurrency;
+ this.clientProps = builder.clientProps;
+ this.consumerProps = builder.consumerProps;
+ }
+
+ void init() {
+ if (result == null) {
+ synchronized (this) {
+ if (result == null) {
+ result = subscribe();
+ }
+ }
+ }
+ }
+
+ private PulsarClient subscribe() {
+ PulsarClient client;
+ try {
+ client = PulsarClient.builder()
+ .loadConf(clientProps)
+ .build();
+ } catch (Exception e) {
+ failure.set(CheckResult.failed(e));
+ throw new RuntimeException("Pulsar client creation failed. " + e.getMessage(), e);
+ }
+
+ try {
+ for (int i = 0; i < concurrency; i++) {
+ PulsarSpanConsumer consumer = new PulsarSpanConsumer(topic, consumerProps, client, collector, metrics);
+ consumer.startConsumer();
+ }
+ return client;
+ } catch (Exception e) {
+ try {
+ client.close();
+ } catch (PulsarClientException ex) {
+ // Nobody cares me.
+ }
+ failure.set(CheckResult.failed(e));
+ throw new RuntimeException("Pulsar Client is unable to subscribe to the topic(" + topic + "), please check the service.", e);
+ }
+ }
+
+ void close() throws PulsarClientException {
+ PulsarClient maybe = result;
+ if (maybe != null) {
+ result.close();
+ result = null;
+ }
+ }
+}
\ No newline at end of file
diff --git a/zipkin-collector/pulsar/src/main/java/zipkin2/collector/pulsar/PulsarCollector.java b/zipkin-collector/pulsar/src/main/java/zipkin2/collector/pulsar/PulsarCollector.java
new file mode 100644
index 0000000000..c614cb51da
--- /dev/null
+++ b/zipkin-collector/pulsar/src/main/java/zipkin2/collector/pulsar/PulsarCollector.java
@@ -0,0 +1,156 @@
+/*
+ * Copyright The OpenZipkin Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package zipkin2.collector.pulsar;
+
+import io.opentelemetry.api.internal.StringUtils;
+import zipkin2.Call;
+import zipkin2.CheckResult;
+import zipkin2.collector.Collector;
+import zipkin2.collector.CollectorComponent;
+import zipkin2.collector.CollectorMetrics;
+import zipkin2.collector.CollectorSampler;
+import zipkin2.storage.StorageComponent;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+
+/** This collector consumes encoded binary messages from a Pulsar topic. */
+public final class PulsarCollector extends CollectorComponent {
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ /** Configuration including defaults needed to consume spans from a Pulsar topic. */
+ public static final class Builder extends CollectorComponent.Builder {
+ final Collector.Builder delegate = Collector.newBuilder(PulsarCollector.class);
+ CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS;
+ Map clientProps = new HashMap<>();
+ Map consumerProps = new HashMap<>();
+ String topic = "zipkin";
+ int concurrency = 1;
+
+ @Override
+ public Builder storage(StorageComponent storage) {
+ delegate.storage(storage);
+ return this;
+ }
+
+ @Override
+ public Builder metrics(CollectorMetrics metrics) {
+ if (Objects.isNull(metrics)) throw new NullPointerException("metrics == null");
+ this.metrics = metrics.forTransport("pulsar");
+ this.delegate.metrics(this.metrics);
+ return this;
+ }
+
+ @Override
+ public Builder sampler(CollectorSampler sampler) {
+ this.delegate.sampler(sampler);
+ return this;
+ }
+
+ @Override
+ public PulsarCollector build() {
+ return new PulsarCollector(this);
+ }
+
+ /** Count of concurrent message consumers on the topic. Defaults to 1. */
+ public Builder concurrency(Integer concurrency) {
+ if (concurrency < 1) throw new IllegalArgumentException("concurrency < 1");
+ this.concurrency = concurrency;
+ return this;
+ }
+
+ /** Queue zipkin spans will be consumed from. Defaults to "zipkin". */
+ public Builder topic(String topic) {
+ if (StringUtils.isNullOrEmpty(topic)) throw new NullPointerException("topic is null or empty");
+ this.topic = topic;
+ return this;
+ }
+
+ /** The service URL for the Pulsar client ex. pulsar://my-broker:6650. No default. */
+ public Builder serviceUrl(String serviceUrl) {
+ if (StringUtils.isNullOrEmpty(serviceUrl)) throw new NullPointerException("serviceUrl is null or empty");
+ clientProps.put("serviceUrl", serviceUrl);
+ return this;
+ }
+
+ /** Specify the subscription name for this consumer. No default. */
+ public Builder subscriptionName(String subscriptionName) {
+ if (StringUtils.isNullOrEmpty(subscriptionName)) throw new NullPointerException("serviceUrl is null or empty");
+ consumerProps.put("subscriptionName", subscriptionName);
+ return this;
+ }
+
+ /**
+ * Any properties set here will override the previous Pulsar client configuration.
+ *
+ * @param clientPropsMap Map
+ * @return Builder
+ * @see org.apache.pulsar.client.api.ClientBuilder#loadConf(Map)
+ */
+ public Builder clientProps(Map clientPropsMap) {
+ if (clientPropsMap.isEmpty()) throw new NullPointerException("clientProps is empty");
+ clientProps.putAll(clientPropsMap);
+ return this;
+ }
+
+ /**
+ * Any properties set here will override the previous Pulsar consumer configuration.
+ *
+ * @param consumerPropsMap Map
+ * @return Builder
+ * @see org.apache.pulsar.client.api.ConsumerBuilder#loadConf(Map)
+ */
+ public Builder consumerProps(Map consumerPropsMap) {
+ if (consumerPropsMap.isEmpty()) throw new NullPointerException("consumerProps is empty");
+ consumerProps.putAll(consumerPropsMap);
+ return this;
+ }
+ }
+
+ final Map clientProps, consumerProps;
+ final String topic;
+ final LazyPulsarInit lazyPulsarInit;
+
+ PulsarCollector(Builder builder) {
+ clientProps = builder.clientProps;
+ consumerProps = builder.consumerProps;
+ this.topic = builder.topic;
+ this.lazyPulsarInit = new LazyPulsarInit(builder);
+ }
+
+ @Override
+ public PulsarCollector start() {
+ lazyPulsarInit.init();
+ return this;
+ }
+
+ @Override public void close() throws IOException {
+ lazyPulsarInit.close();
+ }
+
+ @Override public CheckResult check() {
+ try {
+ CheckResult failure = lazyPulsarInit.failure.get();
+ if (failure != null) return failure;
+ return CheckResult.OK;
+ } catch (Throwable th) {
+ Call.propagateIfFatal(th);
+ return CheckResult.failed(th);
+ }
+ }
+
+ @Override public String toString() {
+ return "PulsarCollector{" +
+ "clientProps=" + clientProps +
+ ", consumerProps=" + consumerProps +
+ ", topic=" + this.topic +
+ "}";
+ }
+}
diff --git a/zipkin-collector/pulsar/src/main/java/zipkin2/collector/pulsar/PulsarSpanConsumer.java b/zipkin-collector/pulsar/src/main/java/zipkin2/collector/pulsar/PulsarSpanConsumer.java
new file mode 100644
index 0000000000..6563573f17
--- /dev/null
+++ b/zipkin-collector/pulsar/src/main/java/zipkin2/collector/pulsar/PulsarSpanConsumer.java
@@ -0,0 +1,92 @@
+/*
+ * Copyright The OpenZipkin Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package zipkin2.collector.pulsar;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageListener;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.client.api.SubscriptionType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import zipkin2.Callback;
+import zipkin2.collector.Collector;
+import zipkin2.collector.CollectorMetrics;
+
+import java.io.Closeable;
+import java.util.Map;
+
+public class PulsarSpanConsumer implements Closeable {
+ static final Callback NOOP = new Callback<>() {
+ @Override public void onSuccess(Void value) {
+ }
+
+ @Override public void onError(Throwable t) {
+ }
+ };
+
+ private static final Logger LOG = LoggerFactory.getLogger(PulsarSpanConsumer.class);
+ private final String topic;
+ private final Map consumerProps;
+ private final PulsarClient client;
+ private final Collector collector;
+ private final CollectorMetrics metrics;
+ private Consumer consumer;
+
+ public PulsarSpanConsumer(String topic, Map consumerProps, PulsarClient client, Collector collector, CollectorMetrics metrics) {
+ this.topic = topic;
+ this.consumerProps = consumerProps;
+ this.client = client;
+ this.collector = collector;
+ this.metrics = metrics;
+ }
+
+ public void startConsumer() throws PulsarClientException {
+ consumer = client.newConsumer()
+ .topic(topic)
+ .subscriptionType(SubscriptionType.Shared)
+ .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+ .loadConf(consumerProps)
+ .messageListener(new ZipkinMessageListener<>(collector, metrics))
+ .subscribe();
+ }
+
+ @Override public void close() {
+ try {
+ if (consumer != null) {
+ consumer.close();
+ consumer = null;
+ }
+ } catch (PulsarClientException e) {
+ LOG.error("Failed to close Pulsar Consumer client.", e);
+ }
+ }
+
+ /**
+ * A message listener implementation for processing messages in a Pulsar consumer,
+ * and it should not be overridden by loadConf as it ensures that zipkin could handle span correctly.
+ */
+ record ZipkinMessageListener(Collector collector, CollectorMetrics metrics) implements MessageListener {
+
+ @Override public void received(Consumer consumer, Message msg) {
+ try {
+ final byte[] serialized = msg.getData();
+ metrics.incrementMessages();
+ metrics.incrementBytes(serialized.length);
+
+ if (serialized.length == 0) return; // lenient on empty messages
+
+ collector.acceptSpans(serialized, NOOP);
+ consumer.acknowledgeAsync(msg);
+ } catch (Throwable th) {
+ metrics.incrementMessagesDropped();
+ LOG.error("Pulsar Span Consumer failed to process the message.", th);
+ consumer.negativeAcknowledge(msg);
+ }
+ }
+ }
+}
diff --git a/zipkin-collector/pulsar/src/test/java/zipkin2/collector/pulsar/ITPulsarCollector.java b/zipkin-collector/pulsar/src/test/java/zipkin2/collector/pulsar/ITPulsarCollector.java
new file mode 100644
index 0000000000..5f7633e38a
--- /dev/null
+++ b/zipkin-collector/pulsar/src/test/java/zipkin2/collector/pulsar/ITPulsarCollector.java
@@ -0,0 +1,267 @@
+/*
+ * Copyright The OpenZipkin Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package zipkin2.collector.pulsar;
+
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.api.TestInstance;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import zipkin2.Call;
+import zipkin2.Callback;
+import zipkin2.Component;
+import zipkin2.Span;
+import zipkin2.codec.SpanBytesEncoder;
+import zipkin2.collector.InMemoryCollectorMetrics;
+import zipkin2.storage.ForwardingStorageComponent;
+import zipkin2.storage.SpanConsumer;
+import zipkin2.storage.StorageComponent;
+
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static zipkin2.TestObjects.LOTS_OF_SPANS;
+import static zipkin2.TestObjects.UTF_8;
+import static zipkin2.codec.SpanBytesEncoder.PROTO3;
+import static zipkin2.codec.SpanBytesEncoder.THRIFT;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+@Timeout(60)
+@Tag("docker")
+public class ITPulsarCollector {
+
+ @RegisterExtension
+ static PulsarExtension pulsar = new PulsarExtension();
+ List spans = List.of(LOTS_OF_SPANS[0], LOTS_OF_SPANS[1]);
+
+ InMemoryCollectorMetrics metrics = new InMemoryCollectorMetrics();
+ InMemoryCollectorMetrics pulsarMetrics = metrics.forTransport("pulsar");
+ CopyOnWriteArraySet threadsProvidingSpans = new CopyOnWriteArraySet<>();
+ LinkedBlockingQueue> receivedSpans = new LinkedBlockingQueue<>();
+ SpanConsumer consumer;
+ PulsarClient pulsarClient;
+ PulsarCollector collector;
+ String testName;
+
+ @BeforeEach void start(TestInfo testInfo) throws PulsarClientException {
+ Optional testMethod = testInfo.getTestMethod();
+ if (testMethod.isPresent()) {
+ this.testName = testMethod.get().getName();
+ }
+ metrics.clear();
+ threadsProvidingSpans.clear();
+ receivedSpans.clear();
+ pulsarMetrics.clear();
+ consumer = (spans) -> {
+ threadsProvidingSpans.add(Thread.currentThread());
+ receivedSpans.add(spans);
+ return Call.create(null);
+ };
+ pulsarClient = PulsarClient.builder().serviceUrl(pulsar.serviceUrl()).build();
+ collector = builder().build().start();
+ }
+
+ PulsarCollector.Builder builder() {
+ return pulsar.newCollectorBuilder(testName)
+ .storage(buildStorage(consumer))
+ .metrics(metrics)
+ .subscriptionName(testName)
+ .topic(testName);
+ }
+
+ @AfterEach void tearDown() throws PulsarClientException {
+ pulsarClient.close();
+ }
+
+ @Test void checkPasses() {
+ assertThat(collector.check().ok()).isTrue();
+ }
+
+ @Test void startFailsWithInvalidServiceUrl() {
+ Throwable exception = assertThrows(RuntimeException.class, () -> {
+ collector = builder().serviceUrl("@zixin").build();
+ collector.start();
+ });
+ assertThat(exception.getMessage()).contains("Pulsar client creation failed");
+ }
+
+ /**
+ * The {@code toString()} of {@link Component} implementations appear in health check endpoints.
+ * Since these are likely to be exposed in logs and other monitoring tools, care should be taken
+ * to ensure {@code toString()} output is a reasonable length and does not contain sensitive
+ * information.
+ */
+ @Test void toStringContainsOnlySummaryInformation() throws IOException {
+ try (PulsarCollector collector = builder().build()) {
+ assertThat(collector).hasToString(String.format(
+ "PulsarCollector{clientProps={serviceUrl=%s}, consumerProps={subscriptionName=%s}, topic=%s}",
+ pulsar.serviceUrl(),
+ testName,
+ testName
+ ));
+ }
+ }
+
+ /** Ensures list encoding works: a json encoded list of spans */
+ @Test void messageWithMultipleSpans_json() throws Exception {
+ messageWithMultipleSpans(SpanBytesEncoder.JSON_V1);
+ }
+
+ /** Ensures list encoding works: a version 2 json list of spans */
+ @Test void messageWithMultipleSpans_json2() throws Exception {
+ messageWithMultipleSpans(SpanBytesEncoder.JSON_V2);
+ }
+
+ /** Ensures list encoding works: proto3 ListOfSpans */
+ @Test void messageWithMultipleSpans_proto3() throws Exception {
+ messageWithMultipleSpans(SpanBytesEncoder.PROTO3);
+ }
+
+ /** Ensures list encoding works: a TBinaryProtocol encoded list of spans */
+ @Test void messageWithMultipleSpans_thrift() throws Exception {
+ messageWithMultipleSpans(THRIFT);
+ }
+
+ /** Ensures malformed spans don't hang the collector */
+ @Test void skipsMalformedData() throws Exception {
+ byte[] malformed1 = "[\"='".getBytes(UTF_8); // screwed up json
+ byte[] malformed2 = "malformed".getBytes(UTF_8);
+ pushMessage(collector.topic, THRIFT.encodeList(spans));
+ pushMessage(collector.topic, new byte[0]);
+ pushMessage(collector.topic, malformed1);
+ pushMessage(collector.topic, malformed2);
+ pushMessage(collector.topic, THRIFT.encodeList(spans));
+
+ Thread.sleep(1000);
+
+ assertThat(pulsarMetrics.messages()).isEqualTo(5);
+ assertThat(pulsarMetrics.messagesDropped()).isEqualTo(2); // only malformed, not empty
+ assertThat(pulsarMetrics.bytes()).isEqualTo(
+ THRIFT.encodeList(spans).length * 2 + malformed1.length + malformed2.length);
+ assertThat(pulsarMetrics.spans()).isEqualTo(spans.size() * 2);
+ assertThat(pulsarMetrics.spansDropped()).isZero();
+ }
+
+ /** Guards against errors that leak from storage, such as InvalidQueryException */
+ @Test void skipsOnSpanStorageException() throws Exception {
+ collector.close();
+
+ AtomicInteger counter = new AtomicInteger();
+ consumer = (input) -> new Call.Base<>() {
+ @Override protected Void doExecute() {
+ throw new AssertionError();
+ }
+
+ @Override protected void doEnqueue(Callback callback) {
+ if (counter.getAndIncrement() == 1) {
+ callback.onError(new RuntimeException("storage fell over"));
+ } else {
+ receivedSpans.add(spans);
+ callback.onSuccess(null);
+ }
+ }
+
+ @Override public Call clone() {
+ throw new AssertionError();
+ }
+ };
+
+ collector = builder().storage(buildStorage(consumer)).build().start();
+
+ pushMessage(collector.topic, PROTO3.encodeList(spans));
+ pushMessage(collector.topic, PROTO3.encodeList(spans)); // tossed on error
+ pushMessage(collector.topic, PROTO3.encodeList(spans));
+
+ assertThat(receivedSpans.take()).containsExactlyElementsOf(spans);
+ // the only way we could read this, is if the malformed span was skipped.
+ assertThat(receivedSpans.take()).containsExactlyElementsOf(spans);
+
+ assertThat(pulsarMetrics.messages()).isEqualTo(3);
+ assertThat(pulsarMetrics.messagesDropped()).isZero(); // storage failure not message failure
+ assertThat(pulsarMetrics.bytes()).isEqualTo(PROTO3.encodeList(spans).length * 3);
+ assertThat(pulsarMetrics.spans()).isEqualTo(spans.size() * 3);
+ assertThat(pulsarMetrics.spansDropped()).isEqualTo(spans.size()); // only one dropped
+ }
+
+ @Test void messagesDistributedAcrossMultipleThreadsSuccessfully() throws Exception {
+ collector.close();
+
+ CountDownLatch latch = new CountDownLatch(2);
+ collector = builder().concurrency(2).storage(buildStorage((spans) -> {
+ latch.countDown();
+ try {
+ latch.await(); // await the other one as this proves 2 threads are in use
+ } catch (InterruptedException e) {
+ throw new AssertionError(e);
+ }
+ return consumer.accept(spans);
+ })).build().start();
+
+ pushMessage(collector.topic, new byte[]{}); // empty bodies don't go to storage
+ pushMessage(collector.topic, PROTO3.encodeList(spans));
+ pushMessage(collector.topic, PROTO3.encodeList(spans));
+
+ assertThat(receivedSpans.take()).containsExactlyElementsOf(spans);
+ latch.countDown();
+ assertThat(receivedSpans.take()).containsExactlyElementsOf(spans);
+
+ assertThat(threadsProvidingSpans).hasSize(2);
+
+ assertThat(pulsarMetrics.messages()).isEqualTo(3); // 2 + empty body for warmup
+ assertThat(pulsarMetrics.messagesDropped()).isZero();
+ assertThat(pulsarMetrics.bytes()).isEqualTo(PROTO3.encodeList(spans).length * 2);
+ assertThat(pulsarMetrics.spans()).isEqualTo(spans.size() * 2);
+ assertThat(pulsarMetrics.spansDropped()).isZero();
+ }
+
+
+ private void messageWithMultipleSpans(SpanBytesEncoder encoder) throws Exception {
+ byte[] message = encoder.encodeList(spans);
+ pushMessage(collector.topic, message);
+
+ assertThat(receivedSpans.take()).containsAll(spans);
+ assertThat(pulsarMetrics.messages()).isEqualTo(1);
+ assertThat(pulsarMetrics.messagesDropped()).isZero();
+ assertThat(pulsarMetrics.bytes()).isEqualTo(message.length);
+ assertThat(pulsarMetrics.spans()).isEqualTo(spans.size());
+ assertThat(pulsarMetrics.spansDropped()).isZero();
+ }
+
+ private void pushMessage(String topic, byte[] message) {
+ try (Producer producer = pulsarClient.newProducer().topic(topic).create()) {
+ producer.newMessage().value(message).send();
+ } catch (PulsarClientException e) {
+ throw new RuntimeException("Unable to send message to Pulsar", e);
+ }
+ }
+
+
+ static StorageComponent buildStorage(final SpanConsumer spanConsumer) {
+ return new ForwardingStorageComponent() {
+ @Override protected StorageComponent delegate() {
+ throw new AssertionError();
+ }
+
+ @Override public SpanConsumer spanConsumer() {
+ return spanConsumer;
+ }
+ };
+ }
+}
diff --git a/zipkin-collector/pulsar/src/test/java/zipkin2/collector/pulsar/PulsarExtension.java b/zipkin-collector/pulsar/src/test/java/zipkin2/collector/pulsar/PulsarExtension.java
new file mode 100644
index 0000000000..985b186bb3
--- /dev/null
+++ b/zipkin-collector/pulsar/src/test/java/zipkin2/collector/pulsar/PulsarExtension.java
@@ -0,0 +1,75 @@
+/*
+ * Copyright The OpenZipkin Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package zipkin2.collector.pulsar;
+
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.BeforeAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+
+import java.time.Duration;
+
+import static org.testcontainers.utility.DockerImageName.parse;
+
+public class PulsarExtension implements BeforeAllCallback, AfterAllCallback {
+
+ static final Logger LOGGER = LoggerFactory.getLogger(PulsarExtension.class);
+ static final int BROKER_PORT = 6650;
+ static final int BROKER_HTTP_PORT = 8080;
+
+ final PulsarContainer container = new PulsarContainer();
+
+ @Override public void beforeAll(ExtensionContext context) throws Exception {
+ if (context.getRequiredTestClass().getEnclosingClass() != null) {
+ // Only run once in outermost scope.
+ return;
+ }
+
+ container.start();
+ LOGGER.info("Using serviceUrl {}", serviceUrl());
+ }
+
+ String serviceUrl() {
+ return "pulsar://" + container.getHost() + ":" + container.getMappedPort(BROKER_PORT);
+ }
+
+ @Override public void afterAll(ExtensionContext context) throws Exception {
+ if (context.getRequiredTestClass().getEnclosingClass() != null) {
+ // Only run once in outermost scope.
+ return;
+ }
+
+ container.stop();
+ }
+
+ PulsarCollector.Builder newCollectorBuilder(String topic) {
+ return PulsarCollector.builder()
+ .topic(topic)
+ .subscriptionName("zipkin-subscription")
+ .serviceUrl(serviceUrl());
+ }
+
+ static final class PulsarContainer extends GenericContainer {
+ PulsarContainer() {
+ super(parse("ghcr.io/openzipkin/zipkin-pulsar:3.4.3"));
+ withExposedPorts(BROKER_PORT, BROKER_HTTP_PORT);
+ String cmd = "/pulsar/bin/apply-config-from-env.py /pulsar/conf/standalone.conf " +
+ "&& bin/pulsar standalone " +
+ "--no-functions-worker -nss";
+ withEnv("PULSAR_MEM", "-Xms512m -Xmx512m -XX:MaxDirectMemorySize=1g"); // limit memory usage
+ waitStrategy = new HttpWaitStrategy()
+ .forPort(BROKER_HTTP_PORT)
+ .forStatusCode(200)
+ .forPath("/admin/v2/clusters")
+ .withStartupTimeout(Duration.ofSeconds(120));
+ withCommand("/bin/bash", "-c", cmd);
+ withLogConsumer(new Slf4jLogConsumer(LOGGER));
+ }
+ }
+}
diff --git a/zipkin-collector/pulsar/src/test/resources/simplelogger.properties b/zipkin-collector/pulsar/src/test/resources/simplelogger.properties
new file mode 100644
index 0000000000..7cfe01b427
--- /dev/null
+++ b/zipkin-collector/pulsar/src/test/resources/simplelogger.properties
@@ -0,0 +1,11 @@
+# See https://www.slf4j.org/api/org/slf4j/impl/SimpleLogger.html for the full list of config options
+
+org.slf4j.simpleLogger.logFile=System.out
+org.slf4j.simpleLogger.defaultLogLevel=warn
+org.slf4j.simpleLogger.showDateTime=true
+org.slf4j.simpleLogger.dateTimeFormat=yyyy-MM-dd HH:mm:ss:SSS
+
+org.slf4j.simpleLogger.log.com.github.charithe.pulsar=info
+org.slf4j.simpleLogger.log.zipkin2.collector.pulsar=debug
+# uncomment to include pulsar consumer configuration in test logs
+#logger.org.apache.pulsar.clients.level=info
diff --git a/zipkin-server/README.md b/zipkin-server/README.md
index 74485db96a..951c74d492 100644
--- a/zipkin-server/README.md
+++ b/zipkin-server/README.md
@@ -453,6 +453,60 @@ $ KAFKA_BOOTSTRAP_SERVERS=127.0.0.1:9092 \
java -Dzipkin.collector.kafka.overrides.auto.offset.reset=latest -jar zipkin.jar
```
+
+### Pulsar Collector
+The Pulsar collector is enabled when `PULSAR_SERVICE_URL` is set to
+a v4.x+ server. The following settings apply in this case.
+Some settings correspond to "New Client Configs" in [Pulsar client properties](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java)
+and "New Consumer Configs" in [Pulsar consumer properties](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java).
+
+| Variable | Property | Description |
+|----------------------------|---------------------------------------------|--------------------------------------------------------------------------------|
+| `COLLECTOR_PULSAR_ENABLED` | `zipkin.collector.pulsar.enabled` | `false` disables the Pulsar collector. Defaults to `true`. |
+| `PULSAR_SERVICE_URL` | `zipkin.collector.pulsar.service-url` | The service URL for the Pulsar client ex. pulsar://my-broker:6650. No default. |
+| `PULSAR_TOPIC` | `zipkin.collector.pulsar.topic` | Queue zipkin spans will be consumed from. Defaults to "zipkin". |
+| `PULSAR_SUBSCRIPTION_NAME` | `zipkin.collector.pulsar.subscription-name` | Specify the subscription name for this consumer. No default. |
+| `PULSAR_CONCURRENCY` | `zipkin.collector.pulsar.concurrency` | Count of concurrent message consumers on the topic. Defaults to 1 |
+
+Example usage:
+
+```bash
+$ PULSAR_SERVICE_URL=pulsar://localhost:6650 \
+ java -jar zipkin.jar
+```
+
+
+#### Other Pulsar client properties
+You may need to set other
+[Pulsar client properties](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java), in
+addition to the ones with explicit properties defined by the collector. In this case, you need to
+prefix that property name with `zipkin.collector.pulsar.client-props` and pass it as a system property
+argument.
+
+For example, to set `num.io.threads`, you can set a system property named
+`zipkin.collector.pulsar.client-props.numIoThreads`:
+
+```bash
+$ PULSAR_SERVICE_URL=pulsar://localhost:6650 \
+ java -Dzipkin.collector.pulsar.client-props.numIoThreads=20 -jar zipkin.jar
+```
+
+
+#### Other Pulsar consumer properties
+You may need to set other
+[Pulsar consumer properties](https://github.com/apache/pulsar/blob/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java), in
+addition to the ones with explicit properties defined by the collector. In this case, you need to
+prefix that property name with `zipkin.collector.pulsar.consumer-props` and pass it as a system property
+argument.
+
+For example, to set `receiver.queue.size`, you can set a system property named
+`zipkin.collector.pulsar.consumer-props.receiverQueueSize`:
+
+```bash
+$ PULSAR_SERVICE_URL=pulsar://localhost:6650 \
+ java -Dzipkin.collector.pulsar.consumer-props.receiverQueueSize=2000 -jar zipkin.jar
+```
+
#### Detailed examples
Example targeting Kafka running in Docker:
diff --git a/zipkin-server/pom.xml b/zipkin-server/pom.xml
index f94fd0d0e4..8da0d7369b 100644
--- a/zipkin-server/pom.xml
+++ b/zipkin-server/pom.xml
@@ -288,6 +288,14 @@
true
+
+
+ ${project.groupId}.zipkin2
+ zipkin-collector-pulsar
+ ${project.version}
+ true
+
+
io.zipkin.brave
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/InternalZipkinConfiguration.java b/zipkin-server/src/main/java/zipkin2/server/internal/InternalZipkinConfiguration.java
index 32a03838d2..db63feabbf 100644
--- a/zipkin-server/src/main/java/zipkin2/server/internal/InternalZipkinConfiguration.java
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/InternalZipkinConfiguration.java
@@ -16,6 +16,7 @@
import zipkin2.server.internal.mysql.ZipkinMySQLStorageConfiguration;
import zipkin2.server.internal.prometheus.ZipkinMetricsController;
import zipkin2.server.internal.prometheus.ZipkinPrometheusMetricsConfiguration;
+import zipkin2.server.internal.pulsar.ZipkinPulsarCollectorConfiguration;
import zipkin2.server.internal.rabbitmq.ZipkinRabbitMQCollectorConfiguration;
import zipkin2.server.internal.scribe.ZipkinScribeCollectorConfiguration;
import zipkin2.server.internal.ui.ZipkinUiConfiguration;
@@ -35,7 +36,8 @@
ZipkinGrpcCollector.class,
ZipkinActiveMQCollectorConfiguration.class,
ZipkinKafkaCollectorConfiguration.class,
- ZipkinRabbitMQCollectorConfiguration.class,
+ ZipkinRabbitMQCollectorConfiguration.class,
+ ZipkinPulsarCollectorConfiguration.class,
ZipkinMetricsController.class,
ZipkinHealthController.class,
ZipkinPrometheusMetricsConfiguration.class,
diff --git a/zipkin-server/src/main/java/zipkin2/server/internal/pulsar/ZipkinPulsarCollectorConfiguration.java b/zipkin-server/src/main/java/zipkin2/server/internal/pulsar/ZipkinPulsarCollectorConfiguration.java
new file mode 100644
index 0000000000..af06a2d8b3
--- /dev/null
+++ b/zipkin-server/src/main/java/zipkin2/server/internal/pulsar/ZipkinPulsarCollectorConfiguration.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright The OpenZipkin Authors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+package zipkin2.server.internal.pulsar;
+
+import org.springframework.boot.autoconfigure.condition.AllNestedConditions;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Condition;
+import org.springframework.context.annotation.ConditionContext;
+import org.springframework.context.annotation.Conditional;
+import org.springframework.core.type.AnnotatedTypeMetadata;
+import zipkin2.collector.CollectorMetrics;
+import zipkin2.collector.CollectorSampler;
+import zipkin2.collector.pulsar.PulsarCollector;
+import zipkin2.storage.StorageComponent;
+
+import static io.micrometer.common.util.StringUtils.isEmpty;
+
+/** Auto-configuration for {@link PulsarCollector}. */
+@ConditionalOnClass(PulsarCollector.class)
+@Conditional(ZipkinPulsarCollectorConfiguration.PulsarConditions.class)
+@EnableConfigurationProperties(ZipkinPulsarCollectorProperties.class)
+public class ZipkinPulsarCollectorConfiguration {
+
+ @Bean(initMethod = "start")
+ PulsarCollector pulsar(
+ ZipkinPulsarCollectorProperties properties,
+ CollectorSampler sampler,
+ CollectorMetrics metrics,
+ StorageComponent storage
+ ) {
+ return properties.toBuilder().sampler(sampler).metrics(metrics).storage(storage).build();
+ }
+
+ /**
+ * This condition passes when {@link ZipkinPulsarCollectorProperties#getServiceUrl()} is set
+ * to non-empty.
+ *
+ *
This is here because the yaml defaults this property to empty like this, and spring-boot
+ * doesn't have an option to treat empty properties as unset.
+ *
+ *