diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java index 143557b008b23..609430db6df05 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ZkSessionExpireTest.java @@ -26,6 +26,7 @@ import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.ServiceConfiguration; @@ -94,7 +95,7 @@ public void testTopicUnloadAfterSessionRebuild(boolean enableSystemTopic, Class admin2.namespaces().unload(defaultNamespace); // Confirm all brokers registered. - Awaitility.await().untilAsserted(() -> { + Awaitility.await().atMost(Duration.ofSeconds(20)).untilAsserted(() -> { assertEquals(getAvailableBrokers(pulsar1).size(), 2); assertEquals(getAvailableBrokers(pulsar2).size(), 2); }); @@ -160,7 +161,21 @@ public void testTopicUnloadAfterSessionRebuild(boolean enableSystemTopic, Class // Verify: the topic on broker-2 is fine. Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> { CompletableFuture> future = pulsar1.getBrokerService().getTopic(topicName, false); - assertTrue(future == null || future.isCompletedExceptionally()); + log.info("broker 1 topics {}", pulsar1.getBrokerService().getTopics().keys()); + log.info("broker 2 topics {}", pulsar2.getBrokerService().getTopics().keys()); + log.info("broker 1 bundles {}", pulsar1.getNamespaceService().getOwnershipCache().getOwnedBundles() + .keySet().stream().map(k -> k.getNamespaceObject().toString() + "/" + k.getBundleRange()) + .filter(s -> s.contains(defaultNamespace)).collect(Collectors.toList())); + log.info("broker 2 bundles {}", pulsar2.getNamespaceService().getOwnershipCache().getOwnedBundles() + .keySet().stream().map(k -> k.getNamespaceObject().toString() + "/" + k.getBundleRange()) + .filter(s -> s.contains(defaultNamespace)).collect(Collectors.toList())); + log.info("future: {}, isDone: {}, isCompletedExceptionally: {}", + future, future == null ? "null" : future.isDone(), + future, future == null ? "null" : future.isCompletedExceptionally()); + assertTrue(future == null + || !pulsar1.getBrokerService().getTopics().containsKey(topicName) + || (future.isDone() && !future.isCompletedExceptionally() && future.get().isEmpty()) + || future.isCompletedExceptionally()); }); Topic broker2Topic3 = pulsar2.getBrokerService().getTopic(topicName, false).join().get(); assertNotNull(broker2Topic3);