Skip to content

Commit

Permalink
[fix][test]Fix flaky test testTopicUnloadAfterSessionRebuild (#23852)
Browse files Browse the repository at this point in the history
(cherry picked from commit b3641f0)
  • Loading branch information
poorbarcode authored and lhotari committed Jan 17, 2025
1 parent ad20850 commit 816f867
Showing 1 changed file with 17 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.broker.ServiceConfiguration;
Expand Down Expand Up @@ -94,7 +95,7 @@ public void testTopicUnloadAfterSessionRebuild(boolean enableSystemTopic, Class
admin2.namespaces().unload(defaultNamespace);

// Confirm all brokers registered.
Awaitility.await().untilAsserted(() -> {
Awaitility.await().atMost(Duration.ofSeconds(20)).untilAsserted(() -> {
assertEquals(getAvailableBrokers(pulsar1).size(), 2);
assertEquals(getAvailableBrokers(pulsar2).size(), 2);
});
Expand Down Expand Up @@ -160,7 +161,21 @@ public void testTopicUnloadAfterSessionRebuild(boolean enableSystemTopic, Class
// Verify: the topic on broker-2 is fine.
Awaitility.await().atMost(Duration.ofSeconds(10)).untilAsserted(() -> {
CompletableFuture<Optional<Topic>> future = pulsar1.getBrokerService().getTopic(topicName, false);
assertTrue(future == null || future.isCompletedExceptionally());
log.info("broker 1 topics {}", pulsar1.getBrokerService().getTopics().keys());
log.info("broker 2 topics {}", pulsar2.getBrokerService().getTopics().keys());
log.info("broker 1 bundles {}", pulsar1.getNamespaceService().getOwnershipCache().getOwnedBundles()
.keySet().stream().map(k -> k.getNamespaceObject().toString() + "/" + k.getBundleRange())
.filter(s -> s.contains(defaultNamespace)).collect(Collectors.toList()));
log.info("broker 2 bundles {}", pulsar2.getNamespaceService().getOwnershipCache().getOwnedBundles()
.keySet().stream().map(k -> k.getNamespaceObject().toString() + "/" + k.getBundleRange())
.filter(s -> s.contains(defaultNamespace)).collect(Collectors.toList()));
log.info("future: {}, isDone: {}, isCompletedExceptionally: {}",
future, future == null ? "null" : future.isDone(),
future, future == null ? "null" : future.isCompletedExceptionally());
assertTrue(future == null
|| !pulsar1.getBrokerService().getTopics().containsKey(topicName)
|| (future.isDone() && !future.isCompletedExceptionally() && future.get().isEmpty())
|| future.isCompletedExceptionally());
});
Topic broker2Topic3 = pulsar2.getBrokerService().getTopic(topicName, false).join().get();
assertNotNull(broker2Topic3);
Expand Down

0 comments on commit 816f867

Please sign in to comment.