Skip to content

Commit

Permalink
Fix ReceiverVerticleTracingTest#traceIsPropagated
Browse files Browse the repository at this point in the history
In Vert.x 4.5.1, there was a fix to use the OTel default context storage when the Vert.x context storage provider is not invoked on a Vert.x thread.
See eclipse-vertx/vertx-tracing#72

Since LoomKafkaProducer invokes the tracer on a virtual thread (or a worker thread), the sending task must be wrapped with the OTel current context.
Otherwise, tracing data will be lost at this point.

OTel provides an ExecutorService that does just that, so this commit refactors the producer to use an executor service instead of a queue plus manual polling.

Important: note that with this implementation, a virtual thread is created for each record, which is different from sending them one after the other with a single thread.
  • Loading branch information
tsegismont authored and matzew committed Jan 14, 2025
1 parent 35d547a commit db70907
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,20 @@
*/
package dev.knative.eventing.kafka.broker.receiverloom;

import com.google.common.util.concurrent.Uninterruptibles;
import dev.knative.eventing.kafka.broker.core.ReactiveKafkaProducer;
import dev.knative.eventing.kafka.broker.core.tracing.kafka.ProducerTracer;
import io.opentelemetry.context.Context;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.tracing.TracingPolicy;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand All @@ -41,17 +43,15 @@ public class LoomKafkaProducer<K, V> implements ReactiveKafkaProducer<K, V> {

private final Producer<K, V> producer;

private final BlockingQueue<RecordPromise<K, V>> eventQueue;
private final ExecutorService executorService;
private final AtomicBoolean isClosed;
private final ProducerTracer<?> tracer;
private final VertxInternal vertx;
private final Thread sendFromQueueThread;
private final Promise<Void> closePromise = Promise.promise();

public LoomKafkaProducer(Vertx v, Producer<K, V> producer) {
Objects.requireNonNull(v, "Vertx cannot be null");
this.producer = producer;
this.eventQueue = new LinkedBlockingQueue<>();
this.isClosed = new AtomicBoolean(false);
this.vertx = (VertxInternal) v;
final var ctxInt = ((ContextInternal) v.getOrCreateContext()).unwrap();
Expand All @@ -62,73 +62,54 @@ public LoomKafkaProducer(Vertx v, Producer<K, V> producer) {
this.tracer = null;
}

ExecutorService executorService;
if (Boolean.parseBoolean(System.getenv("ENABLE_VIRTUAL_THREADS"))) {
this.sendFromQueueThread = Thread.ofVirtual().start(this::sendFromQueue);
executorService = Executors.newVirtualThreadPerTaskExecutor();
} else {
this.sendFromQueueThread = new Thread(this::sendFromQueue);
this.sendFromQueueThread.start();
executorService = Executors.newSingleThreadExecutor();
}
this.executorService = Context.taskWrapping(executorService);
}

@Override
public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
final Promise<RecordMetadata> promise = Promise.promise();
if (isClosed.get()) {
promise.fail("Producer is closed");
} else {
eventQueue.add(new RecordPromise<>(record, this.vertx.getOrCreateContext(), promise));
return Future.failedFuture("Producer is closed");
}
PromiseInternal<RecordMetadata> promise = vertx.promise();
executorService.execute(() -> sendFromQueue(new RecordPromise<>(record, promise)));
return promise.future();
}

private void sendFromQueue() {
// Process queue elements until this is closed and the tasks queue is empty
while (!isClosed.get() || !eventQueue.isEmpty()) {
try {
final var recordPromise = eventQueue.poll(2000, TimeUnit.MILLISECONDS);
if (recordPromise == null) {
continue;
private void sendFromQueue(RecordPromise<K, V> recordPromise) {
final var startedSpan = this.tracer == null
? null
: this.tracer.prepareSendMessage(recordPromise.context(), recordPromise.record);

recordPromise
.promise
.future()
.onComplete(v -> {
if (startedSpan != null) {
startedSpan.finish(recordPromise.context());
}
})
.onFailure(cause -> {
if (startedSpan != null) {
startedSpan.fail(recordPromise.context(), cause);
}
});
try {
producer.send(recordPromise.record, (metadata, exception) -> {
if (exception != null) {
recordPromise.fail(exception);
return;
}

final var startedSpan = this.tracer == null
? null
: this.tracer.prepareSendMessage(recordPromise.getContext(), recordPromise.getRecord());

recordPromise
.getPromise()
.future()
.onComplete(v -> {
if (startedSpan != null) {
startedSpan.finish(recordPromise.getContext());
}
})
.onFailure(cause -> {
if (startedSpan != null) {
startedSpan.fail(recordPromise.getContext(), cause);
}
});
try {
producer.send(
recordPromise.getRecord(),
(metadata, exception) -> recordPromise.getContext().runOnContext(v -> {
if (exception != null) {
recordPromise.getPromise().fail(exception);
return;
}
recordPromise.getPromise().complete(metadata);
}));
} catch (final KafkaException exception) {
recordPromise
.getContext()
.runOnContext(v -> recordPromise.getPromise().fail(exception));
}
} catch (InterruptedException e) {
logger.debug("Interrupted while waiting for event queue to be populated.");
break;
}
recordPromise.complete(metadata);
});
} catch (final KafkaException exception) {
recordPromise.fail(exception);
}

logger.debug("Background thread completed.");
}

@Override
Expand All @@ -141,12 +122,9 @@ public Future<Void> close() {

Thread.ofVirtual().start(() -> {
try {
while (!eventQueue.isEmpty()) {
logger.debug("Waiting for the eventQueue to become empty");
Thread.sleep(2000L);
}
logger.debug("Waiting for sendFromQueueThread thread to complete");
sendFromQueueThread.join();
executorService.shutdown();
logger.debug("Waiting for tasks to complete");
Uninterruptibles.awaitTerminationUninterruptibly(executorService);
logger.debug("Closing the producer");
producer.close();
closePromise.complete();
Expand Down Expand Up @@ -178,35 +156,29 @@ public Producer<K, V> unwrap() {
}

private static class RecordPromise<K, V> {
private final ProducerRecord<K, V> record;
private final ContextInternal context;
private final Promise<RecordMetadata> promise;
final ProducerRecord<K, V> record;
final PromiseInternal<RecordMetadata> promise;

private RecordPromise(ProducerRecord<K, V> record, ContextInternal context, Promise<RecordMetadata> promise) {
RecordPromise(ProducerRecord<K, V> record, PromiseInternal<RecordMetadata> promise) {
this.record = record;
this.context = context;
this.promise = promise;
}

public ProducerRecord<K, V> getRecord() {
return record;
ContextInternal context() {
return promise.context();
}

public Promise<RecordMetadata> getPromise() {
return promise;
void complete(RecordMetadata result) {
promise.complete(result);
}

public ContextInternal getContext() {
return context;
void fail(Throwable cause) {
promise.fail(cause);
}
}

// Function needed for testing
public boolean isSendFromQueueThreadAlive() {
return sendFromQueueThread.isAlive();
}

public int getEventQueueSize() {
return eventQueue.size();
return !executorService.isTerminated();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
*/
package dev.knative.eventing.kafka.broker.receiverloom;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -128,7 +126,6 @@ public void testCloseIsWaitingForEmptyQueue(VertxTestContext testContext) {
.onComplete(ar -> {
testContext.verify(() -> {
assertTrue(ar.succeeded());
assertEquals(0, producer.getEventQueueSize());
assertFalse(producer.isSendFromQueueThreadAlive());

checkpoints.flag();
Expand Down

0 comments on commit db70907

Please sign in to comment.