diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java index 3ceb8083e36d2..9bc36cffce57e 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java @@ -1767,8 +1767,8 @@ public void getMessageIdByTimestamp( if (isNot307And404Exception(ex)) { log.error("[{}] Failed to get message ID by timestamp {} from {}", clientAppId(), timestamp, topicName, ex); - resumeAsyncResponseExceptionally(asyncResponse, ex); } + resumeAsyncResponseExceptionally(asyncResponse, ex); return null; }); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java index 2cbff955ecff1..5542f23d5e9f9 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiMultiBrokersTest.java @@ -26,6 +26,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; +import lombok.Cleanup; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.RandomStringUtils; import org.apache.pulsar.broker.MultiBrokerBaseTest; @@ -33,6 +34,8 @@ import org.apache.pulsar.broker.loadbalance.LeaderBroker; import org.apache.pulsar.client.admin.PulsarAdmin; import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.common.naming.TopicDomain; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; @@ -122,4 +125,27 @@ public void testTopicLookup(TopicDomain topicDomain, boolean isPartition) throws Assert.assertEquals(lookupResultSet.size(), 1); } + @Test(timeOut = 30 * 1000) + public void testTopicGetMessageIdByTimestamp() throws Exception { + PulsarAdmin admin0 = getAllAdmins().get(0); + String topic = "public/default/t1"; + admin0.topics().createPartitionedTopic(topic, 1); + pulsarClient = newPulsarClient(lookupUrl.toString(), 0); + @Cleanup + Producer producer = pulsarClient.newProducer().topic(topic).create(); + for (int i = 0; i < 20; i++) { + producer.send("msg".getBytes()); + } + String brokerUrl = admin0.lookups().lookupTopic(topic + "-partition-0"); + PulsarAdmin admin = null; + for (PulsarService additionalBroker : additionalBrokers) { + if (!brokerUrl.endsWith(String.valueOf(additionalBroker.getBrokerListenPort().get()))) { + admin = additionalBroker.getAdminClient(); + } + } + Assert.assertNotNull(admin); + MessageId msgId = + admin.topics().getMessageIdByTimestamp(topic + "-partition-0", System.currentTimeMillis()); + Assert.assertNotNull(msgId); + } }