From d0cb05b959864926d6231ca93255f2c83402d04d Mon Sep 17 00:00:00 2001 From: Xiangying Meng <55571188+liangyepianzhou@users.noreply.github.com> Date: Tue, 14 Nov 2023 20:35:29 +0800 Subject: [PATCH] [fix][broker] Do not write replicated snapshot marker when the topic which is not enable replication (#21495) [PIP 33](https://github.com/apache/pulsar/wiki/PIP-33%3A-Replicated-subscriptions) introduces the concept of `Replicated subscriptions`. When a topic has a consumer (subscription) that enables replicated subscriptions, the broker writes markers into the original topic. The markers are written even if no remote replication cluster is configured for the topic, which makes the backlog of the topic keep increasing. --- Markers are written in the following two ways: 1. A scheduled task writes a marker at a fixed time interval if new messages have been published. https://github.com/apache/pulsar/blob/ea1fc0f20138bc35f54f55d32dabf3c3a3309c8e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java#L78-L86 https://github.com/apache/pulsar/blob/ea1fc0f20138bc35f54f55d32dabf3c3a3309c8e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsSnapshotBuilder.java#L77-L86 2. Acknowledging a message triggers a check: if the first snapshot has been written and the mark-delete position has moved, a marker is written. https://github.com/apache/pulsar/blob/ea1fc0f20138bc35f54f55d32dabf3c3a3309c8e/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.java#L114-L150 The fix: create or remove the `ReplicatedSubscriptionsController` of the topic according to the topic's replication policy (the controller is kept only when more than one replication cluster is configured). 
(cherry picked from commit 2322004069f23ecca1a12a4ced898fed854275fb) --- .../service/persistent/PersistentTopic.java | 9 +- .../service/ReplicatorSubscriptionTest.java | 212 ++++++++++++++++++ 2 files changed, 218 insertions(+), 3 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index ebaee40eeec01..4975c9bf785e9 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -2433,7 +2433,7 @@ public CompletableFuture onPoliciesUpdate(@Nonnull Policies data) { } updateTopicPolicyByNamespacePolicy(data); - + checkReplicatedSubscriptionControllerState(); isEncryptionRequired = data.encryption_required; isAllowAutoUpdateSchema = data.is_allow_auto_update_schema; @@ -2898,12 +2898,14 @@ private synchronized void checkReplicatedSubscriptionControllerState(boolean sho boolean isCurrentlyEnabled = replicatedSubscriptionsController.isPresent(); boolean isEnableReplicatedSubscriptions = brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions(); + boolean replicationEnabled = this.topicPolicies.getReplicationClusters().get().size() > 1; - if (shouldBeEnabled && !isCurrentlyEnabled && isEnableReplicatedSubscriptions) { + if (shouldBeEnabled && !isCurrentlyEnabled && isEnableReplicatedSubscriptions && replicationEnabled) { log.info("[{}] Enabling replicated subscriptions controller", topic); replicatedSubscriptionsController = Optional.of(new ReplicatedSubscriptionsController(this, brokerService.pulsar().getConfiguration().getClusterName())); - } else if (isCurrentlyEnabled && !shouldBeEnabled || !isEnableReplicatedSubscriptions) { + } else if (isCurrentlyEnabled && !shouldBeEnabled || !isEnableReplicatedSubscriptions + || !replicationEnabled) { 
log.info("[{}] Disabled replicated subscriptions controller", topic); replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close); replicatedSubscriptionsController = Optional.empty(); @@ -3085,6 +3087,7 @@ public void onUpdate(TopicPolicies policies) { updateTopicPolicy(policies); updateDispatchRateLimiter(); + checkReplicatedSubscriptionControllerState(); updateSubscriptionsDispatcherRateLimiter().thenRun(() -> { updatePublishDispatcher(); initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies)); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java index 250d971b9fb95..62c8a1b4dde4a 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorSubscriptionTest.java @@ -29,9 +29,11 @@ import java.lang.reflect.Method; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashSet; import java.util.LinkedHashSet; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; @@ -49,7 +51,9 @@ import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionType; import org.apache.pulsar.common.policies.data.PartitionedTopicStats; +import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats; import org.apache.pulsar.common.policies.data.TopicStats; import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; import org.awaitility.Awaitility; @@ -57,6 +61,7 @@ import org.slf4j.LoggerFactory; import org.testng.annotations.AfterClass; import 
org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; /** @@ -704,6 +709,213 @@ public void testReplicatedSubscriptionWhenReplicatorProducerIsClosed() throws Ex Awaitility.await().untilAsserted(() -> assertNotNull(topic2.getSubscription(subscriptionName))); } + @DataProvider(name = "isTopicPolicyEnabled") + private Object[][] isTopicPolicyEnabled() { + // Todo: fix replication can not be enabled at topic level. + return new Object[][] { { Boolean.FALSE } }; + } + + /** + * Test the replication subscription can work normal in the following cases: + *

+ * 1. No data is written into the original topic when the topic has no remote cluster configured. {topic1} + * 1. Publish messages to the topic and then wait a moment; + * the backlog does not increase once publishing has completed. + * 2. Acknowledge the messages; the last confirmed entry does not change. + * 2. Snapshots and markers are written once the topic has a remote cluster configured. {topic2} + * 1. Publish messages to the topic. After publishing, the topic backlog keeps increasing. + * 2. Wait for the snapshot to complete; the backlog stops changing. + * 3. Publish more messages and wait for another snapshot to complete. + * 4. Ack messages to move the mark-delete position past the one recorded in the first snapshot. + * 5. Check that a new entry (a marker) is appended to the original topic. + * 3. Snapshots and markers stop being written after the topic's remote cluster is removed. {topic2} + * Similar to step 1. + *

+ */ + @Test(dataProvider = "isTopicPolicyEnabled") + public void testWriteMarkerTaskOfReplicateSubscriptions(boolean isTopicPolicyEnabled) throws Exception { + // 1. Prepare resource and use proper configuration. + String namespace = BrokerTestUtil.newUniqueName("pulsar/testReplicateSubBackLog"); + String topic1 = "persistent://" + namespace + "/replication-enable"; + String topic2 = "persistent://" + namespace + "/replication-disable"; + String subName = "sub"; + + admin1.namespaces().createNamespace(namespace); + pulsar1.getConfiguration().setTopicLevelPoliciesEnabled(isTopicPolicyEnabled); + pulsar1.getConfiguration().setReplicationPolicyCheckDurationSeconds(1); + pulsar1.getConfiguration().setReplicatedSubscriptionsSnapshotFrequencyMillis(1000); + // 2. Build Producer and Consumer. + @Cleanup + PulsarClient client1 = PulsarClient.builder().serviceUrl(url1.toString()) + .statsInterval(0, TimeUnit.SECONDS) + .build(); + @Cleanup + Consumer consumer1 = client1.newConsumer() + .topic(topic1) + .subscriptionName(subName) + .ackTimeout(5, TimeUnit.SECONDS) + .subscriptionType(SubscriptionType.Shared) + .replicateSubscriptionState(true) + .subscribe(); + @Cleanup + Producer producer1 = client1.newProducer() + .topic(topic1) + .create(); + // 3. Test replication subscription work as expected. + // Test case 1: disable replication, backlog will not increase. + testReplicatedSubscriptionWhenDisableReplication(producer1, consumer1, topic1); + + // Test case 2: enable replication, mark and snapshot work as expected. 
+ if (isTopicPolicyEnabled) { + admin1.topics().createNonPartitionedTopic(topic2); + admin1.topics().setReplicationClusters(topic2, Arrays.asList("r1", "r2")); + } else { + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1", "r2")); + } + @Cleanup + Consumer consumer2 = client1.newConsumer() + .topic(topic2) + .subscriptionName(subName) + .ackTimeout(5, TimeUnit.SECONDS) + .subscriptionType(SubscriptionType.Shared) + .replicateSubscriptionState(true) + .subscribe(); + @Cleanup + Producer producer2 = client1.newProducer() + .topic(topic2) + .create(); + testReplicatedSubscriptionWhenEnableReplication(producer2, consumer2, topic2); + + // Test case 3: enable replication, mark and snapshot work as expected. + if (isTopicPolicyEnabled) { + admin1.topics().setReplicationClusters(topic2, Arrays.asList("r1")); + } else { + admin1.namespaces().setNamespaceReplicationClusters(namespace, Sets.newHashSet("r1")); + } + testReplicatedSubscriptionWhenDisableReplication(producer2, consumer2, topic2); + // 4. Clear resource. + pulsar1.getConfiguration().setForceDeleteNamespaceAllowed(true); + admin1.namespaces().deleteNamespace(namespace, true); + pulsar1.getConfiguration().setForceDeleteNamespaceAllowed(false); + } + + /** + * Disable replication subscription. + * Test scheduled task case. + * 1. Send three messages |1:0|1:1|1:2|. + * 2. Get topic backlog, as backlog1. + * 3. Wait a moment. + * 4. Get the topic backlog again, the backlog will not increase. + * Test acknowledge messages case. + * 1. Get the last confirm entry, as LAC1. + * 2. Acknowledge these messages |1:0|1:1|. + * 3. wait a moment. + * 4. Get the last confirm entry, as LAC2. LAC1 is equal to LAC2. + * Clear environment. + * 1. Ack all the retained messages. |1:2| + * 2. Wait for the backlog to return to zero. 
+ */ + private void testReplicatedSubscriptionWhenDisableReplication(Producer producer, Consumer consumer, + String topic) throws Exception { + final int messageSum = 3; + // Test scheduled task case. + for (int i = 0; i < messageSum; i++) { + producer.newMessage().send(); + } + long backlog1 = admin1.topics().getStats(topic, false).getBacklogSize(); + Thread.sleep(3000); + long backlog2 = admin1.topics().getStats(topic, false).getBacklogSize(); + assertEquals(backlog1, backlog2); + // Test acknowledge messages case. + String lastConfirmEntry1 = admin1.topics().getInternalStats(topic).lastConfirmedEntry; + for (int i = 0; i < messageSum - 1; i++) { + consumer.acknowledge(consumer.receive(5, TimeUnit.SECONDS)); + } + Awaitility.await().untilAsserted(() -> { + String lastConfirmEntry2 = admin1.topics().getInternalStats(topic).lastConfirmedEntry; + assertEquals(lastConfirmEntry1, lastConfirmEntry2); + }); + // Clear environment. + consumer.acknowledge(consumer.receive(5, TimeUnit.SECONDS)); + Awaitility.await().untilAsserted(() -> { + long backlog4 = admin1.topics().getStats(topic, false).getBacklogSize(); + assertEquals(backlog4, 0); + }); + } + + /** + * Enable replication subscription. + * Test scheduled task case. + * 1. Wait replicator connected. + * 2. Send three messages |1:0|1:1|1:2|. + * 3. Get topic backlog, as backlog1. + * 4. Wait a moment. + * 5. Get the topic backlog again, as backlog2. The backlog2 is bigger than backlog1. |1:0|1:1|1:2|mark|. + * 6. Wait the snapshot complete. + * Test acknowledge messages case. + * 1. Write messages and wait another snapshot complete. |1:0|1:1|1:2|mark|1:3|1:4|1:5|mark| + * 2. Ack message |1:0|1:1|1:2|1:3|1:4|. + * 3. Get last confirm entry, as LAC1. + * 2. Wait a moment. + * 3. Get Last confirm entry, as LAC2. LAC2 different to LAC1. |1:5|mark|mark| + * Clear environment. + * 1. Ack all the retained message |1:5|. + * 2. Wait for the backlog to return to zero. 
+ */ + private void testReplicatedSubscriptionWhenEnableReplication(Producer producer, Consumer consumer, + String topic) throws Exception { + final int messageSum = 3; + Awaitility.await().untilAsserted(() -> { + List keys = pulsar1.getBrokerService() + .getTopic(topic, false).get().get() + .getReplicators().keys(); + assertEquals(keys.size(), 1); + assertTrue(pulsar1.getBrokerService() + .getTopic(topic, false).get().get() + .getReplicators().get(keys.get(0)).isConnected()); + }); + // Test scheduled task case. + sendMessageAndWaitSnapshotComplete(producer, topic, messageSum); + // Test acknowledge messages case. + // After snapshot write completely, acknowledging message to move the mark delete position + // after the position recorded in the snapshot will trigger to write a new marker. + sendMessageAndWaitSnapshotComplete(producer, topic, messageSum); + String lastConfirmedEntry3 = admin1.topics().getInternalStats(topic, false).lastConfirmedEntry; + for (int i = 0; i < messageSum * 2 - 1; i++) { + consumer.acknowledge(consumer.receive(5, TimeUnit.SECONDS)); + } + Awaitility.await().untilAsserted(() -> { + String lastConfirmedEntry4 = admin1.topics().getInternalStats(topic, false).lastConfirmedEntry; + assertNotEquals(lastConfirmedEntry3, lastConfirmedEntry4); + }); + // Clear environment. 
+ consumer.acknowledge(consumer.receive(5, TimeUnit.SECONDS)); + Awaitility.await().untilAsserted(() -> { + long backlog4 = admin1.topics().getStats(topic, false).getBacklogSize(); + assertEquals(backlog4, 0); + }); + } + + private void sendMessageAndWaitSnapshotComplete(Producer producer, String topic, + int messageSum) throws Exception { + for (int i = 0; i < messageSum; i++) { + producer.newMessage().send(); + } + long backlog1 = admin1.topics().getStats(topic, false).getBacklogSize(); + Awaitility.await().untilAsserted(() -> { + long backlog2 = admin1.topics().getStats(topic, false).getBacklogSize(); + assertTrue(backlog2 > backlog1); + }); + // Wait snapshot write completely, stop writing marker into topic. + Awaitility.await().untilAsserted(() -> { + String lastConfirmedEntry1 = admin1.topics().getInternalStats(topic, false).lastConfirmedEntry; + PersistentTopicInternalStats persistentTopicInternalStats = admin1.topics().getInternalStats(topic, false); + Thread.sleep(1000); + String lastConfirmedEntry2 = admin1.topics().getInternalStats(topic, false).lastConfirmedEntry; + assertEquals(lastConfirmedEntry1, lastConfirmedEntry2); + }); + } + void publishMessages(Producer producer, int startIndex, int numMessages, Set sentMessages) throws PulsarClientException { for (int i = startIndex; i < startIndex + numMessages; i++) {