Skip to content

Commit

Permalink
Use ExecutorService for sender and receiver tasks
Browse files Browse the repository at this point in the history
  • Loading branch information
acogoluegnes committed Feb 22, 2024
1 parent 1d87774 commit d3be113
Show file tree
Hide file tree
Showing 7 changed files with 122 additions and 33 deletions.
13 changes: 12 additions & 1 deletion src/main/java/com/rabbitmq/model/amqp/AmqpEnvironment.java
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,20 @@ class AmqpEnvironment implements Environment {
private volatile AmqpManagement management;
private final ExecutorService executorService;
private final AtomicBoolean closed = new AtomicBoolean(false);
private final boolean internalExecutor;

AmqpEnvironment(String uri, ExecutorService executorService) {
this.connectionParameters = connectionParameters(uri);
ClientOptions clientOptions = new ClientOptions();
this.client = Client.create(clientOptions);
this.executorService = executorService;

if (executorService == null) {
this.executorService = Utils.virtualThreadExecutorServiceIfAvailable();
this.internalExecutor = true;
} else {
this.executorService = executorService;
this.internalExecutor = false;
}

ConnectionOptions connectionOptions = new ConnectionOptions();
connectionOptions.user(this.connectionParameters.username);
Expand Down Expand Up @@ -165,6 +173,9 @@ public void close() {
throw new ModelException(e);
}
this.client.close();
if (this.internalExecutor) {
this.executorService.shutdownNow();
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ public class AmqpEnvironmentBuilder implements EnvironmentBuilder {
private String uri = "amqp://guest:guest@localhost:5672/%2f";
private ExecutorService executorService;

public AmqpEnvironmentBuilder() {}

public AmqpEnvironmentBuilder uri(String uri) {
this.uri = uri;
return this;
Expand Down
20 changes: 16 additions & 4 deletions src/main/java/com/rabbitmq/model/amqp/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
// [email protected].
package com.rabbitmq.model.amqp;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.lang.reflect.InvocationTargetException;
import java.util.concurrent.*;

abstract class Utils {

Expand Down Expand Up @@ -78,4 +76,18 @@ public boolean isReleasable() {
}
});
}

static ExecutorService virtualThreadExecutorServiceIfAvailable() {
boolean java21OrMore = Runtime.version().compareTo(Runtime.Version.parse("21")) >= 0;
if (java21OrMore) {
try {
return (ExecutorService)
Executors.class.getDeclaredMethod("newVirtualThreadPerTaskExecutor").invoke(null);
} catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException e) {
throw new RuntimeException(e);
}
} else {
return Executors.newCachedThreadPool();
}
}
}
14 changes: 2 additions & 12 deletions src/test/java/com/rabbitmq/model/AmqpPerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

import static com.rabbitmq.model.Management.ExchangeType.DIRECT;
import static com.rabbitmq.model.Management.QueueType.QUORUM;
import static com.rabbitmq.model.TestUtils.environmentBuilder;

import com.codahale.metrics.MetricRegistry;
import com.rabbitmq.model.amqp.AmqpEnvironmentBuilder;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -91,17 +91,7 @@ public static void main(String[] args) throws Exception {
String e = TestUtils.name(AmqpPerfTest.class, "main");
String q = TestUtils.name(AmqpPerfTest.class, "main");
String rk = "foo";
ExecutorService envExecutorService;
boolean java21OrMore = Runtime.version().compareTo(Runtime.Version.parse("21")) >= 0;
if (java21OrMore) {
envExecutorService =
(ExecutorService)
Executors.class.getDeclaredMethod("newVirtualThreadPerTaskExecutor").invoke(null);
} else {
envExecutorService = Executors.newCachedThreadPool();
}
Environment environment =
new AmqpEnvironmentBuilder().executorService(envExecutorService).build();
Environment environment = environmentBuilder().build();
Management management = environment.management();

CountDownLatch shutdownLatch = new CountDownLatch(1);
Expand Down
6 changes: 3 additions & 3 deletions src/test/java/com/rabbitmq/model/AmqpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
import static com.rabbitmq.model.Management.ExchangeType.DIRECT;
import static com.rabbitmq.model.Management.QueueType.QUORUM;
import static com.rabbitmq.model.TestUtils.CountDownLatchConditions.completed;
import static com.rabbitmq.model.TestUtils.environmentBuilder;
import static org.assertj.core.api.Assertions.assertThat;

import com.rabbitmq.model.amqp.AmqpEnvironmentBuilder;
import java.nio.charset.StandardCharsets;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
Expand All @@ -35,7 +35,7 @@ public class AmqpTest {
@Test
void queueDeclareDeletePublishConsume(TestInfo info) {
String q = TestUtils.name(info);
Environment environment = new AmqpEnvironmentBuilder().build();
Environment environment = environmentBuilder().build();
try {
environment.management().queue().name(q).quorum().queue().declare();
String address = "/amq/queue/" + q;
Expand Down Expand Up @@ -83,7 +83,7 @@ void exchangeBinding(TestInfo info) {
String e = TestUtils.name(info);
String q = TestUtils.name(info);
String rk = "foo";
Environment environment = new AmqpEnvironmentBuilder().build();
Environment environment = environmentBuilder().build();
Management management = environment.management();
try {
management.exchange().name(e).type(DIRECT).declare();
Expand Down
64 changes: 51 additions & 13 deletions src/test/java/com/rabbitmq/model/ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,17 @@
// [email protected].
package com.rabbitmq.model;

import static com.rabbitmq.model.TestUtils.*;
import static java.nio.charset.StandardCharsets.*;
import static org.assertj.core.api.Assertions.assertThat;

import com.rabbitmq.model.amqp.AmqpEnvironmentBuilder;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import org.apache.qpid.protonj2.client.*;
import org.apache.qpid.protonj2.client.Message;
import org.junit.jupiter.api.*;

public class ClientTest {
Expand All @@ -35,7 +38,7 @@ public class ClientTest {

@BeforeAll
static void initAll() {
environment = new AmqpEnvironmentBuilder().build();
environment = environmentBuilder().build();
management = environment.management();
}

Expand All @@ -57,8 +60,7 @@ static void tearDownAll() {

@Test
void deliveryCount() throws Exception {
ClientOptions clientOptions = new ClientOptions();
try (Client client = Client.create(clientOptions);
try (Client client = client();
Publisher publisher = environment.publisherBuilder().address(q).build()) {
int messageCount = 10;
CountDownLatch publishLatch = new CountDownLatch(5);
Expand All @@ -69,21 +71,57 @@ void deliveryCount() throws Exception {
publisher.message().addData("".getBytes(UTF_8)),
context -> publishLatch.countDown()));

ConnectionOptions connectionOptions = new ConnectionOptions();
connectionOptions.user("guest");
connectionOptions.password("guest");
connectionOptions.virtualHost("vhost:/");
// only the mechanisms supported in RabbitMQ
connectionOptions.saslOptions().addAllowedMechanism("PLAIN").addAllowedMechanism("EXTERNAL");

Connection connection = client.connect("localhost", 5672, connectionOptions);
Connection connection = connection(client);
Receiver receiver = connection.openReceiver(q, new ReceiverOptions());
int receivedMessages = 0;
while (receiver.receive(100, TimeUnit.MILLISECONDS) != null) {
receivedMessages++;
}

assertThat(receivedMessages).isEqualTo(messageCount);
}
}

@Test
void largeMessageWithSender() throws Exception {
try (Client client = client()) {
int maxFrameSize = 1000;
Connection connection =
connection(client, o -> o.traceFrames(false).maxFrameSize(maxFrameSize));

Sender sender =
connection.openSender(q, new SenderOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE));
byte[] body = new byte[maxFrameSize * 4];
Arrays.fill(body, (byte) 'A');
Tracker tracker = sender.send(Message.create(body));
tracker.awaitSettlement();
}
}

@Test
void largeMessageWithStreamSender() throws Exception {
try (Client client = client()) {
int maxFrameSize = 1000;
Connection connection =
connection(client, o -> o.traceFrames(false).maxFrameSize(maxFrameSize));

StreamSender sender =
connection.openStreamSender(
q, new StreamSenderOptions().deliveryMode(DeliveryMode.AT_LEAST_ONCE));
StreamSenderMessage message = sender.beginMessage();
byte[] body = new byte[maxFrameSize / 4];
Arrays.fill(body, (byte) 'A');

OutputStreamOptions streamOptions = new OutputStreamOptions().bodyLength(body.length);
OutputStream output = message.body(streamOptions);

final int chunkSize = 10;

for (int i = 0; i < body.length; i += chunkSize) {
output.write(body, i, chunkSize);
}

output.close();
message.tracker().awaitSettlement();
}
}
}
36 changes: 36 additions & 0 deletions src/test/java/com/rabbitmq/model/TestUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,17 @@

import static java.lang.String.format;

import com.rabbitmq.model.amqp.AmqpEnvironmentBuilder;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.qpid.protonj2.client.Client;
import org.apache.qpid.protonj2.client.Connection;
import org.apache.qpid.protonj2.client.ConnectionOptions;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.assertj.core.api.Condition;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.extension.ExtensionContext;
Expand Down Expand Up @@ -74,4 +80,34 @@ static String name(Class<?> testClass, String testMethod) {
return format(
"%s_%s%s", testClass.getSimpleName(), testMethod, uuid.substring(uuid.length() / 2));
}

static Client client() {
return Client.create();
}

static Connection connection(Client client) {
return connection(client, o -> {});
}

static Connection connection(Client client, Consumer<ConnectionOptions> optionsCallback) {
ConnectionOptions connectionOptions = new ConnectionOptions();
connectionOptions
.user("guest")
.password("guest")
.virtualHost("vhost:/")
// only the mechanisms supported in RabbitMQ
.saslOptions()
.addAllowedMechanism("PLAIN")
.addAllowedMechanism("EXTERNAL");
optionsCallback.accept(connectionOptions);
try {
return client.connect("localhost", 5672, connectionOptions);
} catch (ClientException e) {
throw new RuntimeException(e);
}
}

static AmqpEnvironmentBuilder environmentBuilder() {
return new AmqpEnvironmentBuilder();
}
}

0 comments on commit d3be113

Please sign in to comment.