Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Pulsar collector #3788

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@
!zipkin-collector/kafka/src/main/**
!zipkin-collector/rabbitmq/src/main/**
!zipkin-collector/scribe/src/main/**
!zipkin-collector/pulsar/src/main/**
!zipkin-junit5/src/main/**
!zipkin-storage/src/main/**
!zipkin-storage/cassandra/src/main/**
Expand Down
1 change: 1 addition & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ jobs:
- name: zipkin-collector-activemq
- name: zipkin-collector-kafka
- name: zipkin-collector-rabbitmq
- name: zipkin-collector-pulsar
- name: zipkin-storage-cassandra
- name: zipkin-storage-elasticsearch
- name: zipkin-storage-mysql-v1
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ aggregate behavior including error paths or calls to deprecated services.
Application’s need to be “instrumented” to report trace data to Zipkin. This
usually means configuration of a [tracer or instrumentation library](https://zipkin.io/pages/tracers_instrumentation.html). The most
popular ways to report data to Zipkin are via http or Kafka, though many other
options exist, such as Apache ActiveMQ, gRPC and RabbitMQ. The data served to
options exist, such as Apache ActiveMQ, gRPC, RabbitMQ and Apache Pulsar. The data served to
the UI is stored in-memory, or persistently with a supported backend such as
Apache Cassandra or Elasticsearch.

Expand Down
12 changes: 12 additions & 0 deletions docker/examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,18 @@ $ docker compose -f docker-compose-rabbitmq.yml up
Then configure the [RabbitMQ sender](https://github.com/openzipkin/zipkin-reporter-java/blob/master/amqp-client/src/main/java/zipkin2/reporter/amqp/RabbitMQSender.java)
using a `host` value of `localhost` or a non-local hostname if in docker.


## Pulsar

You can collect traces from [Pulsar](../test-images/zipkin-pulsar/README.md) in addition to HTTP, using the
`docker-compose-pulsar.yml` file. This configuration starts `zipkin` and `zipkin-pulsar` in their
own containers.

To add Pulsar configuration, run:
```bash
$ docker compose -f docker-compose-pulsar.yml up
```

## Eureka

You can register Zipkin for service discovery in [Eureka](../test-images/zipkin-eureka/README.md)
Expand Down
32 changes: 32 additions & 0 deletions docker/examples/docker-compose-pulsar.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#
# Copyright The OpenZipkin Authors
# SPDX-License-Identifier: Apache-2.0
#

# This file uses the version 2 docker compose file format, described here:
# https://docs.docker.com/compose/compose-file/#version-2
#
# It extends the default configuration from docker-compose.yml to add a test
# pulsar server, which is used as a span transport.

version: '2.4'

services:
pulsar:
image: ghcr.io/openzipkin/zipkin-pulsar:${TAG:-latest}
container_name: pulsar
ports: # expose the pulsar port so apps can publish spans.
- "6650:6650"
# - "8080:8080" # uncomment to expose the pulsar http port.

zipkin:
extends:
file: docker-compose.yml
service: zipkin
# slim doesn't include Pulsar support, so switch to the larger image
image: ghcr.io/openzipkin/zipkin:${TAG:-latest}
environment:
- PULSAR_SERVICE_URL=pulsar://pulsar:6650
depends_on:
pulsar:
condition: service_healthy
1 change: 1 addition & 0 deletions zipkin-collector/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
<module>kafka</module>
<module>rabbitmq</module>
<module>scribe</module>
<module>pulsar</module>
</modules>

<dependencies>
Expand Down
57 changes: 57 additions & 0 deletions zipkin-collector/pulsar/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
# collector-pulsar

## PulsarCollector

This collector is implemented as a Pulsar consumer supporting Pulsar brokers running
version 4.x or later, and the default subscription type is `Shared`, in Shared subscription type,
multiple consumers can attach to the same subscription and messages are delivered
in a round-robin distribution across consumers.

This collector is implemented as a Pulsar consumer supporting Pulsar brokers running version 4.x or later.
The default `subscriptionType` is `Shared`, which allows multiple consumers to attach to the same subscription,
with messages delivered in a round-robin distribution across consumers, the default `subscriptionInitialPosition`
is `Earliest`, you can modify the consumer settings as needed through the `consumerProps` parameter.
Also, the client settings can also be modified through the `clientProps` parameter.

For information about running this collector as a module in Zipkin server, see
the [Zipkin Server README](../../zipkin-server/README.md#pulsar-collector).

When using this collector as a library outside of Zipkin server,
[zipkin2.collector.pulsar.PulsarCollector.Builder](src/main/java/zipkin2/collector/pulsar/PulsarCollector.java)
includes defaults that will operate against a Pulsar topic name `zipkin`.

## Encoding spans into Pulsar messages

The message's binary data includes a list of spans. Supported encodings
are the same as the http [POST /spans](https://zipkin.io/zipkin-api/#/paths/%252Fspans) body.

### Json

The message's binary data is a list of spans in json. The first character must be '[' (decimal 91).

`Codec.JSON.writeSpans(spans)` performs the correct json encoding.

### Thrift

The message's binary data includes a list header followed by N spans serialized in TBinaryProtocol

`Codec.THRIFT.writeSpans(spans)` encodes spans in the following fashion:

```
write_byte(12) // type of the list elements: 12 == struct
write_i32(count) // count of spans that will follow
for (int i = 0; i < count; i++) {
writeTBinaryProtocol(spans(i))
}
```

### Legacy encoding

Older versions of zipkin accepted a single span per message, as opposed
to a list per message. This practice is deprecated, but still supported.

## Logging

Zipkin by default suppresses all logging output from Pulsar client operations as they can get quite verbose. Start
Zipkin
with `--logging.level.org.apache.pulsar=INFO` or similar to override this during troubleshooting for example.
39 changes: 39 additions & 0 deletions zipkin-collector/pulsar/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Copyright The OpenZipkin Authors
SPDX-License-Identifier: Apache-2.0

-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>io.zipkin.zipkin2</groupId>
<artifactId>zipkin-collector-parent</artifactId>
<version>3.4.5-SNAPSHOT</version>
</parent>

<artifactId>zipkin-collector-pulsar</artifactId>
<name>Collector: Pulsar</name>

<properties>
<main.basedir>${project.basedir}/../..</main.basedir>
<pulsar-client.version>4.0.2</pulsar-client.version>
</properties>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zipkin-collector</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client</artifactId>
<version>${pulsar-client.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright The OpenZipkin Authors
* SPDX-License-Identifier: Apache-2.0
*/
package zipkin2.collector.pulsar;

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import zipkin2.CheckResult;
import zipkin2.collector.Collector;
import zipkin2.collector.CollectorMetrics;

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class LazyPulsarInit {

private final Collector collector;
private final CollectorMetrics metrics;
private final String topic;
private final int concurrency;
private final Map<String, Object> clientProps, consumerProps;
public volatile PulsarClient result;
final AtomicReference<CheckResult> failure = new AtomicReference<>();

LazyPulsarInit(PulsarCollector.Builder builder) {
this.collector = builder.delegate.build();
this.metrics = builder.metrics;
this.topic = builder.topic;
this.concurrency = builder.concurrency;
this.clientProps = builder.clientProps;
this.consumerProps = builder.consumerProps;
}

void init() {
if (result == null) {
synchronized (this) {
if (result == null) {
result = subscribe();
}
}
}
}

private PulsarClient subscribe() {
PulsarClient client;
try {
client = PulsarClient.builder()
.loadConf(clientProps)
.build();
} catch (Exception e) {
failure.set(CheckResult.failed(e));
throw new RuntimeException("Pulsar client creation failed. " + e.getMessage(), e);
}

try {
for (int i = 0; i < concurrency; i++) {
PulsarSpanConsumer consumer = new PulsarSpanConsumer(topic, consumerProps, client, collector, metrics);
consumer.startConsumer();
}
return client;
} catch (Exception e) {
try {
reta marked this conversation as resolved.
Show resolved Hide resolved
client.close();
} catch (PulsarClientException ex) {
// Nobody cares me.
}
failure.set(CheckResult.failed(e));
throw new RuntimeException("Pulsar Client is unable to subscribe to the topic(" + topic + "), please check the service.", e);
}
}

void close() throws PulsarClientException {
PulsarClient maybe = result;
if (maybe != null) {
result.close();
result = null;
}
}
}
Loading
Loading