Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OTLP Zipkin Collector #5

Closed
wants to merge 34 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
facbedc
WIP
marcingrzejszczak Jun 12, 2024
c5bad8b
WIP
marcingrzejszczak Jun 13, 2024
37ef965
WIP
marcingrzejszczak Jun 14, 2024
0f7ee5a
Passing build
marcingrzejszczak Jun 14, 2024
45c1d40
Collectors are working fine
marcingrzejszczak Jun 14, 2024
538da4f
HTTP collector tests passing
marcingrzejszczak Jun 21, 2024
a810184
Added OTel HTTP exporter test
marcingrzejszczak Jun 24, 2024
81e36ed
Polish
marcingrzejszczak Jun 24, 2024
d55db5f
Removed OTel GRPC module
marcingrzejszczak Jun 24, 2024
f0680f4
Removed OTel GRPC module
marcingrzejszczak Jun 24, 2024
b766e59
Removed OTel GRPC module
marcingrzejszczak Jun 24, 2024
3639e4d
WIP on ported tests from OTel
marcingrzejszczak Jun 24, 2024
7738906
Fixed tests
marcingrzejszczak Jun 25, 2024
c868600
Trying to make the tests pass
marcingrzejszczak Jun 25, 2024
02c49f1
Trying to make the tests pass
marcingrzejszczak Jun 25, 2024
437f9e9
SpanTranslator tests
marcingrzejszczak Jun 25, 2024
d9faf6a
Added moar tests
marcingrzejszczak Jun 25, 2024
d0d2fe2
Added Zipkin tests
marcingrzejszczak Jun 25, 2024
dbbe43c
Added links
marcingrzejszczak Jun 26, 2024
791175f
Removed tests module
marcingrzejszczak Jun 26, 2024
dc7b762
Fixed translation from OTel to Zipkin
marcingrzejszczak Jun 26, 2024
f30ade5
Removing GRPC
marcingrzejszczak Jun 26, 2024
cf9c6dc
Merged removal of GRPC branch
marcingrzejszczak Jun 26, 2024
d64ff9b
Left only Zipkin OTLP collector
marcingrzejszczak Jun 26, 2024
e5bc2a4
Version bumps
marcingrzejszczak Jun 27, 2024
afe15ae
Merge branch 'main' into receiver
marcingrzejszczak Jun 27, 2024
03ac1ea
Merge branch 'versionBumps' into receiver
marcingrzejszczak Jun 27, 2024
bcbfc36
Uses decorators instead of manually decoding payload
marcingrzejszczak Jun 27, 2024
265e373
Optimize imports
marcingrzejszczak Jun 27, 2024
c706abb
Simplified Collector logic
marcingrzejszczak Jun 27, 2024
579ef19
Improved byte decoding
marcingrzejszczak Jun 27, 2024
45b26b4
Merge branch 'main' into receiver
marcingrzejszczak Jun 28, 2024
d87046d
Changes following the review
marcingrzejszczak Jun 28, 2024
cb1387e
Polish
marcingrzejszczak Jun 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,17 @@ persists them to a configured collector component.
|------------------------------------|-----------------------------------------------------------------------------------------|
| [collector-http](./collector-http) | Implements the [OTLP/HTTP protocol](https://opentelemetry.io/docs/specs/otlp/#otlphttp) |

### Encoders

Encoding is library-specific, as some libraries use `zipkin2.Span` and others
`brave.handler.MutableSpan`. Both options are available to encode to the
OTLP format.

| Encoder | Description |
|-------------------------------------------|------------------------------------------------|
| [`OtelEncoder.V1`](./encoder-otel-zipkin) | zipkin-reporter `AsyncReporter<Span>` |
| [`OtelEncoder`](./encoder-otel-brave) | zipkin-reporter-brave `AsyncZipkinSpanHandler` |

## Server integration

If you cannot use our [Docker image](./docker/README.md), you can still integrate
Expand Down
22 changes: 18 additions & 4 deletions collector-http/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
<artifactId>zipkin-collector</artifactId>
<version>${zipkin.version}</version>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zipkin-encoder-otel</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>${armeria.groupId}</groupId>
Expand All @@ -52,11 +57,20 @@
<version>${zipkin.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>${armeria.groupId}</groupId>
<artifactId>armeria-junit5</artifactId>
<version>${armeria.version}</version>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-sdk</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.opentelemetry</groupId>
<artifactId>opentelemetry-exporter-otlp</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,37 @@
*/
package zipkin2.collector.otel.http;

import com.google.protobuf.CodedInputStream;
import com.linecorp.armeria.common.AggregationOptions;
import com.linecorp.armeria.common.HttpData;
import com.linecorp.armeria.common.HttpRequest;
import com.linecorp.armeria.common.HttpResponse;
import com.linecorp.armeria.common.HttpStatus;
import com.linecorp.armeria.common.ResponseHeaders;
import com.linecorp.armeria.common.encoding.StreamDecoderFactory;
import com.linecorp.armeria.server.AbstractHttpService;
import com.linecorp.armeria.server.ServerBuilder;
import com.linecorp.armeria.server.ServerConfigurator;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.encoding.DecodingService;
import io.netty.buffer.ByteBufAllocator;
import io.opentelemetry.proto.collector.trace.v1.ExportTraceServiceRequest;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import zipkin2.Callback;
import zipkin2.Span;
import zipkin2.collector.Collector;
import zipkin2.collector.CollectorComponent;
import zipkin2.collector.CollectorMetrics;
import zipkin2.collector.CollectorSampler;
import zipkin2.storage.StorageComponent;
import zipkin2.translation.zipkin.SpanTranslator;

public final class OpenTelemetryHttpCollector extends CollectorComponent
implements ServerConfigurator {

public static Builder newBuilder() {
return new Builder();
}
Expand All @@ -36,23 +53,29 @@ public static final class Builder extends CollectorComponent.Builder {
Collector.Builder delegate = Collector.newBuilder(OpenTelemetryHttpCollector.class);
CollectorMetrics metrics = CollectorMetrics.NOOP_METRICS;

@Override public Builder storage(StorageComponent storageComponent) {
@Override
public Builder storage(StorageComponent storageComponent) {
delegate.storage(storageComponent);
return this;
}

@Override public Builder metrics(CollectorMetrics metrics) {
if (metrics == null) throw new NullPointerException("metrics == null");
@Override
public Builder metrics(CollectorMetrics metrics) {
if (metrics == null) {
throw new NullPointerException("metrics == null");
}
delegate.metrics(this.metrics = metrics.forTransport("otel/http"));
return this;
}

@Override public Builder sampler(CollectorSampler sampler) {
@Override
public Builder sampler(CollectorSampler sampler) {
delegate.sampler(sampler);
return this;
}

@Override public OpenTelemetryHttpCollector build() {
@Override
public OpenTelemetryHttpCollector build() {
return new OpenTelemetryHttpCollector(this);
}

Expand All @@ -68,31 +91,77 @@ public static final class Builder extends CollectorComponent.Builder {
metrics = builder.metrics;
}

@Override public OpenTelemetryHttpCollector start() {
@Override
public OpenTelemetryHttpCollector start() {
return this;
}

@Override public String toString() {
@Override
public String toString() {
return "OpenTelemetryHttpCollector{}";
}

/**
* Reconfigures the service per https://opentelemetry.io/docs/specs/otlp/#otlphttp-request
*/
@Override public void reconfigure(ServerBuilder sb) {
@Override
public void reconfigure(ServerBuilder sb) {
sb.decorator(DecodingService.newDecorator(StreamDecoderFactory.gzip()));
sb.service("/v1/traces", new HttpService(this));
}

static final class HttpService extends AbstractHttpService {

final OpenTelemetryHttpCollector collector;

HttpService(OpenTelemetryHttpCollector collector) {
this.collector = collector;
}

@Override protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req)
@Override
protected HttpResponse doPost(ServiceRequestContext ctx, HttpRequest req)
throws Exception {
throw new RuntimeException("Implement me!");
CompletableCallback result = new CompletableCallback();
req.aggregate(AggregationOptions.usePooledObjects(ByteBufAllocator.DEFAULT, ctx.eventLoop()
)).handle((msg, t) -> {
if (t != null) {
result.onError(t);
return null;
}
try (HttpData content = msg.content()) {
if (content.isEmpty()) {
result.onSuccess(null);
return null;
}

try {
ExportTraceServiceRequest request = ExportTraceServiceRequest.parseFrom(CodedInputStream.newInstance(content.byteBuf().nioBuffer()));
List<Span> spans = SpanTranslator.translate(request);
collector.collector.accept(spans, result);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
return null;
}
});
return HttpResponse.of(result);
}
}

static final class CompletableCallback extends CompletableFuture<HttpResponse>
implements Callback<Void> {

static final ResponseHeaders ACCEPTED_RESPONSE = ResponseHeaders.of(HttpStatus.ACCEPTED);

@Override
public void onSuccess(Void value) {
complete(HttpResponse.of(ACCEPTED_RESPONSE));
}

@Override
public void onError(Throwable t) {
completeExceptionally(t);
}
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,61 @@
*/
package zipkin2.collector.otel.http;

import static io.opentelemetry.sdk.trace.samplers.Sampler.alwaysOn;
import static org.assertj.core.api.Assertions.assertThat;

import com.linecorp.armeria.server.Server;
import com.linecorp.armeria.server.ServerBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.exporter.otlp.http.trace.OtlpHttpSpanExporter;
import io.opentelemetry.sdk.OpenTelemetrySdk;
import io.opentelemetry.sdk.trace.SdkTracerProvider;
import io.opentelemetry.sdk.trace.export.BatchSpanProcessor;
import io.opentelemetry.sdk.trace.export.SpanExporter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.collector.CollectorComponent;
import zipkin2.collector.CollectorSampler;
import zipkin2.collector.InMemoryCollectorMetrics;
import zipkin2.storage.InMemoryStorage;

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class ITOpenTelemetryHttpCollector {

private static final Logger log = LoggerFactory.getLogger(ITOpenTelemetryHttpCollector.class);

InMemoryStorage store;
InMemoryCollectorMetrics metrics;
CollectorComponent collector;

SpanExporter spanExporter = OtlpHttpSpanExporter.builder()
.setCompression("gzip")
.build();

SdkTracerProvider sdkTracerProvider = SdkTracerProvider.builder()
.setSampler(alwaysOn())
.addSpanProcessor(BatchSpanProcessor.builder(spanExporter).build())
.build();

OpenTelemetrySdk openTelemetrySdk = OpenTelemetrySdk.builder()
.setTracerProvider(sdkTracerProvider)
.build();

Tracer tracer = openTelemetrySdk.getTracerProvider()
.get("zipkin2.collector.otel.http");

Server server;

@BeforeEach public void setup() {
store = InMemoryStorage.newBuilder().build();
metrics = new InMemoryCollectorMetrics();
Expand All @@ -38,13 +78,42 @@ class ITOpenTelemetryHttpCollector {
.storage(store)
.build()
.start();
ServerBuilder serverBuilder = Server.builder().http(4318);
((OpenTelemetryHttpCollector) collector).reconfigure(serverBuilder);
metrics = metrics.forTransport("otel/http");
server = serverBuilder.build();
server.start().join();
}

@AfterEach void teardown() throws IOException {
@AfterEach
void teardown() throws IOException {
store.close();
collector.close();
server.stop().join();
}

// TODO: integration test
@Test
void otelHttpExporterWorksWithZipkinOtelCollector() throws InterruptedException {
List<String> traceIds = new ArrayList<>();
final int size = 5;
for (int i = 0; i < size; i++) {
// Given
Span span = tracer.spanBuilder("foo " + i)
.setAttribute("foo tag", "foo value")
.setSpanKind(SpanKind.CONSUMER)
.startSpan();
String traceId = span.getSpanContext().getTraceId();
log.info("Trace Id <" + traceId + ">");
Thread.sleep(50);
span.addEvent("boom!");
Thread.sleep(50);

// When
span.end();
traceIds.add(traceId);
}

Awaitility.await().untilAsserted(() -> assertThat(store.acceptedSpanCount()).isEqualTo(5));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
*/
package zipkin2.collector.otel.http;

import static org.assertj.core.api.Assertions.assertThat;

import org.junit.jupiter.api.Test;
import zipkin2.storage.InMemoryStorage;

import static org.assertj.core.api.Assertions.assertThat;

class OpenTelemetryHttpCollectorTest {
OpenTelemetryHttpCollector collector = OpenTelemetryHttpCollector.newBuilder()
.storage(InMemoryStorage.newBuilder().build())
Expand Down
8 changes: 8 additions & 0 deletions encoder-otel-zipkin/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# encoder-otel-zipkin

This encodes zipkin spans into OTLP proto format.

```java
// connect the sender to the correct encoding
reporter = AsyncReporter.newBuilder(sender).build(OtelEncoder.V1);
```
52 changes: 52 additions & 0 deletions encoder-otel-zipkin/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Copyright 2024 The OpenZipkin Authors

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License
is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
or implied. See the License for the specific language governing permissions and limitations under
the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.zipkin.contrib.otel</groupId>
<artifactId>zipkin-otel-parent</artifactId>
<version>0.1.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>zipkin-encoder-otel</artifactId>
<name>Zipkin Encoder: OpenTelemetry Trace</name>

<properties>
<main.basedir>${project.basedir}/..</main.basedir>
</properties>

<dependencies>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>zipkin-translation-otel</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.zipkin.reporter2</groupId>
<artifactId>zipkin-reporter</artifactId>
<version>${zipkin-reporter.version}</version>
</dependency>

<dependency>
<groupId>${zipkin.groupId}</groupId>
<artifactId>zipkin-tests</artifactId>
<version>${zipkin.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Loading
Loading