Skip to content

Commit

Permalink
Ignore disposition error for stale messages
Browse files Browse the repository at this point in the history
Messages settled after a link has been closed
trigger exceptions, this is expected. These exceptions
are ignored to avoid creating confusion in application.
Unsettled messages are requeued when a link is closed,
so applications must be prepared to deal with redeliveries.

This can happen for message whose processing spans over
connection recovery.
  • Loading branch information
acogoluegnes committed Apr 17, 2024
1 parent 291c2a4 commit b60fda8
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 14 deletions.
19 changes: 10 additions & 9 deletions src/main/java/com/rabbitmq/model/amqp/AmqpConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,11 @@
import static com.rabbitmq.model.Resource.State.*;

import com.rabbitmq.model.Consumer;
import com.rabbitmq.model.ModelException;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.qpid.protonj2.client.*;
import org.apache.qpid.protonj2.client.exceptions.ClientConnectionRemotelyClosedException;
import org.apache.qpid.protonj2.client.exceptions.ClientException;
import org.apache.qpid.protonj2.client.exceptions.ClientLinkRemotelyClosedException;
import org.apache.qpid.protonj2.client.exceptions.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -100,20 +97,22 @@ public void accept() {
inFlightMessages.release();
try {
delivery.disposition(DeliveryState.accepted(), true);
} catch (ClientIllegalStateException | ClientIOException e) {
LOGGER.debug("message accept failed: {}", e.getMessage());
} catch (ClientException e) {
throw new ModelException(e);
throw ExceptionUtils.convert(e);
}
}

@Override
public void discard() {
inFlightMessages.release();
try {
// TODO propagate condition and description for "rejected" delivery
// state
delivery.disposition(DeliveryState.rejected("", ""), true);
} catch (ClientIllegalStateException | ClientIOException e) {
LOGGER.debug("message discard failed: {}", e.getMessage());
} catch (ClientException e) {
throw new ModelException(e);
throw ExceptionUtils.convert(e);
}
}

Expand All @@ -122,8 +121,10 @@ public void requeue() {
inFlightMessages.release();
try {
delivery.disposition(DeliveryState.released(), true);
} catch (ClientIllegalStateException | ClientIOException e) {
LOGGER.debug("message requeue failed: {}", e.getMessage());
} catch (ClientException e) {
throw new ModelException(e);
throw ExceptionUtils.convert(e);
}
}
};
Expand Down
2 changes: 0 additions & 2 deletions src/main/java/com/rabbitmq/model/amqp/AmqpPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,8 @@ public Message message() {
public void publish(Message message, Callback callback) {
checkOpen();
try {
// TODO catch ClientSendTimedOutException
org.apache.qpid.protonj2.client.Message<?> nativeMessage =
((AmqpMessage) message).nativeMessage();
// TODO track confirmation task to cancel them during recovery
Tracker tracker = this.sender.send(nativeMessage.durable(true));
this.executorService.submit(
() -> {
Expand Down
55 changes: 52 additions & 3 deletions src/test/java/com/rabbitmq/model/amqp/TopologyRecoveryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static com.rabbitmq.model.Management.ExchangeType.FANOUT;
import static com.rabbitmq.model.amqp.Cli.closeConnection;
import static com.rabbitmq.model.amqp.Cli.exchangeExists;
import static com.rabbitmq.model.amqp.RecordingTopologyListenerTest.queue;
import static com.rabbitmq.model.amqp.TestUtils.assertThat;
import static com.rabbitmq.model.amqp.TestUtils.waitAtMost;
import static java.time.Duration.ofMillis;
Expand All @@ -32,9 +33,7 @@
import com.rabbitmq.model.*;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -540,6 +539,56 @@ void recoverPublisherConsumerSeveralTimes() {
}
}

@Test
void disposeStaleMessageShouldBeSilent() throws Exception {
String q = queue();
Connection connection = connection();
assertThat(connectionAttemptCount).hasValue(1);
try {
connection.management().queue(q).declare();
Publisher publisher = connection.publisherBuilder().queue(q).build();
BlockingQueue<Consumer.Context> messageContexts = new ArrayBlockingQueue<>(10);
CountDownLatch consumeLatch = new CountDownLatch(3);
connection
.consumerBuilder()
.queue(q)
.messageHandler(
(ctx, m) -> {
messageContexts.add(ctx);
consumeLatch.countDown();
})
.build();

publisher.publish(publisher.message(), ctx -> {});
publisher.publish(publisher.message(), ctx -> {});
publisher.publish(publisher.message(), ctx -> {});
assertThat(consumeLatch).completes();
assertThat(messageContexts).hasSize(3);

closeConnectionAndWaitForRecovery();

// the messages are settled after the connection recovery
// their receiver instance is closed
// we make sure no exceptions are thrown
// this simulates long processing that spans over connection recovery
Consumer.Context ctx = messageContexts.poll(10, TimeUnit.SECONDS);
ctx.accept();
ctx = messageContexts.poll(10, TimeUnit.SECONDS);
ctx.discard();
ctx = messageContexts.poll(10, TimeUnit.SECONDS);
ctx.requeue();

// the messages are requeued automatically, so they should come back
messageContexts.poll(10, TimeUnit.SECONDS).accept();
messageContexts.poll(10, TimeUnit.SECONDS).accept();
messageContexts.poll(10, TimeUnit.SECONDS).accept();

waitAtMost(() -> connection.management().queueInfo(q).messageCount() == 0);
} finally {
connection.management().queueDeletion().delete(q);
}
}

String exchange() {
return "e-" + TestUtils.name(this.testInfo);
}
Expand Down

0 comments on commit b60fda8

Please sign in to comment.