diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java index 9f08819374582..91b97fa475817 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/RetryTopicTest.java @@ -24,7 +24,6 @@ import static org.testng.Assert.assertNull; import static org.testng.Assert.assertTrue; import static org.testng.Assert.fail; -import java.lang.reflect.Field; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -39,11 +38,8 @@ import org.apache.avro.reflect.Nullable; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.api.schema.GenericRecord; -import org.apache.pulsar.client.impl.ConsumerImpl; -import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl; import org.apache.pulsar.client.util.RetryMessageUtil; import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; -import org.reflections.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.AfterMethod; @@ -620,10 +616,12 @@ public void testRetryTopicByCustomTopicName() throws Exception { @Test(timeOut = 30000L) public void testRetryTopicException() throws Exception { - final String topic = "persistent://my-property/my-ns/retry-topic"; + String retryLetterTopic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic"); + final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic"); final int maxRedeliveryCount = 2; final int sendMessages = 1; // subscribe before publish + @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) .topic(topic) .subscriptionName("my-subscription") @@ -632,7 +630,7 @@ public void testRetryTopicException() throws Exception { .receiverQueueSize(100) .deadLetterPolicy(DeadLetterPolicy.builder() .maxRedeliverCount(maxRedeliveryCount) - .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry") + .retryLetterTopic(retryLetterTopic) .build()) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); @@ -645,30 +643,16 @@ public void testRetryTopicException() throws Exception { } producer.close(); - // mock a retry producer exception when reconsumelater is called - MultiTopicsConsumerImpl multiTopicsConsumer = (MultiTopicsConsumerImpl) consumer; - List> consumers = multiTopicsConsumer.getConsumers(); - for (ConsumerImpl c : consumers) { - Set deadLetterPolicyField = - ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy")); - - if (deadLetterPolicyField.size() != 0) { - Field field = deadLetterPolicyField.iterator().next(); - field.setAccessible(true); - DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c); - deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#"); - } - } + admin.topics().terminateTopic(retryLetterTopic); + Message message = consumer.receive(); log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData())); try { consumer.reconsumeLater(message, 1, TimeUnit.SECONDS); - } catch (PulsarClientException.InvalidTopicNameException e) { - assertEquals(e.getClass(), PulsarClientException.InvalidTopicNameException.class); - } catch (Exception e) { - fail("exception should be PulsarClientException.InvalidTopicNameException"); + fail("exception should be PulsarClientException.TopicTerminatedException"); + } catch (PulsarClientException.TopicTerminatedException e) { + // ok } - consumer.close(); } @@ -721,10 +705,12 @@ public void testRetryProducerWillCloseByConsumer() throws Exception { @Test(timeOut = 30000L) public void testRetryTopicExceptionWithConcurrent() throws Exception { - final String topic = "persistent://my-property/my-ns/retry-topic"; + String retryLetterTopic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic"); + final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic"); final int maxRedeliveryCount = 2; final int sendMessages = 10; // subscribe before publish + @Cleanup Consumer consumer = pulsarClient.newConsumer(Schema.BYTES) .topic(topic) .subscriptionName("my-subscription") @@ -733,7 +719,7 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception { .receiverQueueSize(100) .deadLetterPolicy(DeadLetterPolicy.builder() .maxRedeliverCount(maxRedeliveryCount) - .retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry") + .retryLetterTopic(retryLetterTopic) .build()) .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) .subscribe(); @@ -742,24 +728,11 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception { .topic(topic) .create(); for (int i = 0; i < sendMessages; i++) { - producer.newMessage().key("1").value(String.format("Hello Pulsar [%d]", i).getBytes()).send(); + producer.send(String.format("Hello Pulsar [%d]", i).getBytes()); } producer.close(); - // mock a retry producer exception when reconsumelater is called - MultiTopicsConsumerImpl multiTopicsConsumer = (MultiTopicsConsumerImpl) consumer; - List> consumers = multiTopicsConsumer.getConsumers(); - for (ConsumerImpl c : consumers) { - Set deadLetterPolicyField = - ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy")); - - if (deadLetterPolicyField.size() != 0) { - Field field = deadLetterPolicyField.iterator().next(); - field.setAccessible(true); - DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c); - deadLetterPolicy.setRetryLetterTopic("#persistent://invalid-topic#"); - } - } + admin.topics().terminateTopic(retryLetterTopic); List> messages = Lists.newArrayList(); for (int i = 0; i < sendMessages; i++) { @@ -772,15 +745,17 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception { new Thread(() -> { try { consumer.reconsumeLater(message, 1, TimeUnit.SECONDS); - } catch (Exception ignore) { - - } finally { + } catch (PulsarClientException.TopicTerminatedException e) { + // ok latch.countDown(); + } catch (PulsarClientException e) { + // unexpected exception + fail("unexpected exception", e); } }).start(); } - latch.await(); + latch.await(sendMessages, TimeUnit.SECONDS); consumer.close(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 00b9b31d2a063..77a91a944ee6c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -1109,10 +1109,29 @@ public void connectionFailed(PulsarClientException exception) { public synchronized CompletableFuture closeAsync() { CompletableFuture closeFuture = new CompletableFuture<>(); + ArrayList> closeFutures = new ArrayList<>(4); + closeFutures.add(closeFuture); + if (retryLetterProducer != null) { + closeFutures.add(retryLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> { + if (ex != null) { + log.warn("Exception ignored in closing retryLetterProducer of consumer", ex); + } + })); + } + if (deadLetterProducer != null) { + closeFutures.add(deadLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> { + if (ex != null) { + log.warn("Exception ignored in closing deadLetterProducer of consumer", ex); + } + })); + } + CompletableFuture compositeCloseFuture = FutureUtil.waitForAll(closeFutures); + + if (getState() == State.Closing || getState() == State.Closed) { closeConsumerTasks(); failPendingReceive().whenComplete((r, t) -> closeFuture.complete(null)); - return closeFuture; + return compositeCloseFuture; } consumersClosedCounter.increment(); @@ -1124,7 +1143,7 @@ public synchronized CompletableFuture closeAsync() { deregisterFromClientCnx(); client.cleanupConsumer(this); failPendingReceive().whenComplete((r, t) -> closeFuture.complete(null)); - return closeFuture; + return compositeCloseFuture; } stats.getStatTimeout().ifPresent(Timeout::cancel); @@ -1151,23 +1170,7 @@ public synchronized CompletableFuture closeAsync() { }); } - ArrayList> closeFutures = new ArrayList<>(4); - closeFutures.add(closeFuture); - if (retryLetterProducer != null) { - closeFutures.add(retryLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> { - if (ex != null) { - log.warn("Exception ignored in closing retryLetterProducer of consumer", ex); - } - })); - } - if (deadLetterProducer != null) { - closeFutures.add(deadLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> { - if (ex != null) { - log.warn("Exception ignored in closing deadLetterProducer of consumer", ex); - } - })); - } - return FutureUtil.waitForAll(closeFutures); + return compositeCloseFuture; } private void cleanupAtClose(CompletableFuture closeFuture, Throwable exception) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 6f9c5b47c55bb..341272cd69bf8 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -638,7 +638,14 @@ public CompletableFuture closeAsync() { CompletableFuture closeFuture = new CompletableFuture<>(); List> futureList = consumers.values().stream() - .map(ConsumerImpl::closeAsync).collect(Collectors.toList()); + .map(consumer -> consumer.closeAsync().exceptionally(t -> { + Throwable cause = FutureUtil.unwrapCompletionException(t); + if (!(cause instanceof PulsarClientException.AlreadyClosedException)) { + log.warn("[{}] [{}] Error closing individual consumer", consumer.getTopic(), + consumer.getSubscription(), cause); + } + return null; + })).collect(Collectors.toList()); FutureUtil.waitForAll(futureList) .thenComposeAsync((r) -> {