diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java index e344d892b3137..c5fd889881bef 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SystemTopicBasedTopicPoliciesService.java @@ -127,7 +127,16 @@ public CompletableFuture deleteTopicPoliciesAsync(TopicName topicName) { if (NamespaceService.isHeartbeatNamespace(topicName.getNamespaceObject())) { return CompletableFuture.completedFuture(null); } - return sendTopicPolicyEvent(topicName, ActionType.DELETE, null); + TopicName changeEvents = NamespaceEventsSystemTopicFactory.getEventsTopicName(topicName.getNamespaceObject()); + return pulsarService.getNamespaceService().checkTopicExists(changeEvents).thenCompose(topicExistsInfo -> { + // If the system topic named "__change_events" has been deleted, it means all the data in the topic have + // been deleted, so we do not need to delete the message that we want to delete again. + if (!topicExistsInfo.isExists()) { + log.info("Skip delete topic-level policies because {} has been removed before", changeEvents); + return CompletableFuture.completedFuture(null); + } + return sendTopicPolicyEvent(topicName, ActionType.DELETE, null); + }); } @Override diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java index f5e6c7748d10b..199026bc4c445 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/systopic/NamespaceEventsSystemTopicFactory.java @@ -37,12 +37,16 @@ public NamespaceEventsSystemTopicFactory(PulsarClient client) { } public TopicPoliciesSystemTopicClient createTopicPoliciesSystemTopicClient(NamespaceName namespaceName) { - TopicName topicName = TopicName.get(TopicDomain.persistent.value(), namespaceName, - SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); + TopicName topicName = getEventsTopicName(namespaceName); log.info("Create topic policies system topic client {}", topicName.toString()); return new TopicPoliciesSystemTopicClient(client, topicName); } + public static TopicName getEventsTopicName(NamespaceName namespaceName) { + return TopicName.get(TopicDomain.persistent.value(), namespaceName, + SystemTopicNames.NAMESPACE_EVENTS_LOCAL_NAME); + } + public TransactionBufferSnapshotBaseSystemTopicClient createTransactionBufferSystemTopicClient( TopicName systemTopicName, SystemTopicTxnBufferSnapshotService systemTopicTxnBufferSnapshotService, Class schemaType) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java index d99969fbaa7e5..949ac17124d6a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/OneWayReplicatorUsingGlobalZKTest.java @@ -19,17 +19,25 @@ package org.apache.pulsar.broker.service; import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.time.Duration; import java.util.Arrays; import java.util.HashSet; +import java.util.Optional; import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.policies.data.RetentionPolicies; +import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; @@ -173,4 +181,42 @@ public void testDifferentTopicCreationRule(ReplicationMode replicationMode) thro public void testReplicationCountMetrics() throws Exception { super.testReplicationCountMetrics(); } + + @Test + public void testRemoveCluster() throws Exception { + // Initialize. + final String ns1 = defaultTenant + "/" + "ns_73b1a31afce34671a5ddc48fe5ad7fc8"; + final String topic = "persistent://" + ns1 + "/___tp-5dd50794-7af8-4a34-8a0b-06188052c66a"; + final String topicChangeEvents = "persistent://" + ns1 + "/__change_events"; + admin1.namespaces().createNamespace(ns1); + admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster1, cluster2))); + admin1.topics().createNonPartitionedTopic(topic); + + // Wait for loading topic up. + Producer p = client1.newProducer(Schema.STRING).topic(topic).create(); + Awaitility.await().untilAsserted(() -> { + ConcurrentOpenHashMap>> tps + = pulsar1.getBrokerService().getTopics(); + assertTrue(tps.containsKey(topic)); + assertTrue(tps.containsKey(topicChangeEvents)); + }); + + // The topics under the namespace of the cluster-1 will be deleted. + // Verify the result. + admin1.namespaces().setNamespaceReplicationClusters(ns1, new HashSet<>(Arrays.asList(cluster2))); + Awaitility.await().atMost(Duration.ofSeconds(120)).untilAsserted(() -> { + ConcurrentOpenHashMap>> tps + = pulsar1.getBrokerService().getTopics(); + assertFalse(tps.containsKey(topic)); + assertFalse(tps.containsKey(topicChangeEvents)); + assertFalse(pulsar1.getNamespaceService().checkTopicExists(TopicName.get(topic)).join().isExists()); + assertFalse(pulsar1.getNamespaceService() + .checkTopicExists(TopicName.get(topicChangeEvents)).join().isExists()); + }); + + // cleanup. + p.close(); + admin2.topics().delete(topic); + admin2.namespaces().deleteNamespace(ns1); + } }