Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Pulsar collector #3788

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
!zipkin-collector/kafka/src/main/**
!zipkin-collector/rabbitmq/src/main/**
!zipkin-collector/scribe/src/main/**
!zipkin-collector/pulsar/src/main/**
!zipkin-junit5/src/main/**
!zipkin-storage/src/main/**
!zipkin-storage/cassandra/src/main/**
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ jobs:
- name: zipkin-collector-activemq
- name: zipkin-collector-kafka
- name: zipkin-collector-rabbitmq
- name: zipkin-collector-pulsar
- name: zipkin-storage-cassandra
- name: zipkin-storage-elasticsearch
- name: zipkin-storage-mysql-v1
Expand Down
6 changes: 6 additions & 0 deletions .github/workflows/test_readme.yml
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,9 @@ jobs:
build-bin/docker/docker_test_image openzipkin/zipkin-rabbitmq:test
env:
DOCKER_FILE: docker/test-images/zipkin-rabbitmq/Dockerfile
- name: docker/test-images/zipkin-pulsar/README.md
run: |
build-bin/docker/docker_build openzipkin/zipkin-pulsar:test &&
build-bin/docker/docker_test_image openzipkin/zipkin-pulsar:test
env:
DOCKER_FILE: docker/test-images/zipkin-pulsar/Dockerfile
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ aggregate behavior including error paths or calls to deprecated services.
Application’s need to be “instrumented” to report trace data to Zipkin. This
usually means configuration of a [tracer or instrumentation library](https://zipkin.io/pages/tracers_instrumentation.html). The most
popular ways to report data to Zipkin are via http or Kafka, though many other
options exist, such as Apache ActiveMQ, gRPC and RabbitMQ. The data served to
options exist, such as Apache ActiveMQ, gRPC, RabbitMQ and Apache Pulsar. The data served to
the UI is stored in-memory, or persistently with a supported backend such as
Apache Cassandra or Elasticsearch.

Expand Down
1 change: 1 addition & 0 deletions docker/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ base layer `openzipkin/zipkin`, and setting up schema where relevant.
* [ghcr.io/openzipkin/zipkin-eureka](test-images/zipkin-eureka/README.md) - runs Eureka
* [ghcr.io/openzipkin/zipkin-kafka](test-images/zipkin-kafka/README.md) - runs both Kafka+ZooKeeper
* [ghcr.io/openzipkin/zipkin-mysql](test-images/zipkin-mysql/README.md) - runs MySQL initialized with Zipkin's schema
* [ghcr.io/openzipkin/zipkin-pulsar](test-images/zipkin-pulsar/README.md) - runs Pulsar
* [ghcr.io/openzipkin/zipkin-rabbitmq](test-images/zipkin-rabbitmq/README.md) - runs RabbitMQ
* [ghcr.io/openzipkin/zipkin-ui](test-images/zipkin-ui/README.md) - serves the (Lens) UI directly with NGINX

Expand Down
12 changes: 12 additions & 0 deletions docker/examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,18 @@ $ docker compose -f docker-compose-rabbitmq.yml up
Then configure the [RabbitMQ sender](https://github.com/openzipkin/zipkin-reporter-java/blob/master/amqp-client/src/main/java/zipkin2/reporter/amqp/RabbitMQSender.java)
using a `host` value of `localhost` or a non-local hostname if in docker.


## Pulsar

You can collect traces from [Pulsar](../test-images/zipkin-pulsar/README.md) in addition to HTTP, using the
`docker-compose-pulsar.yml` file. This configuration starts `zipkin` and `zipkin-pulsar` in their
own containers.

To add Pulsar configuration, run:
```bash
$ docker compose -f docker-compose-pulsar.yml up
```

## Eureka

You can register Zipkin for service discovery in [Eureka](../test-images/zipkin-eureka/README.md)
Expand Down
32 changes: 32 additions & 0 deletions docker/examples/docker-compose-pulsar.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#
# Copyright The OpenZipkin Authors
# SPDX-License-Identifier: Apache-2.0
#

# This file uses the version 2 docker compose file format, described here:
# https://docs.docker.com/compose/compose-file/#version-2
#
# It extends the default configuration from docker-compose.yml to add a test
# pulsar server, which is used as a span transport.

version: '2.4'

services:
pulsar:
image: ghcr.io/openzipkin/zipkin-pulsar:${TAG:-latest}
container_name: pulsar
ports: # expose the pulsar port so apps can publish spans.
- "6650:6650"
- "8080:8080"

zipkin:
extends:
file: docker-compose.yml
service: zipkin
# slim doesn't include Pulsar support, so switch to the larger image
image: ghcr.io/openzipkin/zipkin:${TAG:-latest}
environment:
- PULSAR_SERVICE_URL=pulsar://pulsar:6650
depends_on:
pulsar:
condition: service_healthy
35 changes: 35 additions & 0 deletions docker/test-images/zipkin-pulsar/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#
# Copyright The OpenZipkin Authors
# SPDX-License-Identifier: Apache-2.0
#


# Use latest from https://hub.docker.com/r/apachepulsar/pulsar/tags
ARG pulsar_version=4.0.2

# We copy files from the context into a scratch container first to avoid a problem where docker and
# docker compose don't share layer hashes https://github.com/docker/compose/issues/883 normally.
# COPY --from= works around the issue.
FROM scratch as scratch

WORKDIR /docker-bin
COPY build-bin/docker/docker-healthcheck /docker-bin/

ARG pulsar_version

FROM apachepulsar/pulsar:${pulsar_version} as zipkin-pulsar
LABEL pulsar-version=$pulsar_version
LABEL org.opencontainers.image.description="Apache Pulsar on Alpine Linux"

# Add HEALTHCHECK and ENTRYPOINT scripts into the default search path
COPY --from=scratch /docker-bin/* /usr/local/bin/
# We use start period of 30s to avoid marking the container unhealthy on slow or contended CI hosts
HEALTHCHECK --interval=1s --start-period=30s --timeout=5s CMD ["docker-healthcheck"]

# Usually, we read env set from pid 1 to get docker-healthcheck parameters.
# However, pulsar-server has to start as root even if permissions are dropped
# later. So, we expose it in the Dockerfile instead.
ENV HEALTHCHECK_PORT=8080
ENV HEALTHCHECK_KIND=tcp
EXPOSE 8080 6650
CMD ["bin/pulsar", "standalone"]
11 changes: 11 additions & 0 deletions docker/test-images/zipkin-pulsar/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
## zipkin-pulsar Docker image

The `zipkin-pulsar` testing image runs Pulsar for [Pulsar collector](../../../zipkin-collector/pulsar)
integration.

To build `openzipkin/zipkin-pulsar:test`, from the top-level of the repository, run:
```bash
$ DOCKER_FILE=docker/test-images/zipkin-pulsar/Dockerfile build-bin/docker/docker_build openzipkin/zipkin-pulsar:test
```

You can use the env variable `JAVA_OPTS` to change settings such as heap size for Pulsar.
1 change: 1 addition & 0 deletions zipkin-collector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
<module>kafka</module>
<module>rabbitmq</module>
<module>scribe</module>
<module>pulsar</module>
</modules>

<dependencies>
Expand Down
51 changes: 51 additions & 0 deletions zipkin-collector/pulsar/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
# collector-pulsar

## PulsarCollector

This collector is implemented as a Pulsar consumer supporting Pulsar brokers running
version 4.x or later, and the default subscription type is `Shared`, in Shared subscription type,
multiple consumers can attach to the same subscription and messages are delivered
in a round-robin distribution across consumers.

For information about running this collector as a module in Zipkin server, see
the [Zipkin Server README](../../zipkin-server/README.md#pulsar-collector).

When using this collector as a library outside of Zipkin server,
[zipkin2.collector.pulsar.PulsarCollector.Builder](src/main/java/zipkin2/collector/pulsar/PulsarCollector.java)
includes defaults that will operate against a Pulsar topic name `zipkin`.

## Encoding spans into Pulsar messages

The message's binary data includes a list of spans. Supported encodings
are the same as the http [POST /spans](https://zipkin.io/zipkin-api/#/paths/%252Fspans) body.

### Json

The message's binary data is a list of spans in json. The first character must be '[' (decimal 91).

`Codec.JSON.writeSpans(spans)` performs the correct json encoding.

### Thrift

The message's binary data includes a list header followed by N spans serialized in TBinaryProtocol

`Codec.THRIFT.writeSpans(spans)` encodes spans in the following fashion:

```
write_byte(12) // type of the list elements: 12 == struct
write_i32(count) // count of spans that will follow
for (int i = 0; i < count; i++) {
writeTBinaryProtocol(spans(i))
}
```

### Legacy encoding

Older versions of zipkin accepted a single span per message, as opposed
to a list per message. This practice is deprecated, but still supported.

## Logging

Zipkin by default suppresses all logging output from Pulsar client operations as they can get quite verbose. Start
Zipkin
with `--logging.level.org.apache.pulsar=INFO` or similar to override this during troubleshooting for example.
39 changes: 39 additions & 0 deletions zipkin-collector/pulsar/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Copyright The OpenZipkin Authors
SPDX-License-Identifier: Apache-2.0

-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin-collector-parent</artifactId>
<version>3.4.5-SNAPSHOT</version>
</parent>

<artifactId>zipkin-collector-pulsar</artifactId>
<name>Collector: Pulsar</name>

<properties>
<main.basedir>${project.basedir}/../..</main.basedir>
<pulsar-client.version>4.0.2</pulsar-client.version>
</properties>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zipkin-collector</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar-client.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/
package zipkin2.collector.pulsar;

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import zipkin2.CheckResult;
import zipkin2.collector.Collector;
import zipkin2.collector.CollectorMetrics;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class LazyPulsarInit {
reta marked this conversation as resolved.
Show resolved Hide resolved

private final Collector collector;
private final CollectorMetrics metrics;
private final String topic;
private final int concurrency;
private final Map<String, Object> clientProps, consumerProps;
public volatile PulsarClient result;
final AtomicReference<CheckResult> failure = new AtomicReference<>();

LazyPulsarInit(PulsarCollector.Builder builder) {
this.collector = builder.delegate.build();
this.metrics = builder.metrics;
this.topic = builder.topic;
this.concurrency = builder.concurrency;
this.clientProps = builder.clientProps;
this.consumerProps = builder.consumerProps;
}

public void init() {
reta marked this conversation as resolved.
Show resolved Hide resolved
if (result == null) {
synchronized (this) {
if (result == null) {
result = subscribe();
}
}
}
}

private PulsarClient subscribe() {
PulsarClient client;
try {
client = PulsarClient.builder()
.loadConf(clientProps)
.operationTimeout(6, TimeUnit.SECONDS)
.connectionTimeout(12, TimeUnit.SECONDS)
.build();
} catch (Exception e) {
throw new RuntimeException("Pulsar client create failed" + e.getMessage(), e);
reta marked this conversation as resolved.
Show resolved Hide resolved
reta marked this conversation as resolved.
Show resolved Hide resolved
}

try {
for (int i = 0; i < concurrency; i++) {
PulsarSpanConsumer consumer = new PulsarSpanConsumer(topic, consumerProps, client, collector, metrics);
consumer.startConsumer();
}
return client;
} catch (Exception e) {
try {
reta marked this conversation as resolved.
Show resolved Hide resolved
client.close();
} catch (PulsarClientException ex) {
// Nobody cares me.
}
throw new RuntimeException("Pulsar unable to subscribe the topic(" + topic + "), please check the pulsar service.", e);
reta marked this conversation as resolved.
Show resolved Hide resolved
}

}

public void close() throws PulsarClientException {
reta marked this conversation as resolved.
Show resolved Hide resolved
PulsarClient maybe = result;
if (maybe != null) result.close();
reta marked this conversation as resolved.
Show resolved Hide resolved
}
}
Loading
Loading