Skip to content

Commit

Permalink
Support collect tracing through gRPC (#3585)
Browse files Browse the repository at this point in the history
  • Loading branch information
mrproliu authored Nov 7, 2023
1 parent 44b8c06 commit 726e377
Show file tree
Hide file tree
Showing 13 changed files with 479 additions and 0 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test-v3.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ jobs:
- name: receiver-zipkin-kafka
- name: receiver-zipkin-rabbitmq
- name: receiver-zipkin-scribe
- name: receiver-zipkin-grpc
- name: storage-cassandra
steps:
- name: Checkout Repository
Expand Down
1 change: 1 addition & 0 deletions zipkin-server/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
<module>zipkin-storage-ext</module>
<module>receiver-zipkin-core</module>
<module>receiver-otlp-trace</module>
<module>receiver-zipkin-grpc</module>
</modules>

<dependencyManagement>
Expand Down
27 changes: 27 additions & 0 deletions zipkin-server/receiver-zipkin-grpc/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>zipkin-server-parent</artifactId>
<groupId>io.zipkin</groupId>
<version>2.24.4-SNAPSHOT</version>
</parent>

<artifactId>receiver-zipkin-grpc</artifactId>
<name>Zipkin gRPC Receiver</name>

<dependencies>
<dependency>
<groupId>io.zipkin</groupId>
<artifactId>receiver-zipkin-core</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>${armeria.groupId}</groupId>
<artifactId>armeria-grpc-protocol</artifactId>
<version>${armeria.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package zipkin.server.receiver.zipkin.grpc;

import com.linecorp.armeria.common.CommonPools;
import com.linecorp.armeria.server.ServiceRequestContext;
import com.linecorp.armeria.server.grpc.protocol.AbstractUnsafeUnaryGrpcService;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.skywalking.oap.server.library.module.ModuleManager;
import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
import org.apache.skywalking.oap.server.telemetry.TelemetryModule;
import org.apache.skywalking.oap.server.telemetry.api.CounterMetrics;
import org.apache.skywalking.oap.server.telemetry.api.HistogramMetrics;
import org.apache.skywalking.oap.server.telemetry.api.MetricsCreator;
import org.apache.skywalking.oap.server.telemetry.api.MetricsTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import zipkin2.Callback;
import zipkin2.codec.SpanBytesDecoder;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;

public class ZipkinGRPCHandler extends AbstractUnsafeUnaryGrpcService {
private static final Logger log = LoggerFactory.getLogger(ZipkinGRPCHandler.class.getName());

private final SpanForwardService spanForward;

private final CounterMetrics msgDroppedIncr;
private final CounterMetrics errorCounter;
private final HistogramMetrics histogram;

public ZipkinGRPCHandler(SpanForwardService spanForward, ModuleManager moduleManager) {
this.spanForward = spanForward;

MetricsCreator metricsCreator = moduleManager.find(TelemetryModule.NAME)
.provider()
.getService(MetricsCreator.class);
histogram = metricsCreator.createHistogramMetric(
"trace_in_latency",
"The process latency of trace data",
new MetricsTag.Keys("protocol"),
new MetricsTag.Values("zipkin-kafka")
);
msgDroppedIncr = metricsCreator.createCounter(
"trace_dropped_count", "The dropped number of traces",
new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin-grpc"));
errorCounter = metricsCreator.createCounter(
"trace_analysis_error_count", "The error number of trace analysis",
new MetricsTag.Keys("protocol"), new MetricsTag.Values("zipkin-grpc")
);
}

@Override
protected CompletionStage<ByteBuf> handleMessage(ServiceRequestContext context, ByteBuf message) {
if (!message.isReadable()) {
msgDroppedIncr.inc();
return CompletableFuture.completedFuture(message); // lenient on empty messages
}

try {
CompletableFutureCallback result = new CompletableFutureCallback();

// collector.accept might block so need to move off the event loop. We make sure the
// callback is context aware to continue the trace.
Executor executor = ServiceRequestContext.mapCurrent(
ctx -> ctx.makeContextAware(ctx.blockingTaskExecutor()),
CommonPools::blockingTaskExecutor);

executor.execute(() -> {
try (HistogramMetrics.Timer ignored = histogram.createTimer()) {
spanForward.send(SpanBytesDecoder.PROTO3.decodeList(message.nioBuffer()));
result.onSuccess(null);
} catch (Exception e) {
log.error("Failed to handle message", e);
errorCounter.inc();
result.onError(e);
}
});
return result;
} finally {
message.release();
}
}

static final class CompletableFutureCallback extends CompletableFuture<ByteBuf>
implements Callback<Void> {

@Override public void onSuccess(Void value) {
complete(Unpooled.EMPTY_BUFFER);
}

@Override public void onError(Throwable t) {
completeExceptionally(t);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package zipkin.server.receiver.zipkin.grpc;

import org.apache.skywalking.oap.server.library.module.ModuleDefine;

public class ZipkinGRPCModule extends ModuleDefine {
public static final String NAME = "receiver-zipkin-grpc";
public ZipkinGRPCModule() {
super(NAME);
}

@Override
public Class[] services() {
return new Class[0];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
/*
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package zipkin.server.receiver.zipkin.grpc;

import com.linecorp.armeria.common.HttpMethod;
import org.apache.skywalking.oap.server.core.CoreModule;
import org.apache.skywalking.oap.server.core.server.HTTPHandlerRegister;
import org.apache.skywalking.oap.server.library.module.ModuleConfig;
import org.apache.skywalking.oap.server.library.module.ModuleDefine;
import org.apache.skywalking.oap.server.library.module.ModuleProvider;
import org.apache.skywalking.oap.server.library.module.ModuleStartException;
import org.apache.skywalking.oap.server.library.module.ServiceNotProvidedException;
import org.apache.skywalking.oap.server.receiver.zipkin.SpanForwardService;
import org.apache.skywalking.oap.server.receiver.zipkin.ZipkinReceiverModule;
import zipkin.server.core.services.HTTPConfigurableServer;

import java.util.Arrays;

public class ZipkinGRPCProvider extends ModuleProvider {
private ZipkinGRPCReceiverConfig moduleConfig;

@Override
public String name() {
return "default";
}

@Override
public Class<? extends ModuleDefine> module() {
return ZipkinGRPCModule.class;
}

@Override
public ConfigCreator<? extends ModuleConfig> newConfigCreator() {
return new ConfigCreator<ZipkinGRPCReceiverConfig>() {

@Override
public Class<ZipkinGRPCReceiverConfig> type() {
return ZipkinGRPCReceiverConfig.class;
}

@Override
public void onInitialized(ZipkinGRPCReceiverConfig initialized) {
moduleConfig = initialized;
}
};
}

@Override
public void prepare() throws ServiceNotProvidedException, ModuleStartException {
}

@Override
public void start() throws ServiceNotProvidedException, ModuleStartException {
final SpanForwardService spanForward = getManager().find(ZipkinReceiverModule.NAME).provider().getService(SpanForwardService.class);
final ZipkinGRPCHandler receiver = new ZipkinGRPCHandler(spanForward, getManager());

final HTTPHandlerRegister httpRegister = getManager().find(CoreModule.NAME).provider().getService(HTTPHandlerRegister.class);
httpRegister.addHandler(
(HTTPConfigurableServer.ServerConfiguration) builder -> builder.service("/zipkin.proto3.SpanService/Report", receiver),
Arrays.asList(HttpMethod.POST, HttpMethod.GET));
}

@Override
public void notifyAfterCompleted() throws ServiceNotProvidedException, ModuleStartException {

}

@Override
public String[] requiredModules() {
return new String[0];
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/*
* Copyright 2015-2023 The OpenZipkin Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
* in compliance with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package zipkin.server.receiver.zipkin.grpc;

import org.apache.skywalking.oap.server.library.module.ModuleConfig;

public class ZipkinGRPCReceiverConfig extends ModuleConfig {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#

zipkin.server.receiver.zipkin.grpc.ZipkinGRPCModule
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#

zipkin.server.receiver.zipkin.grpc.ZipkinGRPCProvider
Loading

0 comments on commit 726e377

Please sign in to comment.